Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/05 01:58:26 UTC

[6/7] kudu git commit: KUDU-798 (part 3) Replica transactions must start/abort on the consensus update thread

KUDU-798 (part 3) Replica transactions must start/abort on the consensus update thread

In order for a consensus replica to safely move "safe" time, it needs
to know that all transactions that come before it have at least
started. One way to do this is to call transaction_->Start() in
transaction_driver_->Init(). For non-leader transactions, we know
Init() is called by the thread updating consensus. This should be ok
as the leader has already correctly serialized the transactions.
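
As a rough illustration of why Start() has to happen before "safe" time
moves, here is a minimal sketch. ToyMvcc and all of its methods are
hypothetical stand-ins invented for this example, not Kudu's actual
mvcc API, and the "clean snapshot" rule is a simplifying assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical, simplified model of an mvcc manager (not Kudu's real class).
class ToyMvcc {
 public:
  // Registers a transaction with an already-assigned timestamp as in-flight.
  // Returns false if the timestamp is at or below the current safe time.
  bool Start(std::uint64_t ts) {
    if (ts <= safe_time_) return false;
    in_flight_.insert(ts);
    return true;
  }

  // Removes a previously started transaction (commit or abort).
  void Finish(std::uint64_t ts) {
    auto it = in_flight_.find(ts);
    assert(it != in_flight_.end());
    in_flight_.erase(it);
  }

  // Moves safe time forward; never moves it backwards.
  void AdvanceSafeTime(std::uint64_t ts) {
    if (ts > safe_time_) safe_time_ = ts;
  }

  // Assumption for this sketch: a snapshot at `ts` is "clean" when `ts` is
  // not past safe time and no in-flight transaction has a timestamp <= ts.
  bool IsSnapshotClean(std::uint64_t ts) const {
    if (ts > safe_time_) return false;
    return in_flight_.empty() || *in_flight_.begin() > ts;
  }

 private:
  std::uint64_t safe_time_ = 0;
  std::set<std::uint64_t> in_flight_;
};
```

In this model, calling Start(5) before AdvanceSafeTime(10) keeps a
snapshot at 10 from looking clean until the transaction finishes. If
safe time is advanced first, the snapshot looks clean even though the
transaction at timestamp 5 has not been registered yet.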

However, if transactions are now started on Init() then, in the case
they abort (i.e. when transaction_driver_->ReplicationFinished() is
called with a non-ok status, again done by the thread updating
consensus), we must make sure that the transaction is actually
removed from mvcc before that method returns. Otherwise consensus
might call Start() on another transaction with the same timestamp
before the failed transaction is removed from mvcc.
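
The ordering problem can be sketched as follows. This is only an
illustration: ToyMvcc, DeferredWork and both ReplicationFailed*
functions are hypothetical names, and the sketch assumes that a second
in-flight transaction with the same timestamp is rejected.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <set>
#include <vector>

// Hypothetical stand-in for mvcc: refuses a duplicate in-flight timestamp.
struct ToyMvcc {
  std::set<std::uint64_t> in_flight;

  bool Start(std::uint64_t ts) { return in_flight.insert(ts).second; }

  void Abort(std::uint64_t ts) {
    std::size_t removed = in_flight.erase(ts);
    assert(removed == 1);
    (void)removed;  // silence unused warning when NDEBUG is defined
  }
};

// Work handed off to some other thread, modeled here as a simple queue.
using DeferredWork = std::vector<std::function<void()>>;

// Abort performed inline, before the "replication finished" call returns.
void ReplicationFailedSync(ToyMvcc* mvcc, std::uint64_t ts) {
  mvcc->Abort(ts);
}

// Abort queued for later; the timestamp stays in mvcc until the queue runs.
void ReplicationFailedDeferred(ToyMvcc* mvcc, std::uint64_t ts,
                               DeferredWork* later) {
  later->push_back([mvcc, ts] { mvcc->Abort(ts); });
}
```

With the inline variant, a new transaction can reuse the timestamp as
soon as the call returns. With the deferred variant, Start() on the
same timestamp fails until the queued work has run, which is the
window described above.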

To do this, this patch adds a way to release only the mvcc txn,
since it's the only thing we care about and Prepare() might still
be acquiring row locks.
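
The shape of that split can be sketched roughly as below. ToyTxnState
and its members are hypothetical simplifications written for this
message. They only mirror the idea of the change and are not the real
WriteTransactionState.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

enum class TxnResult { COMMITTED, ABORTED };

// Hypothetical, simplified transaction state.
struct ToyTxnState {
  std::unique_ptr<int> mvcc_txn;           // non-null while registered in mvcc
  std::vector<std::string> row_locks;      // locks acquired during prepare
  std::vector<TxnResult> mvcc_outcomes;    // records each mvcc release

  // Releases only the mvcc registration. Safe to call more than once:
  // the second call is a no-op because the handle is already gone.
  void ReleaseMvccTxn(TxnResult result) {
    if (mvcc_txn != nullptr) {
      mvcc_outcomes.push_back(result);
      mvcc_txn.reset();
    }
  }

  // Full cleanup: mvcc first, then everything else.
  void CommitOrAbort(TxnResult result) {
    ReleaseMvccTxn(result);
    assert(mvcc_txn == nullptr);
    row_locks.clear();
  }
};
```

In this sketch an early ReleaseMvccTxn(TxnResult::ABORTED) leaves the
row locks untouched, and a later CommitOrAbort() releases them without
reporting the mvcc outcome a second time.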

I ran the new raft_consensus-itest that includes unique/duplicate
key workloads and exactly once semantics in dist-test.

Results of slow mode, 1 stress thread, 1000 loop runs of
raft_consensus-itest on dist-test.

TSAN-prev (16/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480814757.31031
TSAN-this (10/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480812541.27015

I inspected the TSAN failures and they were mostly ~DnsResolver()
races in the test code itself. The tests themselves passed for the
most part. Some ended up timing out with archives that are too
big to download. This also happens with the previous patch.

ASAN-prev (02/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480817418.1140
ASAN-this (03/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480811163.22558

I inspected the ASAN failures. Two of them are test-only flakes.
One of them is worrying as the consensus queue remains full,
suggesting a deadlock, but this also happens with the previous
patch. Example:

https://kudu-test-results.s3.amazonaws.com/david.alves.1480817418.1140.36139843995986a54aaf9f533a00111f384e85a6.145.0-artifacts.zip?Signature=PjjGYVvmWns4t0M7j6%2FAZQYVj34%3D&Expires=1480904781&AWSAccessKeyId=AKIAJ2NR2VXMAHTVLMRA

Change-Id: Ie360e597eea86551c453717d7a1a000848027f4c
Reviewed-on: http://gerrit.cloudera.org:8080/5294
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f9e5993c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f9e5993c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f9e5993c

Branch: refs/heads/master
Commit: f9e5993c9390d56700d7ae29d418035b152f6858
Parents: 02a96c7
Author: David Alves <dr...@apache.org>
Authored: Wed Nov 30 16:28:14 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Sun Dec 4 05:33:32 2016 +0000

----------------------------------------------------------------------
 src/kudu/tablet/transactions/transaction.h      |  3 ++
 .../tablet/transactions/transaction_driver.cc   | 37 +++++++++++++-------
 .../tablet/transactions/transaction_driver.h    | 25 ++++++-------
 .../tablet/transactions/write_transaction.cc    | 26 +++++++++-----
 .../tablet/transactions/write_transaction.h     |  9 ++---
 5 files changed, 60 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index 29fcbee..4d8b4d5 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -96,6 +96,9 @@ class Transaction {
   // side-effects.
   virtual Status Prepare() = 0;
 
+  // Aborts the prepare phase.
+  virtual void AbortPrepare() {}
+
   // Actually starts a transaction, assigning a timestamp to the transaction.
   // LEADER replicas execute this in or right after Prepare(), while FOLLOWER/LEARNER
   // replicas execute this right before the Apply() phase as the transaction's

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 23beaad..ab0606d 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -114,6 +114,13 @@ Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
     DCHECK(op_id_copy_.IsInitialized());
     replication_state_ = REPLICATING;
     replication_start_time_ = MonoTime::Now();
+    // Start the replica transaction in the thread that is updating consensus, for non-leader
+    // transactions.
+    // Replica transactions were already assigned a timestamp so we don't need to acquire locks
+    // before calling Start(). Starting the transaction here gives a strong guarantee
+    // to consensus that the transaction is on mvcc when it moves "safe" time so that we don't
+    // risk marking a timestamp "safe" before all transactions before it are in-flight on mvcc.
+    RETURN_NOT_OK(transaction_->Start());
     if (state()->are_results_tracked()) {
       // If this is a follower transaction, make sure to set the transaction completion callback
       // before the transaction has a chance to fail.
@@ -138,7 +145,6 @@ Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
   }
 
   RETURN_NOT_OK(txn_tracker_->Add(this));
-
   return Status::OK();
 }
 
@@ -189,7 +195,7 @@ Status TransactionDriver::ExecuteAsync() {
 
   if (s.ok()) {
     s = prepare_pool_->SubmitClosure(
-      Bind(&TransactionDriver::PrepareAndStartTask, Unretained(this)));
+      Bind(&TransactionDriver::PrepareTask, Unretained(this)));
   }
 
   if (!s.ok()) {
@@ -200,9 +206,9 @@ Status TransactionDriver::ExecuteAsync() {
   return Status::OK();
 }
 
-void TransactionDriver::PrepareAndStartTask() {
-  TRACE_EVENT_FLOW_END0("txn", "PrepareAndStartTask", this);
-  Status prepare_status = PrepareAndStart();
+void TransactionDriver::PrepareTask() {
+  TRACE_EVENT_FLOW_END0("txn", "PrepareTask", this);
+  Status prepare_status = Prepare();
   if (PREDICT_FALSE(!prepare_status.ok())) {
     HandleFailure(prepare_status);
   }
@@ -236,9 +242,10 @@ void TransactionDriver::RegisterFollowerTransactionOnResultTracker() {
   }
 }
 
-Status TransactionDriver::PrepareAndStart() {
-  TRACE_EVENT1("txn", "PrepareAndStart", "txn", this);
-  VLOG_WITH_PREFIX(4) << "PrepareAndStart()";
+Status TransactionDriver::Prepare() {
+  TRACE_EVENT1("txn", "Prepare", "txn", this);
+  VLOG_WITH_PREFIX(4) << "Prepare()";
+
   // Actually prepare and start the transaction.
   prepare_physical_timestamp_ = GetMonoTimeMicros();
 
@@ -260,7 +267,6 @@ Status TransactionDriver::PrepareAndStart() {
     // preempted after the state is prepared apply can be triggered by another thread without the
     // rpc being registered.
     if (transaction_->type() == consensus::REPLICA) {
-      RETURN_NOT_OK(transaction_->Start());
       RegisterFollowerTransactionOnResultTracker();
     // ... else we're a client-started transaction. Make sure we're still the driver of the
     // RPC and give up if we aren't.
@@ -295,9 +301,8 @@ Status TransactionDriver::PrepareAndStart() {
           transaction_->state()->timestamp().ToUint64());
 
       RETURN_NOT_OK(transaction_->Start());
-
-      VLOG_WITH_PREFIX(4) << "Triggering consensus repl";
-      // Trigger the consensus replication.
+      VLOG_WITH_PREFIX(4) << "Triggering consensus replication.";
+      // Trigger consensus replication.
       {
         std::lock_guard<simple_spinlock> lock(lock_);
         replication_state_ = REPLICATING;
@@ -384,6 +389,13 @@ void TransactionDriver::ReplicationFinished(const Status& status) {
     mutable_state()->mutable_op_id()->CopyFrom(op_id_copy_);
   }
 
+  // If we're going to abort, do so before changing the state below. This avoids a race with
+  // the prepare thread, which would race with the thread calling this method to release the driver
+  // while we're aborting, if we were to do it afterwards.
+  if (!status.ok()) {
+    transaction_->AbortPrepare();
+  }
+
   MonoDelta replication_duration;
   PrepareState prepare_state_copy;
   {
@@ -399,7 +411,6 @@ void TransactionDriver::ReplicationFinished(const Status& status) {
     replication_duration = replication_finished_time - replication_start_time_;
   }
 
-
   TRACE_COUNTER_INCREMENT("replication_time_us", replication_duration.ToMicroseconds());
 
   // If we have prepared and replicated, we're ready

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index 1072703..df62a20 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -53,22 +53,23 @@ class TransactionTracker;
 //  1 - Init() is called on a newly created driver object.
 //      If the driver is instantiated from a REPLICA, then we know that
 //      the operation is already "REPLICATING" (and thus we don't need to
-//      trigger replication ourself later on).
+//      trigger replication ourself later on). In this case the transaction has already
+//      been serialized by the leader, so we also call transaction_->Start().
 //
-//  2 - ExecuteAsync() is called. This submits PrepareAndStartTask() to prepare_pool_
+//  2 - ExecuteAsync() is called. This submits PrepareTask() to prepare_pool_
 //      and returns immediately.
 //
-//  3 - PrepareAndStartTask() calls Prepare() and Start() on the transaction.
+//  3 - PrepareTask() calls Prepare() on the transaction.
 //
 //      Once successfully prepared, if we have not yet replicated (i.e we are leader),
-//      also triggers consensus->Replicate() and changes the replication state to
-//      REPLICATING.
+//      assign a timestamp and call transaction_->Start(). Finally call consensus->Replicate()
+//      and change the replication state to REPLICATING.
 //
 //      On the other hand, if we have already successfully replicated (eg we are the
-//      follower and ConsensusCommitted() has already been called, then we can move
+//      follower and ReplicationFinished() has already been called, then we can move
 //      on to ApplyAsync().
 //
-//  4 - The Consensus implementation calls ConsensusCommitted()
+//  4 - The Consensus implementation calls ReplicationFinished()
 //
 //      This is triggered by consensus when the commit index moves past our own
 //      OpId. On followers, this can happen before Prepare() finishes, and thus
@@ -297,12 +298,12 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
 
   ~TransactionDriver() {}
 
-  // The task submitted to the prepare threadpool to prepare and start
-  // the transaction. If PrepareAndStart() fails, calls HandleFailure.
-  void PrepareAndStartTask();
+  // The task submitted to the prepare threadpool to prepare the transaction. If Prepare() fails,
+  // calls HandleFailure.
+  void PrepareTask();
 
-  // Actually prepare and start.
-  Status PrepareAndStart();
+  // Actually prepare.
+  Status Prepare();
 
   // Submits ApplyTask to the apply pool.
   Status ApplyAsync();

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/write_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index d2207fc..d7dce19 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -101,6 +101,10 @@ Status WriteTransaction::Prepare() {
   return Status::OK();
 }
 
+void WriteTransaction::AbortPrepare() {
+  state()->ReleaseMvccTxn(TransactionResult::ABORTED);
+}
+
 Status WriteTransaction::Start() {
   TRACE_EVENT0("txn", "WriteTransaction::Start");
   TRACE("Start()");
@@ -255,6 +259,19 @@ void WriteTransactionState::StartApplying() {
 }
 
 void WriteTransactionState::CommitOrAbort(Transaction::TransactionResult result) {
+  ReleaseMvccTxn(result);
+
+  TRACE("Releasing row and schema locks");
+  ReleaseRowLocks();
+  ReleaseSchemaLock();
+
+  // After committing, if there is an RPC going on, the driver will respond to it.
+  // That will delete the RPC request and response objects. So, NULL them here
+  // so we don't read them again after they're deleted.
+  ResetRpcFields();
+}
+
+void WriteTransactionState::ReleaseMvccTxn(Transaction::TransactionResult result) {
   if (mvcc_tx_.get() != nullptr) {
     // Commit the transaction.
     switch (result) {
@@ -267,15 +284,6 @@ void WriteTransactionState::CommitOrAbort(Transaction::TransactionResult result)
     }
   }
   mvcc_tx_.reset();
-
-  TRACE("Releasing row and schema locks");
-  ReleaseRowLocks();
-  ReleaseSchemaLock();
-
-  // After committing, if there is an RPC going on, the driver will respond to it.
-  // That will delete the RPC request and response objects. So, NULL them here
-  // so we don't read them again after they're deleted.
-  ResetRpcFields();
 }
 
 void WriteTransactionState::ReleaseTxResultPB(TxResultPB* result) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index 05f84ff..5cb8f23 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -120,6 +120,7 @@ class WriteTransactionState : public TransactionState {
   // Release the already-acquired schema lock.
   void ReleaseSchemaLock();
 
+  void ReleaseMvccTxn(Transaction::TransactionResult result);
 
   void set_schema_at_decode_time(const Schema* schema) {
     std::lock_guard<simple_spinlock> l(txn_state_lock_);
@@ -152,12 +153,6 @@ class WriteTransactionState : public TransactionState {
   // Note: request_ and response_ are set to NULL after this method returns.
   void CommitOrAbort(Transaction::TransactionResult result);
 
-  // Aborts the mvcc transaction and releases the component lock.
-  // Only one of Commit() or Abort() should be called.
-  //
-  // REQUIRES: StartApplying() must never have been called.
-  void Abort();
-
   // Returns all the prepared row writes for this transaction. Usually called
   // on the apply phase to actually make changes to the tablet.
   const std::vector<RowOp*>& row_ops() const {
@@ -236,6 +231,8 @@ class WriteTransaction : public Transaction {
   // into the WriteTransactionState.
   virtual Status Prepare() OVERRIDE;
 
+  virtual void AbortPrepare() OVERRIDE;
+
   // Actually starts the Mvcc transaction and assigns a timestamp to this transaction.
   virtual Status Start() OVERRIDE;