You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/12/08 21:16:08 UTC

[kudu] 02/05: [tserver] KUDU-2612: participant op RPC endpoint

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 70bce76af3008031e1dd9559bfa6dca1763d895f
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Dec 1 14:56:10 2020 -0800

    [tserver] KUDU-2612: participant op RPC endpoint
    
    This adds an RPC endpoint to the tablet servers that allows proxies to
    interact with transaction participants. It will be used in steps 13 and
    18 of the transaction write path [1], allowing the TxnStatusManagers to
    update participants' transaction states.
    
    [1] https://docs.google.com/document/d/1qv7Zejpfzg-HvF5azRL49g5lRLQ4437EmJ53GiupcWQ/edit#heading=h.4lm41o75ev1x
    
    Change-Id: Ic48895438ce67e453d235934ac560efe8415921b
    Reviewed-on: http://gerrit.cloudera.org:8080/16814
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../integration-tests/txn_participant-itest.cc     | 236 +++++++++++++++++++++
 src/kudu/tserver/tablet_service.cc                 |  32 +++
 src/kudu/tserver/tablet_service.h                  |   6 +
 src/kudu/tserver/tserver_admin.proto               |   7 +-
 4 files changed, 280 insertions(+), 1 deletion(-)

diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
index 81be4c0..0373b69 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -25,6 +25,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -45,6 +46,8 @@
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
@@ -58,6 +61,7 @@
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -74,6 +78,7 @@ DECLARE_int32(follower_unavailable_considered_failed_sec);
 DECLARE_int32(log_segment_size_mb);
 DECLARE_int32(maintenance_manager_polling_interval_ms);
 DECLARE_int32(raft_heartbeat_interval_ms);
+DECLARE_int32(tablet_bootstrap_inject_latency_ms);
 
 METRIC_DECLARE_histogram(log_gc_duration);
 
@@ -85,7 +90,9 @@ using kudu::tablet::TabletReplica;
 using kudu::tablet::Txn;
 using kudu::tablet::TxnParticipant;
 using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::ParticipantRequestPB;
 using kudu::tserver::ParticipantResponsePB;
+using kudu::tserver::TabletServerAdminServiceProxy;
 using kudu::tserver::WriteRequestPB;
 using std::string;
 using std::thread;
@@ -98,6 +105,39 @@ namespace kudu {
 namespace itest {
 
 namespace {
+// Builds a ParticipantRequestPB that asks the participant on 'tablet_id' to
+// apply an op of the given 'type' to transaction 'txn_id'. FINALIZE_COMMIT
+// requests additionally carry kDummyCommitTimestamp as the commit timestamp.
+ParticipantRequestPB ParticipantRequest(const string& tablet_id, int64_t txn_id,
+                                        ParticipantOpPB::ParticipantOpType type) {
+  ParticipantRequestPB request;
+  request.set_tablet_id(tablet_id);
+  ParticipantOpPB* participant_op = request.mutable_op();
+  participant_op->set_txn_id(txn_id);
+  participant_op->set_type(type);
+  // This helper only sets a commit timestamp when finalizing a commit.
+  const bool is_finalize = (type == ParticipantOpPB::FINALIZE_COMMIT);
+  if (is_finalize) {
+    participant_op->set_finalized_commit_timestamp(kDummyCommitTimestamp);
+  }
+  return request;
+}
+
+// Sends a single participant op for ('tablet_id', 'txn_id') through
+// 'admin_proxy', filling 'resp'. The returned Status reflects only the RPC
+// itself; participant-level errors are left in 'resp' for the caller to check.
+Status ParticipateInTransaction(TabletServerAdminServiceProxy* admin_proxy,
+                                const string& tablet_id, int64_t txn_id,
+                                ParticipantOpPB::ParticipantOpType type,
+                                ParticipantResponsePB* resp) {
+  const ParticipantRequestPB request = ParticipantRequest(tablet_id, txn_id, type);
+  rpc::RpcController controller;
+  return admin_proxy->ParticipateInTransaction(request, resp, &controller);
+}
+
+// Like ParticipateInTransaction(), but folds the two failure channels into
+// one Status: an RPC failure is returned as-is, and otherwise any error set in
+// the response is converted with StatusFromPB().
+Status ParticipateInTransactionCheckResp(TabletServerAdminServiceProxy* admin_proxy,
+                                         const string& tablet_id, int64_t txn_id,
+                                         ParticipantOpPB::ParticipantOpType type) {
+  ParticipantResponsePB response;
+  RETURN_NOT_OK(ParticipateInTransaction(admin_proxy, tablet_id, txn_id, type, &response));
+  return response.has_error() ? StatusFromPB(response.error().status()) : Status::OK();
+}
+
 vector<Status> RunOnReplicas(const vector<TabletReplica*>& replicas,
                              int64_t txn_id,
                              ParticipantOpPB::ParticipantOpType type,
@@ -452,6 +492,202 @@ TEST_F(TxnParticipantITest, TestWaitOnAbortCommit) {
   ASSERT_OK(WaitForCompletedOps(follower_replica, before_abort_ts, kShortTimeout));
 }
 
+// Drives every op in kCommitSequence against the leader through the admin
+// proxy and checks that each one succeeds at both the RPC and participant
+// level.
+TEST_F(TxnParticipantITest, TestProxyBasicCalls) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnId = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+  for (const auto& op : kCommitSequence) {
+    const auto req = ParticipantRequest(replicas[kLeaderIdx]->tablet_id(), kTxnId, op);
+    ParticipantResponsePB resp;
+    rpc::RpcController rpc;
+    ASSERT_OK(admin_proxy->ParticipateInTransaction(req, &resp, &rpc));
+    // An OK RPC status only means the request was delivered; participant
+    // failures (e.g. IllegalState) are reported in the response's error field.
+    ASSERT_FALSE(resp.has_error()) << StatusFromPB(resp.error().status()).ToString();
+  }
+}
+
+// Walks a transaction through BEGIN_TXN -> BEGIN_COMMIT -> FINALIZE_COMMIT via
+// the admin proxy and checks that out-of-order or repeated ops are rejected
+// with IllegalState at each stage.
+TEST_F(TxnParticipantITest, TestProxyInvalidStatesInCommitSequence) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnId = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+
+  // Begin the transaction, then check that beginning it again fails.
+  const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
+  ASSERT_OK(ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN));
+  // TODO(awong): IllegalState error codes may easily be confused with
+  // errors coming from Raft. We should make the participants return something
+  // like InvalidArgument if there's an unexpected state.
+  // TODO(awong): make repeated, idempotent requests return OK instead of an error.
+  Status s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  // We can't finalize the commit without beginning to commit first.
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  // Start committing, then ensure we can neither begin committing again nor
+  // re-begin the transaction.
+  ASSERT_OK(ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT));
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  // Finalize the commit and ensure every further op (including abort) fails.
+  ASSERT_OK(ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT));
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::ABORT_TXN);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+}
+
+// Begins a transaction, aborts it via the admin proxy, and checks that invalid
+// ops before the abort, and every op after it, are rejected with IllegalState.
+TEST_F(TxnParticipantITest, TestProxyInvalidStatesInAbortSequence) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnId = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+
+  // Begin the transaction, then check that beginning it again fails.
+  const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
+  ASSERT_OK(ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN));
+  // TODO(awong): IllegalState error codes may easily be confused with
+  // errors coming from Raft. We should make the participants return something
+  // like InvalidArgument if there's an unexpected state.
+  // TODO(awong): make repeated, idempotent requests return OK instead of an error.
+  Status s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  // We can't finalize the commit without beginning to commit first.
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  // Abort the transaction and ensure every further op (including a repeated
+  // abort) fails.
+  ASSERT_OK(ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::ABORT_TXN));
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::ABORT_TXN);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::FINALIZE_COMMIT);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_TXN);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  s = ParticipateInTransactionCheckResp(
+      admin_proxy.get(), tablet_id, kTxnId, ParticipantOpPB::BEGIN_COMMIT);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+}
+
+// Sends every op in kCommitSequence to a tablet server other than the
+// leader's and checks that each response carries an IllegalState "not leader"
+// error.
+TEST_F(TxnParticipantITest, TestProxyNonLeader) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kNonLeaderIdx = kLeaderIdx + 1;
+  constexpr const int kTxnId = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+  auto non_leader_proxy = cluster_->tserver_admin_proxy(kNonLeaderIdx);
+  const auto tablet_id = replicas[kLeaderIdx]->tablet_id();
+  for (const auto& op_type : kCommitSequence) {
+    ParticipantResponsePB resp;
+    rpc::RpcController controller;
+    ASSERT_OK(non_leader_proxy->ParticipateInTransaction(
+        ParticipantRequest(tablet_id, kTxnId, op_type), &resp, &controller));
+    ASSERT_TRUE(resp.has_error());
+    const Status s = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not leader");
+  }
+}
+
+// Restarts the leader's tablet server with injected bootstrap latency and
+// checks that participant ops sent right after the restart are rejected with
+// an IllegalState "not RUNNING" error.
+TEST_F(TxnParticipantITest, TestProxyTabletBootstrapping) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnId = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+
+  // Inject bootstrap latency so the restarted replica is still bootstrapping
+  // while the requests below are sent.
+  // NOTE(review): timing-dependent; assumes all requests in the loop arrive
+  // within the injected 1000 ms of bootstrap latency.
+  FLAGS_tablet_bootstrap_inject_latency_ms = 1000;
+  cluster_->mini_tablet_server(kLeaderIdx)->Shutdown();
+  ASSERT_OK(cluster_->mini_tablet_server(kLeaderIdx)->Restart());
+  // Re-fetch the replicas after the restart; 'leader_replica' is not used
+  // past this point.
+  replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+  for (const auto& op : kCommitSequence) {
+    const auto req = ParticipantRequest(replicas[kLeaderIdx]->tablet_id(), kTxnId, op);
+    ParticipantResponsePB resp;
+    rpc::RpcController rpc;
+    ASSERT_OK(admin_proxy->ParticipateInTransaction(req, &resp, &rpc));
+    // The RPC itself succeeds; the rejection is reported in the response.
+    ASSERT_TRUE(resp.has_error());
+    auto resp_error = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(resp_error.IsIllegalState()) << resp_error.ToString();
+    ASSERT_STR_CONTAINS(resp_error.ToString(), "not RUNNING");
+  }
+}
+
+// Tombstones the leader's replica and checks that every op in
+// kCommitSequence is then rejected with an IllegalState "not RUNNING" error.
+TEST_F(TxnParticipantITest, TestProxyTabletNotRunning) {
+  constexpr const int kLeaderIdx = 0;
+  constexpr const int kTxnId = 0;
+  vector<TabletReplica*> replicas = SetUpLeaderGetReplicas(kLeaderIdx);
+  auto* leader_replica = replicas[kLeaderIdx];
+  ASSERT_OK(leader_replica->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+  const string tablet_id = leader_replica->tablet_id();
+
+  auto* tablet_manager = cluster_->mini_tablet_server(kLeaderIdx)->server()->tablet_manager();
+  ASSERT_OK(tablet_manager->DeleteTablet(tablet_id, tablet::TABLET_DATA_TOMBSTONED, boost::none));
+
+  auto admin_proxy = cluster_->tserver_admin_proxy(kLeaderIdx);
+  for (const auto& op_type : kCommitSequence) {
+    ParticipantResponsePB resp;
+    rpc::RpcController controller;
+    ASSERT_OK(admin_proxy->ParticipateInTransaction(
+        ParticipantRequest(tablet_id, kTxnId, op_type), &resp, &controller));
+    ASSERT_TRUE(resp.has_error());
+    const Status s = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not RUNNING");
+  }
+}
+
+// Sends every op in kCommitSequence for a tablet ID that the server doesn't
+// host and checks that each response carries a NotFound error.
+TEST_F(TxnParticipantITest, TestProxyTabletNotFound) {
+  constexpr const int kTxnId = 0;
+  const string kMissingTabletId = "dummy-tablet-id";
+  auto ts_admin_proxy = cluster_->tserver_admin_proxy(0);
+  for (const auto& op_type : kCommitSequence) {
+    ParticipantResponsePB resp;
+    rpc::RpcController controller;
+    ASSERT_OK(ts_admin_proxy->ParticipateInTransaction(
+        ParticipantRequest(kMissingTabletId, kTxnId, op_type), &resp, &controller));
+    ASSERT_TRUE(resp.has_error());
+    const Status s = StatusFromPB(resp.error().status());
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "not found");
+  }
+}
+
 class TxnParticipantElectionStormITest : public TxnParticipantITest {
  public:
   void SetUp() override {
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index ec6c608..12c6326 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -79,6 +79,7 @@
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/ops/alter_schema_op.h"
 #include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/ops/participant_op.h"
 #include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.h"
@@ -218,6 +219,7 @@ using kudu::security::TokenPB;
 using kudu::server::ServerBase;
 using kudu::tablet::AlterSchemaOpState;
 using kudu::tablet::MvccSnapshot;
+using kudu::tablet::ParticipantOpState;
 using kudu::tablet::TABLET_DATA_COPYING;
 using kudu::tablet::TABLET_DATA_DELETED;
 using kudu::tablet::TABLET_DATA_TOMBSTONED;
@@ -1296,6 +1298,36 @@ void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRe
   context->RespondSuccess();
 }
 
+// Handles a request to update the transaction participant of the tablet named
+// in 'req'. The participant op is submitted to the tablet replica. On success,
+// the RPC is answered by the op's RpcOpCompletionCallback. If the lookup,
+// tablet reference, or submission fails, the RPC is answered with an error
+// instead.
+void TabletServiceAdminImpl::ParticipateInTransaction(const ParticipantRequestPB* req,
+                                                      ParticipantResponsePB* resp,
+                                                      rpc::RpcContext* context) {
+  scoped_refptr<TabletReplica> replica;
+  const bool found_running = LookupRunningTabletReplicaOrRespond(
+      server_->tablet_manager(), req->tablet_id(), resp, context, &replica);
+  if (!found_running) {
+    // The lookup helper is responsible for responding in this case.
+    return;
+  }
+
+  shared_ptr<Tablet> tablet;
+  TabletServerErrorPB::Code error_code;
+  const Status ref_status = GetTabletRef(replica, &tablet, &error_code);
+  if (PREDICT_FALSE(!ref_status.ok())) {
+    SetupErrorAndRespond(resp->mutable_error(), ref_status, error_code, context);
+    return;
+  }
+
+  // TODO(awong): consider memory-based throttling?
+  // TODO(awong): we should also persist the transaction's owner, and prevent
+  // other users from mutating it.
+  unique_ptr<ParticipantOpState> op_state(
+      new ParticipantOpState(replica.get(), tablet->txn_participant(), req, resp));
+  unique_ptr<OpCompletionCallback> on_complete(
+      new RpcOpCompletionCallback<ParticipantResponsePB>(context, resp));
+  op_state->set_completion_callback(std::move(on_complete));
+
+  const Status submit_status = replica->SubmitTxnParticipantOp(std::move(op_state));
+  if (PREDICT_FALSE(!submit_status.ok())) {
+    SetupErrorAndRespond(resp->mutable_error(), submit_status,
+                         TabletServerErrorPB::UNKNOWN_ERROR, context);
+  }
+}
+
 bool TabletServiceAdminImpl::SupportsFeature(uint32_t feature) const {
   switch (feature) {
     case TabletServerFeatures::COLUMN_PREDICATES:
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index 2c96835..a7ce79f 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -96,6 +96,8 @@ class CreateTabletRequestPB;
 class CreateTabletResponsePB;
 class DeleteTabletRequestPB;
 class DeleteTabletResponsePB;
+class ParticipantRequestPB;
+class ParticipantResponsePB;
 class QuiesceTabletServerRequestPB;
 class QuiesceTabletServerResponsePB;
 class ScanResultCollector;
@@ -233,6 +235,10 @@ class TabletServiceAdminImpl : public TabletServerAdminServiceIf {
                              CoordinateTransactionResponsePB* resp,
                              rpc::RpcContext* context) override;
 
+  void ParticipateInTransaction(const ParticipantRequestPB* req,
+                                ParticipantResponsePB* resp,
+                                rpc::RpcContext* context) override;
+
   bool SupportsFeature(uint32_t feature) const override;
 
  private:
diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto
index e95a12a..9572318 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -111,7 +111,8 @@ message ParticipantOpPB {
 }
 
 message ParticipantRequestPB {
-  optional ParticipantOpPB op = 1;
+  // ID of the tablet whose transaction participant should apply 'op'.
+  // NOTE(review): 'op' was previously field 1 and is renumbered to 2 here.
+  // This is only wire-compatible if no ParticipantRequestPB using the old
+  // numbering was ever serialized (sent or persisted) by a deployed build.
+  // Confirm before release.
+  optional bytes tablet_id = 1;
+  // The participant operation (type, transaction ID, etc.) to apply.
+  optional ParticipantOpPB op = 2;
 }
 
 message ParticipantResponsePB {
@@ -251,6 +252,10 @@ service TabletServerAdminService {
   // Coordinate the lifecycle of a transaction.
   rpc CoordinateTransaction(CoordinateTransactionRequestPB)
       returns (CoordinateTransactionResponsePB);
+
+  // Request that a tablet participate in a transaction, or update the state of
+  // an existing transaction participant.
+  rpc ParticipateInTransaction(ParticipantRequestPB) returns (ParticipantResponsePB);
 }
 
 message QuiesceTabletServerRequestPB {