You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/02/01 08:37:20 UTC
[kudu] branch master updated: KUDU-2612: loosen restrictions for
BEGIN_COMMIT ops
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8c5e655 KUDU-2612: loosen restrictions for BEGIN_COMMIT ops
8c5e655 is described below
commit 8c5e655d2d29e7758f26a373ea240631722ea1cf
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Mon Jan 25 18:28:20 2021 -0800
KUDU-2612: loosen restrictions for BEGIN_COMMIT ops
This patch updates the state validations for the BEGIN_COMMIT op. Rather
than only requiring an in-flight transaction to be kOpen (the common
case) or kCommitInProgress (if re-attempting a BEGIN_COMMIT call), it's
more robust to allow the call to be made even after the transaction has
already been finalized. This will allow a to-be-merged implementation of
a commit task to be retried with much less fuss.
Here's some pseudo-code for such a commit task:
1. persist COMMIT_IN_PROGRESS record on TxnStatusManager
2. foreach participant as p: BEGIN_COMMIT(p)
3. commit_ts = max timestamp used across BEGIN_COMMIT ops
4. foreach participant as p: FINALIZE_COMMIT(p, commit_ts)
5. persist COMMITTED record on TxnStatusManager
If the commit task is interrupted (e.g. by some crash) between steps 3
and 5, we may be left with some participants with fully finalized
commits. In such cases, all other participants _must_ also finalize, and
they must finalize with the same timestamp. To ensure this, it must be
possible to re-run a commit task. However, re-running it without this
patch may lead to issues because the BEGIN_COMMIT ops would yield
errors, complaining about an illegal state on participants that were
finalized.
This patch allows for a BEGIN_COMMIT op to succeed and return
immediately if a FINALIZE_COMMIT op has already completed. If so, the
finalized commit timestamp is sent back, allowing for the above commit
task to be repeatable.
Change-Id: Ifa4c5314190c84648c1b1edea7aab776b4882f97
Reviewed-on: http://gerrit.cloudera.org:8080/16992
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
.../integration-tests/txn_participant-itest.cc | 65 ++++++++++++++++++----
src/kudu/tablet/tablet_metadata.cc | 10 +++-
src/kudu/tablet/tablet_metadata.h | 7 ++-
src/kudu/tablet/txn_participant-test.cc | 27 ++++++++-
src/kudu/tablet/txn_participant.h | 33 ++++++++++-
5 files changed, 123 insertions(+), 19 deletions(-)
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index fcbd083..b270bea 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -549,6 +549,49 @@ TEST_F(TxnParticipantITest, TestProxyBasicCalls) {
}
}
+TEST_F(TxnParticipantITest, TestBeginCommitAfterFinalize) {
+ constexpr const int kLeaderIdx = 0;
+ constexpr const int kTxnId = 0;
+ vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+ ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(kDefaultTimeout));
+ auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+ const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
+ {
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_OK(ParticipateInTransactionCheckResp(
+ admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN, &code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ }
+ // Begin committing the transaction (recording the BEGIN_COMMIT ts), then finalize it.
+ Timestamp begin_commit_ts;
+ {
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_OK(ParticipateInTransactionCheckResp(
+ admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+ &code, &begin_commit_ts));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ ASSERT_NE(Timestamp::kInvalidTimestamp, begin_commit_ts);
+ }
+ {
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ ASSERT_OK(ParticipateInTransactionCheckResp(
+ admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, &code));
+ ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
+ }
+ // A BEGIN_COMMIT after finalizing reports TXN_OP_ALREADY_APPLIED plus the finalized commit ts.
+ Timestamp refetched_begin_commit_ts;
+ {
+ TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
+ Status s = ParticipateInTransactionCheckResp(
+ admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT, &code,
+ &refetched_begin_commit_ts);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
+ ASSERT_NE(Timestamp::kInvalidTimestamp, refetched_begin_commit_ts);
+ ASSERT_EQ(Timestamp(kDummyCommitTimestamp), refetched_begin_commit_ts);
+ }
+}
+
TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
constexpr const int kLeaderIdx = 0;
constexpr const int kTxnId = 0;
@@ -581,7 +624,7 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
}
- // Start commititng and ensure we can't start another transaction.
+ // Start committing and ensure we can't start another transaction.
Timestamp begin_commit_ts;
{
TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
@@ -609,22 +652,21 @@ TEST_F(TxnParticipantITest, TestProxyIllegalStatesInCommitSequence) {
ASSERT_EQ(TabletServerErrorPB::TXN_ILLEGAL_STATE, code);
}
- // Finalize the commit and ensure we can do nothing else.
+ // Finalize the commit and ensure we can't begin or abort.
{
TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
ASSERT_OK(ParticipateInTransactionCheckResp(
admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, &code));
ASSERT_EQ(TabletServerErrorPB::UNKNOWN_ERROR, code);
}
- {
+ for (auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) {
TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
Status s = ParticipateInTransactionCheckResp(
- admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT, &code);
+ admin_proxy.get(), tablet_id, kTxnId, type, &code);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, code);
}
for (auto type : { ParticipantOpPB::BEGIN_TXN,
- ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::ABORT_TXN }) {
TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
Status s = ParticipateInTransactionCheckResp(
@@ -816,21 +858,22 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) {
ASSERT_EQ(refetched_begin_commit_ts, begin_commit_ts);
NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } }));
- // Once we finish committing, we should be unable to do anything.
+ // Once we finish committing, we should be unable to begin or abort.
ASSERT_OK(txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, kDummyCommitTimestamp),
kDefaultTimeout));
NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
- for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::BEGIN_COMMIT,
- ParticipantOpPB::ABORT_TXN }) {
+ for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::ABORT_TXN }) {
Status s = txn_client->ParticipateInTransaction(
tablet_id, MakeParticipantOp(kTxnId, type), kDefaultTimeout);
ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
}
NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
- ASSERT_OK(txn_client->ParticipateInTransaction(
- tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, kDummyCommitTimestamp),
- kDefaultTimeout));
+ for (const auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) {
+ ASSERT_OK(txn_client->ParticipateInTransaction(
+ tablet_id, MakeParticipantOp(kTxnId, type, kDummyCommitTimestamp),
+ kDefaultTimeout));
+ }
NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}}));
}
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index ddd15a2..f572085 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -850,17 +850,25 @@ void TabletMetadata::AbortTransaction(int64_t txn_id, unique_ptr<MinLogIndexAnch
anchors_needing_flush_.emplace_back(std::move(log_anchor));
}
-bool TabletMetadata::HasTxnMetadata(int64_t txn_id, TxnState* state) {
+bool TabletMetadata::HasTxnMetadata(int64_t txn_id, TxnState* state, Timestamp* timestamp) {
std::lock_guard<LockType> l(data_lock_);
auto txn_meta = FindPtrOrNull(txn_metadata_by_txn_id_, txn_id);
if (txn_meta) {
if (!state) return true;
if (txn_meta->commit_timestamp()) {
*state = kCommitted;
+ if (timestamp) {
+ DCHECK(txn_meta->commit_timestamp());
+ *timestamp = *txn_meta->commit_timestamp();
+ }
} else if (txn_meta->aborted()) {
*state = kAborted;
} else if (txn_meta->commit_mvcc_op_timestamp()) {
*state = kCommitInProgress;
+ if (timestamp) {
+ DCHECK(txn_meta->commit_mvcc_op_timestamp());
+ *timestamp = *txn_meta->commit_mvcc_op_timestamp();
+ }
} else {
*state = kOpen;
}
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index e4ea591..b9edeeb 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -272,8 +272,11 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
void AbortTransaction(int64_t txn_id, std::unique_ptr<log::MinLogIndexAnchorer> log_anchor);
// Returns whether a given transaction has metadata, and if requested, what
- // state the transaction is in.
- bool HasTxnMetadata(int64_t txn_id, TxnState* state = nullptr);
+ // state the transaction is in. If set, 'timestamp' is set to the begin
+ // commit timestamp or finalized commit timestamp, depending on whether
+ // 'state' is kCommitInProgress or kCommitted respectively.
+ bool HasTxnMetadata(int64_t txn_id, TxnState* state = nullptr,
+ Timestamp* timestamp = nullptr);
// Returns the transaction IDs that were persisted as being in-flight,
// terminal (committed or aborted), and having un-flushed MRSs.
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index 6a8fd74..9373110 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -269,7 +269,8 @@ TEST_F(TxnParticipantTest, TestTransactionNotFound) {
}
TEST_F(TxnParticipantTest, TestIllegalTransitions) {
- const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType& type, int64_t txn_id) {
+ const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType& type,
+ int64_t txn_id) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
@@ -278,7 +279,7 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
ASSERT_TRUE(resp.has_timestamp());
};
const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
- int64_t txn_id) {
+ int64_t txn_id) {
for (const auto& type : ops) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
@@ -291,6 +292,24 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
ASSERT_FALSE(resp.has_timestamp());
}
};
+ const auto check_already_applied = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
+ int64_t txn_id, bool expect_timestamp) {
+ for (const auto& type : ops) {
+ ParticipantResponsePB resp;
+ ASSERT_OK(CallParticipantOp(
+ tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
+ SCOPED_TRACE(SecureShortDebugString(resp));
+ ASSERT_TRUE(resp.has_error());
+ ASSERT_TRUE(resp.error().has_status());
+ ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
+ ASSERT_EQ(TabletServerErrorPB::TXN_OP_ALREADY_APPLIED, resp.error().code());
+ if (expect_timestamp) {
+ ASSERT_TRUE(resp.has_timestamp());
+ } else {
+ ASSERT_FALSE(resp.has_timestamp());
+ }
+ }
+ };
// Once we've begun the transaction, we can't finalize without beginning to
// commit.
NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kTxnId));
@@ -301,6 +320,7 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
// Once we begin committing, we can't start the transaction again.
NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_COMMIT, kTxnId));
+ NO_FATALS(check_already_applied({ ParticipantOpPB::BEGIN_COMMIT }, kTxnId, true));
NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN }, kTxnId));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, kCommitInProgress, -1 },
@@ -308,8 +328,9 @@ TEST_F(TxnParticipantTest, TestIllegalTransitions) {
// Once we've begun finalizing, we can't do anything.
NO_FATALS(check_valid_op(ParticipantOpPB::FINALIZE_COMMIT, kTxnId));
+ NO_FATALS(check_already_applied({ ParticipantOpPB::BEGIN_COMMIT }, kTxnId, true));
+ NO_FATALS(check_already_applied({ ParticipantOpPB::FINALIZE_COMMIT }, kTxnId, false));
NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN,
- ParticipantOpPB::BEGIN_COMMIT,
ParticipantOpPB::ABORT_TXN }, kTxnId));
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, kCommitted, kDummyCommitTimestamp },
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index bb0c356..d276840 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -25,6 +25,7 @@
#include <vector>
#include <glog/logging.h>
+#include <boost/optional/optional.hpp>
#include "kudu/common/timestamp.h"
#include "kudu/consensus/log_anchor_registry.h"
@@ -134,13 +135,41 @@ class Txn : public RefCountedThreadSafe<Txn> {
Status ValidateBeginCommit(tserver::TabletServerErrorPB::Code* code,
Timestamp* begin_commit_ts) const {
DCHECK(state_lock_.is_locked());
- RETURN_NOT_OK(CheckFinishedInitializing(code));
+ boost::optional<Timestamp> already_applied_timestamp;
+ if (PREDICT_FALSE(state_ == kInitializing)) {
+ Timestamp timestamp;
+ TxnState meta_state;
+ if (!tablet_metadata_->HasTxnMetadata(txn_id_, &meta_state, &timestamp)) {
+ return Status::NotFound("Transaction hasn't been successfully started");
+ }
+ if (PREDICT_FALSE(meta_state != kCommitted && meta_state != kCommitInProgress)) {
+ *code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
+ return Status::IllegalState(
+ strings::Substitute("Cannot begin committing transaction in state: $0",
+ TxnStateToString(state_)));
+ }
+ // There's no in-flight transaction, but we've already committed the
+ // transaction and persisted a commit timestamp. Return the commit
+ // timestamp.
+ already_applied_timestamp = timestamp;
+ }
+ // If we're in the process of committing, return the commit timestamp we
+ // have available to us.
if (PREDICT_FALSE(state_ == kCommitInProgress)) {
- *begin_commit_ts = DCHECK_NOTNULL(commit_op_)->timestamp();
+ already_applied_timestamp = DCHECK_NOTNULL(commit_op_)->timestamp();
+ }
+ if (PREDICT_FALSE(state_ == kCommitted)) {
+ DCHECK_NE(-1, commit_timestamp_);
+ already_applied_timestamp = Timestamp(commit_timestamp_);
+ }
+ if (already_applied_timestamp) {
+ DCHECK_NE(Timestamp::kInvalidTimestamp, *already_applied_timestamp);
+ *begin_commit_ts = *already_applied_timestamp;
*code = tserver::TabletServerErrorPB::TXN_OP_ALREADY_APPLIED;
return Status::IllegalState(
strings::Substitute("Transaction $0 commit already in progress", txn_id_));
}
+ // If the transaction is otherwise not open, return an error.
if (PREDICT_FALSE(state_ != kOpen)) {
*code = tserver::TabletServerErrorPB::TXN_ILLEGAL_STATE;
return Status::IllegalState(