You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/08/13 23:56:45 UTC

[kudu] 02/02: KUDU-2612 p8: replay participant ops on bootstrap

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a69a1d68e31e0c0eec9a47f7669098ca872baaa3
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Sun Aug 2 03:14:46 2020 -0700

    KUDU-2612 p8: replay participant ops on bootstrap
    
    This patch adds the ability to bootstrap a Tablet's TxnParticipant from
    its WALs. The replay is a bit crude, in that we'll always update state
    based on replicate/commit pairs, forgoing the usual state checks. This
    is acceptable because these checks were presumably already performed the
    first time the participant ops went through replication.
    
    This patch doesn't add any form of WAL anchoring. This will come in a
    separate patch.
    
    Change-Id: I199ed01c2244d16ed6fd7ded063e4c71f3c409ff
    Reviewed-on: http://gerrit.cloudera.org:8080/16304
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../integration-tests/txn_participant-itest.cc     | 93 +++++++++++++++++---
 src/kudu/tablet/ops/participant_op.cc              | 98 +++++++++++-----------
 src/kudu/tablet/ops/participant_op.h               | 15 ++++
 src/kudu/tablet/tablet_bootstrap.cc                | 25 ++++++
 src/kudu/tablet/tablet_replica.cc                  |  1 +
 src/kudu/tablet/txn_participant-test-util.h        |  1 +
 src/kudu/tablet/txn_participant-test.cc            | 15 ++++
 7 files changed, 188 insertions(+), 60 deletions(-)

diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index c4639cd..44f8ae4 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -51,6 +51,7 @@
 DECLARE_bool(raft_enable_pre_election);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(follower_unavailable_considered_failed_sec);
 DECLARE_int32(raft_heartbeat_interval_ms);
 
 using kudu::cluster::InternalMiniCluster;
@@ -116,6 +117,24 @@ class TxnParticipantITest : public KuduTest {
     w.StopAndJoin();
   }
 
+  // Quiesces every tablet server except the one at index 'ts_idx' so that only
+  // it is set up to become leader. Returns the replicas, indexed by tserver.
+  vector<TabletReplica*> SetUpLeaderGetReplicas(int ts_idx) {
+    vector<TabletReplica*> replicas;
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      auto* ts = cluster_->mini_tablet_server(i);
+      const auto& tablets = ts->ListTablets();
+      CHECK_EQ(1, tablets.size());  // CHECK, not ASSERT: this helper returns a value.
+      scoped_refptr<TabletReplica> r;
+      CHECK(ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
+      replicas.emplace_back(r.get());  // Non-owning; assumed kept alive by the tablet manager.
+      if (i != ts_idx) {
+        *ts->server()->mutable_quiescing() = true;
+      }
+    }
+    return replicas;
+  }
+
  protected:
   unique_ptr<InternalMiniCluster> cluster_;
 };
@@ -126,18 +145,7 @@ TEST_F(TxnParticipantITest, TestReplicateParticipantOps) {
   // Keep track of all the participant replicas, and quiesce all but one
   // tserver so we can ensure a specific leader.
   const int kLeaderIdx = 0;
-  vector<TabletReplica*> replicas;
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    auto* ts = cluster_->mini_tablet_server(i);
-    const auto& tablets = ts->ListTablets();
-    ASSERT_EQ(1, tablets.size());
-    scoped_refptr<TabletReplica> r;
-    ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
-    replicas.emplace_back(r.get());
-    if (i != kLeaderIdx) {
-      *ts->server()->mutable_quiescing() = true;
-    }
-  }
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
   ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
   // Try submitting the ops on all replicas. They should succeed on the leaders
   // and fail on followers.
@@ -182,6 +190,56 @@ TEST_F(TxnParticipantITest, TestReplicateParticipantOps) {
   }
 }
 
+// Test that participant ops are carried over by tablet copy: a replica copied
+// onto a newly added tablet server ends up with identical transaction states.
+TEST_F(TxnParticipantITest, TestCopyParticipantOps) {
+  NO_FATALS(SetUpTable());
+
+  constexpr const int kNumTxns = 10;
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kDeadServerIdx = kLeaderIdx + 1;  // Any index but the leader's.
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(kTimeout));
+
+  // Run every transaction through the full commit sequence on the leader.
+  vector<TxnParticipant::TxnEntry> expected_txns;
+  for (int i = 0; i < kNumTxns; i++) {
+    for (const auto& op : kCommitSequence) {
+      vector<Status> statuses = RunOnReplicas({ leader_replica }, i, op);
+      for (const auto& s : statuses) {
+        SCOPED_TRACE(Substitute("Transaction $0, Op $1", i,
+                                ParticipantOpPB::ParticipantOpType_Name(op)));
+        ASSERT_OK(s);
+      }
+    }
+    expected_txns.emplace_back(
+        TxnParticipant::TxnEntry({ i, Txn::kCommitted, kDummyCommitTimestamp }));
+  }
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {  // All replicas converge.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(expected_txns, replicas[i]->tablet()->txn_participant()->GetTxnsForTests());
+    });
+  }
+
+  // Stop a follower and add a fresh tablet server to host a copy of the tablet.
+  cluster_->mini_tablet_server(kDeadServerIdx)->Shutdown();
+  ASSERT_OK(cluster_->AddTabletServer());
+  FLAGS_follower_unavailable_considered_failed_sec = 1;  // Presumably hastens eviction.
+
+  // Eventually, a copy should be made on the new server.
+  ASSERT_EVENTUALLY([&] {
+    auto* new_ts = cluster_->mini_tablet_server(cluster_->num_tablet_servers() - 1);
+    const auto& tablets = new_ts->ListTablets();
+    ASSERT_EQ(1, tablets.size());
+    scoped_refptr<TabletReplica> r;
+    ASSERT_TRUE(new_ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
+    ASSERT_OK(r->WaitUntilConsensusRunning(kTimeout));
+    ASSERT_EQ(expected_txns, r->tablet()->txn_participant()->GetTxnsForTests());
+  });
+}
+
 class TxnParticipantElectionStormITest : public TxnParticipantITest {
  public:
   void SetUp() override {
@@ -279,6 +337,21 @@ TEST_F(TxnParticipantElectionStormITest, TestFrequentElections) {
       ASSERT_EQ(expected_txns, replicas[i]->tablet()->txn_participant()->GetTxnsForTests());
     });
   }
+
+  // Now restart the cluster and ensure the transaction state is restored.
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->StartSync());
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    auto* ts = cluster_->mini_tablet_server(i);
+    const auto& tablets = ts->ListTablets();
+    ASSERT_EQ(1, tablets.size());
+    scoped_refptr<TabletReplica> r;
+    ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
+    // As in TestCopyParticipantOps, wait for the replica to be running: its
+    // tablet may still be bootstrapping right after the restart.
+    ASSERT_OK(r->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
+    ASSERT_EQ(expected_txns, r->tablet()->txn_participant()->GetTxnsForTests());
+  }
 }
 
 } // namespace itest
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
index f84a833..c1a6d9e 100644
--- a/src/kudu/tablet/ops/participant_op.cc
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -31,8 +31,6 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/tablet/ops/op.h"
-#include "kudu/tablet/tablet.h"
-#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_participant.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/pb_util.h"
@@ -55,15 +53,19 @@ class rw_semaphore;
 namespace tablet {
 
 ParticipantOpState::ParticipantOpState(TabletReplica* tablet_replica,
+                                       TxnParticipant* txn_participant,
                                        const tserver::ParticipantRequestPB* request,
                                        tserver::ParticipantResponsePB* response)
     : OpState(tablet_replica),
+      txn_participant_(DCHECK_NOTNULL(txn_participant)),
       request_(DCHECK_NOTNULL(request)),
       response_(response) {}
 
 void ParticipantOpState::AcquireTxnAndLock() {
+  DCHECK(!txn_lock_);  // Calls must alternate with ReleaseTxn(); no nesting.
+  DCHECK(!txn_);
   int64_t txn_id = request_->op().txn_id();
-  txn_ = tablet_replica()->tablet()->txn_participant()->GetOrCreateTransaction(txn_id);
+  txn_ = txn_participant_->GetOrCreateTransaction(txn_id);
   txn_->AcquireWriteLock(&txn_lock_);
 }
 
@@ -83,43 +85,69 @@ string ParticipantOpState::ToString() const {
       ParticipantOpPB::ParticipantOpType_Name(request_->op().type()));
 }
 
-void ParticipantOp::NewReplicateMsg(unique_ptr<ReplicateMsg>* replicate_msg) {
-  replicate_msg->reset(new ReplicateMsg);
-  (*replicate_msg)->set_op_type(OperationType::PARTICIPANT_OP);
-  (*replicate_msg)->mutable_participant_request()->CopyFrom(*state()->request());
-  if (state()->are_results_tracked()) {
-    (*replicate_msg)->mutable_request_id()->CopyFrom(state()->request_id());
+Status ParticipantOpState::ValidateOp() const {
+  const auto& op = request()->op();
+  DCHECK(txn_);  // Set by AcquireTxnAndLock().
+  switch (op.type()) {
+    case ParticipantOpPB::BEGIN_TXN:
+      return txn_->ValidateBeginTransaction();
+    case ParticipantOpPB::BEGIN_COMMIT:
+      return txn_->ValidateBeginCommit();
+    case ParticipantOpPB::FINALIZE_COMMIT:
+      return txn_->ValidateFinalize();
+    case ParticipantOpPB::ABORT_TXN:
+      return txn_->ValidateAbort();
+    case ParticipantOpPB::UNKNOWN:
+      return Status::InvalidArgument("unknown op type");
   }
+  return Status::OK();  // Only reachable for values outside the cases above.
+}
 
-Status ParticipantOp::Prepare() {
-  TRACE_EVENT0("op", "ParticipantOp::Prepare");
-  TRACE("PREPARE: Starting.");
-  state_->AcquireTxnAndLock();
-  const auto& op = state_->request()->op();
-  Txn* txn = state_->txn_.get();
-  DCHECK(txn);
+Status ParticipantOpState::PerformOp() {
+  const auto& op = request()->op();
+  Txn* txn = txn_.get();
+  DCHECK(txn);  // Requires a prior AcquireTxnAndLock().
   switch (op.type()) {
+    // NOTE: these can currently never fail because we are only updating
+    // metadata. When we begin validating write ops before committing, we'll
+    // need to populate the response with errors.
     case ParticipantOpPB::BEGIN_TXN: {
-      RETURN_NOT_OK(txn->ValidateBeginTransaction());
+      txn->BeginTransaction();
       break;
     }
     case ParticipantOpPB::BEGIN_COMMIT: {
-      RETURN_NOT_OK(txn->ValidateBeginCommit());
+      txn->BeginCommit();
       break;
     }
     case ParticipantOpPB::FINALIZE_COMMIT: {
-      RETURN_NOT_OK(txn->ValidateFinalize());
+      txn->FinalizeCommit(op.finalized_commit_timestamp());
       break;
     }
     case ParticipantOpPB::ABORT_TXN: {
-      RETURN_NOT_OK(txn->ValidateAbort());
+      txn->AbortTransaction();
       break;
     }
     case ParticipantOpPB::UNKNOWN: {
       return Status::InvalidArgument("unknown op type");
     }
   }
+  return Status::OK();
+}
+
+void ParticipantOp::NewReplicateMsg(unique_ptr<ReplicateMsg>* replicate_msg) {
+  replicate_msg->reset(new ReplicateMsg);
+  (*replicate_msg)->set_op_type(OperationType::PARTICIPANT_OP);
+  (*replicate_msg)->mutable_participant_request()->CopyFrom(*state()->request());
+  if (state()->are_results_tracked()) {  // Copy the request ID only if tracked.
+    (*replicate_msg)->mutable_request_id()->CopyFrom(state()->request_id());
+  }
+}
+
+Status ParticipantOp::Prepare() {
+  TRACE_EVENT0("op", "ParticipantOp::Prepare");
+  TRACE("PREPARE: Starting.");
+  state_->AcquireTxnAndLock();  // Released via ReleaseTxn() in Finish().
+  RETURN_NOT_OK(state_->ValidateOp());  // Only validates; Apply() makes the change.
   TRACE("PREPARE: Finished.");
   return Status::OK();
 }
@@ -135,33 +163,7 @@ Status ParticipantOp::Start() {
 Status ParticipantOp::Apply(CommitMsg** commit_msg) {
   TRACE_EVENT0("op", "ParticipantOp::Apply");
   TRACE("APPLY: Starting.");
-  const auto& op = state_->request()->op();
-  Txn* txn = state_->txn_.get();
-  Status s;
-  switch (op.type()) {
-    // NOTE: these can currently never fail because we are only updating
-    // metadata. When we begin validating write ops before committing, we'll
-    // need to populate the response with errors.
-    case ParticipantOpPB::BEGIN_TXN: {
-      txn->BeginTransaction();
-      break;
-    }
-    case ParticipantOpPB::BEGIN_COMMIT: {
-      txn->BeginCommit();
-      break;
-    }
-    case ParticipantOpPB::FINALIZE_COMMIT: {
-      txn->FinalizeCommit(op.finalized_commit_timestamp());
-      break;
-    }
-    case ParticipantOpPB::ABORT_TXN: {
-      txn->AbortTransaction();
-      break;
-    }
-    case ParticipantOpPB::UNKNOWN: {
-      return Status::InvalidArgument("unknown op type");
-    }
-  }
+  CHECK_OK(state_->PerformOp());
   *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena());
   (*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP);
   TRACE("APPLY: Finished.");
@@ -171,7 +173,7 @@ Status ParticipantOp::Apply(CommitMsg** commit_msg) {
 void ParticipantOp::Finish(OpResult result) {
   auto txn_id = state_->request()->op().txn_id();
   state_->ReleaseTxn();
-  TxnParticipant* txn_participant = state_->tablet_replica()->tablet()->txn_participant();
+  TxnParticipant* txn_participant = state_->txn_participant_;
   if (PREDICT_FALSE(result == Op::ABORTED)) {
     txn_participant->ClearIfInitFailed(txn_id);
     TRACE("FINISH: Op aborted");
diff --git a/src/kudu/tablet/ops/participant_op.h b/src/kudu/tablet/ops/participant_op.h
index 12b8392..ea2ccc2 100644
--- a/src/kudu/tablet/ops/participant_op.h
+++ b/src/kudu/tablet/ops/participant_op.h
@@ -42,6 +42,7 @@ class ParticipantOpState : public OpState {
   // response.
   // TODO(awong): track this on the RPC results tracker.
   ParticipantOpState(TabletReplica* tablet_replica,
+                     TxnParticipant* txn_participant,
                      const tserver::ParticipantRequestPB* request,
                      tserver::ParticipantResponsePB* response = nullptr);
   const tserver::ParticipantRequestPB* request() const override {
@@ -57,6 +58,11 @@ class ParticipantOpState : public OpState {
   // it doesn't already exist. Locks the transaction for writes.
   void AcquireTxnAndLock();
 
+  // Performs the transaction state change requested by this op. Must be called
+  // while the transaction lock is held, i.e. between the calls to
+  // AcquireTxnAndLock() and ReleaseTxn().
+  Status PerformOp();
+
   // Releases the transaction and its lock.
   void ReleaseTxn();
 
@@ -66,6 +72,15 @@ class ParticipantOpState : public OpState {
   }
  private:
   friend class ParticipantOp;
+
+  // Returns an error if the transaction is not in an appropriate state for the
+  // state change requested by this op. Must follow AcquireTxnAndLock().
+  Status ValidateOp() const;
+
+  // The participant being mutated. This may differ from the one we'd get from
+  // TabletReplica if, for instance, we're bootstrapping a new Tablet.
+  TxnParticipant* txn_participant_;
+
   const tserver::ParticipantRequestPB* request_;
   tserver::ParticipantResponsePB* response_;
 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 17b0c28..ae53d0a 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -68,6 +68,7 @@
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/ops/alter_schema_op.h"
 #include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/ops/participant_op.h"
 #include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/row_op.h"
 #include "kudu/tablet/rowset.h"
@@ -113,6 +114,7 @@ using kudu::consensus::OpIdEquals;
 using kudu::consensus::OpIdToString;
 using kudu::consensus::OperationType;
 using kudu::consensus::OperationType_Name;
+using kudu::consensus::PARTICIPANT_OP;
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::ReplicateMsg;
 using kudu::consensus::WRITE_OP;
@@ -272,6 +274,9 @@ class TabletBootstrap {
   Status PlayChangeConfigRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
                                  const CommitMsg& commit_msg);
 
+  Status PlayTxnParticipantOpRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
+                                     const CommitMsg& commit_msg);
+
   Status PlayNoOpRequest(const IOContext* io_context, ReplicateMsg* replicate_msg,
                          const CommitMsg& commit_msg);
 
@@ -1084,6 +1089,10 @@ Status TabletBootstrap::HandleEntryPair(const IOContext* io_context, LogEntryPB*
       RETURN_NOT_OK_REPLAY(PlayChangeConfigRequest, io_context, replicate, commit);
       break;
 
+    case PARTICIPANT_OP:
+      RETURN_NOT_OK_REPLAY(PlayTxnParticipantOpRequest, io_context, replicate, commit);
+      break;
+
     case NO_OP:
       RETURN_NOT_OK_REPLAY(PlayNoOpRequest, io_context, replicate, commit);
       break;
@@ -1516,6 +1525,22 @@ Status TabletBootstrap::PlayChangeConfigRequest(const IOContext* /*io_context*/,
   return AppendCommitMsg(commit_msg);
 }
 
+Status TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_context*/,
+                                                    ReplicateMsg* replicate_msg,
+                                                    const CommitMsg& commit_msg) {
+  ParticipantOpState op_state(tablet_replica_.get(),
+                              tablet_->txn_participant(),  // The tablet being bootstrapped.
+                              &replicate_msg->participant_request());
+  op_state.AcquireTxnAndLock();
+  SCOPED_CLEANUP({
+    op_state.ReleaseTxn();
+  });
+  // NOTE: don't bother validating the current state of the op. Presumably that
+  // happened when the op was first prepared (see ParticipantOp::Prepare()).
+  RETURN_NOT_OK(op_state.PerformOp());  // Fails only for UNKNOWN op types.
+  return AppendCommitMsg(commit_msg);
+}
+
 Status TabletBootstrap::PlayNoOpRequest(const IOContext* /*io_context*/,
                                         ReplicateMsg* /*replicate_msg*/,
                                         const CommitMsg& commit_msg) {
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 7052d24..1716fc9 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -662,6 +662,7 @@ Status TabletReplica::StartFollowerOp(const scoped_refptr<ConsensusRound>& round
       unique_ptr<ParticipantOpState> op_state(
           new ParticipantOpState(
               this,
+              tablet_->txn_participant(),
               &replicate_msg->participant_request()));
       op_state->SetResultTracker(result_tracker_);
       op.reset(new ParticipantOp(std::move(op_state), consensus::REPLICA));
diff --git a/src/kudu/tablet/txn_participant-test-util.h b/src/kudu/tablet/txn_participant-test-util.h
index 43268ef..b4bed41 100644
--- a/src/kudu/tablet/txn_participant-test-util.h
+++ b/src/kudu/tablet/txn_participant-test-util.h
@@ -49,6 +49,7 @@ Status CallParticipantOp(TabletReplica* replica,
   }
   std::unique_ptr<ParticipantOpState> op_state(new ParticipantOpState(
       replica,
+      replica->tablet()->txn_participant(),
       &req,
       resp));
   CountDownLatch latch(1);
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index ca58f84..5dbc925 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -287,5 +287,25 @@ TEST_F(TxnParticipantTest, TestConcurrentOps) {
   }
 }
 
+// Test that the transaction state built up by participant ops is restored by
+// replaying those ops from the WAL when the replica is restarted.
+TEST_F(TxnParticipantTest, TestReplayParticipantOps) {
+  constexpr const int64_t kTxnId = 1;
+  for (const auto& type : kCommitSequence) {
+    ParticipantResponsePB resp;
+    ASSERT_OK(CallParticipantOp(
+        tablet_replica_.get(), kTxnId, type, kDummyCommitTimestamp, &resp));
+    SCOPED_TRACE(SecureShortDebugString(resp));
+    ASSERT_FALSE(resp.has_error());
+    ASSERT_TRUE(resp.has_timestamp());
+  }
+  // Without a restart, the check below would only observe the in-memory state
+  // and would pass even if bootstrap ignored participant ops.
+  ASSERT_OK(RestartReplica());
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+  }), txn_participant()->GetTxnsForTests());
+}
+
 } // namespace tablet
 } // namespace kudu