You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/02/03 05:29:44 UTC

kudu git commit: Add new ConnectToMaster RPC, implement client fallback

Repository: kudu
Updated Branches:
  refs/heads/master cb06dd12a -> d25430e28


Add new ConnectToMaster RPC, implement client fallback

This adds a new RPC to the master called 'ConnectToMaster', which the
client uses when first connecting to the cluster. The client used to use
GetMasterRegistration for this purpose, but that returned some
unnecessary information, and was overloaded for multiple purposes. This
purpose-built RPC will be more useful for piggy-backing cluster security
properties, etc.

As before, the client sends the RPC to all of the masters in parallel,
and believes any that says it is the leader.

In order to preserve backwards compatibility, this introduces a new
feature flag in the master protocol. The client sends the new RPC with
the new feature flag, and if it sees that it was rejected, falls back to
the old RPC.

The code is somewhat ugly: as we've known for a while, the
Rpc/RetriableRpc/RpcRetrier thing is due for a revamp. But, I didn't
want to take on revamping it at this point. So, I hope you'll forgive
the spaghetti.

In addition to the included unit test, I also manually tested the
fallback behavior using 'kudu table list' against an existing cluster
running Kudu 1.2.

Change-Id: I879801400ede4209679e6eb14eceb43775916c78
Reviewed-on: http://gerrit.cloudera.org:8080/5869
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d25430e2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d25430e2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d25430e2

Branch: refs/heads/master
Commit: d25430e28e3e9441408acd2b58d4320bc9f3e17b
Parents: cb06dd1
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 1 19:54:53 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Feb 3 05:07:00 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc     |  21 ++++++
 src/kudu/client/master_rpc.cc      | 123 +++++++++++++++++++++-----------
 src/kudu/client/master_rpc.h       |  14 ++--
 src/kudu/master/catalog_manager.cc |   1 +
 src/kudu/master/master.proto       |  33 +++++++--
 src/kudu/master/master_service.cc  |  20 ++++++
 src/kudu/master/master_service.h   |   4 ++
 src/kudu/rpc/rpc_controller.cc     |   1 +
 src/kudu/rpc/rpc_controller.h      |   1 +
 9 files changed, 166 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index e6e88a1..ed91910 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -71,6 +71,7 @@
 
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(log_inject_latency);
+DECLARE_bool(master_support_connect_to_master_rpc);
 DECLARE_bool(allow_unsafe_replication_factor);
 DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(leader_failure_exp_backoff_max_delta_ms);
@@ -4629,5 +4630,25 @@ TEST_F(ClientTest, TestErrorCollector) {
     }
 }
 
+// Test that, when connecting to an old-version master which doesn't support
+// the 'ConnectToMaster' RPC, we still fall back to the old 'GetMasterRegistration'
+// RPC.
+TEST_F(ClientTest, TestConnectToClusterCompatibility) {
+  FLAGS_master_support_connect_to_master_rpc = false;
+
+  const auto& ent = cluster_->mini_master()->master()->metric_entity();
+  const auto& metric = METRIC_handler_latency_kudu_master_MasterService_GetMasterRegistration
+      .Instantiate(ent);
+  int initial_val = metric->TotalCount();
+
+  // Reconnect the client. Since we disabled 'ConnectToMaster', the client should
+  // fall back to using GetMasterRegistration instead.
+  ASSERT_OK(KuduClientBuilder()
+            .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
+            .Build(&client_));
+  int final_val = metric->TotalCount();
+  ASSERT_EQ(initial_val + 1, final_val);
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/client/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
index a04cc32..8c0dbd6 100644
--- a/src/kudu/client/master_rpc.cc
+++ b/src/kudu/client/master_rpc.cc
@@ -37,6 +37,8 @@ using std::string;
 using std::vector;
 
 using kudu::consensus::RaftPeerPB;
+using kudu::master::ConnectToMasterRequestPB;
+using kudu::master::ConnectToMasterResponsePB;
 using kudu::master::GetMasterRegistrationRequestPB;
 using kudu::master::GetMasterRegistrationResponsePB;
 using kudu::master::MasterErrorPB;
@@ -53,22 +55,22 @@ namespace internal {
 ////////////////////////////////////////////////////////////
 namespace {
 
-// An RPC for getting a Master server's registration.
-class GetMasterRegistrationRpc : public rpc::Rpc {
+// An RPC for trying to connect via a particular Master.
+class ConnectToMasterRpc : public rpc::Rpc {
  public:
 
-  // Create a wrapper object for a retriable GetMasterRegistration RPC
+  // Create a wrapper object for a retriable ConnectToMaster RPC
   // to 'addr'. The result is stored in 'out', which must be a valid
   // pointer for the lifetime of this object.
   //
   // Invokes 'user_cb' upon failure or success of the RPC call.
-  GetMasterRegistrationRpc(StatusCallback user_cb,
-                           const Sockaddr& addr,
-                           const MonoTime& deadline,
-                           std::shared_ptr<rpc::Messenger> messenger,
-                           ServerEntryPB* out);
+  ConnectToMasterRpc(StatusCallback user_cb,
+                     const Sockaddr& addr,
+                     const MonoTime& deadline,
+                     std::shared_ptr<rpc::Messenger> messenger,
+                     ConnectToMasterResponsePB* out);
 
-  ~GetMasterRegistrationRpc();
+  ~ConnectToMasterRpc();
 
   virtual void SendRpc() OVERRIDE;
 
@@ -77,66 +79,107 @@ class GetMasterRegistrationRpc : public rpc::Rpc {
  private:
   virtual void SendRpcCb(const Status& status) OVERRIDE;
 
-  StatusCallback user_cb_;
-  Sockaddr addr_;
+  const StatusCallback user_cb_;
+  const Sockaddr addr_;
 
-  ServerEntryPB* out_;
+  // Owned by the caller of this RPC, not this instance.
+  ConnectToMasterResponsePB* out_;
 
-  GetMasterRegistrationResponsePB resp_;
+  // When connecting to Kudu <1.3 masters, the ConnectToMaster
+  // RPC is not supported. Instead we use GetMasterRegistration(),
+  // store the response here, and convert it to look like the new
+  // style response.
+  GetMasterRegistrationResponsePB old_rpc_resp_;
+  bool trying_old_rpc_ = false;
 };
 
 
-GetMasterRegistrationRpc::GetMasterRegistrationRpc(
-    StatusCallback user_cb, const Sockaddr& addr, const MonoTime& deadline,
-    shared_ptr<Messenger> messenger, ServerEntryPB* out)
+ConnectToMasterRpc::ConnectToMasterRpc(
+    StatusCallback user_cb, const Sockaddr& addr,const MonoTime& deadline,
+    shared_ptr<Messenger> messenger, ConnectToMasterResponsePB* out)
     : Rpc(deadline, std::move(messenger)),
       user_cb_(std::move(user_cb)),
       addr_(addr),
       out_(DCHECK_NOTNULL(out)) {}
 
-GetMasterRegistrationRpc::~GetMasterRegistrationRpc() {
+ConnectToMasterRpc::~ConnectToMasterRpc() {
 }
 
-void GetMasterRegistrationRpc::SendRpc() {
+void ConnectToMasterRpc::SendRpc() {
   MasterServiceProxy proxy(retrier().messenger(),
                            addr_);
-  GetMasterRegistrationRequestPB req;
-  proxy.GetMasterRegistrationAsync(req, &resp_,
-                                   mutable_retrier()->mutable_controller(),
-                                   boost::bind(&GetMasterRegistrationRpc::SendRpcCb,
-                                               this,
-                                               Status::OK()));
+  rpc::RpcController* rpc = mutable_retrier()->mutable_controller();
+  // TODO(todd): should this be setting an RPC call deadline based on 'deadline'?
+  // it doesn't seem to be.
+  if (!trying_old_rpc_) {
+    ConnectToMasterRequestPB req;
+    rpc->RequireServerFeature(master::MasterFeatures::CONNECT_TO_MASTER);
+    proxy.ConnectToMasterAsync(req, out_, rpc,
+                               boost::bind(&ConnectToMasterRpc::SendRpcCb,
+                                           this,
+                                           Status::OK()));
+  } else {
+    GetMasterRegistrationRequestPB req;
+    proxy.GetMasterRegistrationAsync(req, &old_rpc_resp_, rpc,
+                                     boost::bind(&ConnectToMasterRpc::SendRpcCb,
+                                                 this,
+                                                 Status::OK()));
+  }
 }
 
-string GetMasterRegistrationRpc::ToString() const {
-  return strings::Substitute("GetMasterRegistrationRpc(address: $0, num_attempts: $1)",
+string ConnectToMasterRpc::ToString() const {
+  return strings::Substitute("ConnectToMasterRpc(address: $0, num_attempts: $1)",
                              addr_.ToString(), num_attempts());
 }
 
-void GetMasterRegistrationRpc::SendRpcCb(const Status& status) {
-  gscoped_ptr<GetMasterRegistrationRpc> deleter(this);
+void ConnectToMasterRpc::SendRpcCb(const Status& status) {
+  // NOTE: 'status' here is actually coming from the RpcRetrier. If we successfully
+  // send an RPC, it will be 'Status::OK'.
+  // TODO(todd): this is the most confusing code I've ever seen...
+  gscoped_ptr<ConnectToMasterRpc> deleter(this);
   Status new_status = status;
   if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
     ignore_result(deleter.release());
     return;
   }
-  if (new_status.ok() && resp_.has_error()) {
-    if (resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
+
+  rpc::RpcController* rpc = mutable_retrier()->mutable_controller();
+  if (!trying_old_rpc_ &&
+      new_status.IsRemoteError() &&
+      rpc->error_response()->unsupported_feature_flags_size() > 0) {
+    VLOG(1) << "Connecting to an old-version cluster which does not support ConnectToCluster(). "
+            << "Falling back to GetMasterRegistration().";
+    trying_old_rpc_ = true;
+    // retry immediately.
+    ignore_result(deleter.release());
+    rpc->Reset();
+    SendRpc();
+    return;
+  }
+
+  // If we sent the old RPC, then translate its response to the new RPC.
+  if (trying_old_rpc_) {
+    out_->Clear();
+    if (old_rpc_resp_.has_error()) {
+      out_->set_allocated_error(old_rpc_resp_.release_error());
+    }
+    if (old_rpc_resp_.has_role()) {
+      out_->set_role(old_rpc_resp_.role());
+    }
+  }
+
+  if (new_status.ok() && out_->has_error()) {
+    if (out_->error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
       // If CatalogManager is not initialized, treat the node as a
       // FOLLOWER for the time being, as currently this RPC is only
       // used for the purposes of finding the leader master.
-      resp_.set_role(RaftPeerPB::FOLLOWER);
+      out_->set_role(RaftPeerPB::FOLLOWER);
       new_status = Status::OK();
     } else {
-      out_->mutable_error()->CopyFrom(resp_.error().status());
-      new_status = StatusFromPB(resp_.error().status());
+      out_->mutable_error()->CopyFrom(out_->error().status());
+      new_status = StatusFromPB(out_->error().status());
     }
   }
-  if (new_status.ok()) {
-    out_->mutable_instance_id()->CopyFrom(resp_.instance_id());
-    out_->mutable_registration()->CopyFrom(resp_.registration());
-    out_->set_role(resp_.role());
-  }
   user_cb_.Run(new_status);
 }
 
@@ -184,7 +227,7 @@ void ConnectToClusterRpc::SendRpc() {
 
   std::lock_guard<simple_spinlock> l(lock_);
   for (int i = 0; i < addrs_.size(); i++) {
-    GetMasterRegistrationRpc* rpc = new GetMasterRegistrationRpc(
+    ConnectToMasterRpc* rpc = new ConnectToMasterRpc(
         Bind(&ConnectToClusterRpc::SingleNodeCallback,
              this, ConstRef(addrs_[i]), ConstRef(responses_[i])),
         addrs_[i],
@@ -229,7 +272,7 @@ void ConnectToClusterRpc::SendRpcCb(const Status& status) {
 }
 
 void ConnectToClusterRpc::SingleNodeCallback(const Sockaddr& node_addr,
-                                             const ServerEntryPB& resp,
+                                             const ConnectToMasterResponsePB& resp,
                                              const Status& status) {
   // TODO(todd): handle the situation where one Master is partitioned from
   // the rest of the Master consensus configuration, all are reachable by the client,

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/client/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h
index 3170693..7fe278d 100644
--- a/src/kudu/client/master_rpc.h
+++ b/src/kudu/client/master_rpc.h
@@ -30,7 +30,9 @@
 
 namespace kudu {
 
-class ServerEntryPB;
+namespace master {
+class ConnectToMasterResponsePB;
+} // namespace master
 class HostPort;
 
 namespace client {
@@ -39,6 +41,10 @@ namespace internal {
 // In parallel, send requests to the specified Master servers until a
 // response comes back from the leader of the Master consensus configuration.
 //
+// In addition to locating the leader, this fetches cluster-wide details
+// such as an authentication token for the current user, the cluster's
+// CA cert, etc.
+//
 // If queries have been made to all of the specified servers, but no
 // leader has been found, we re-try again (with an increasing delay,
 // see: RpcRetrier in kudu/rpc/rpc.{cc,h}) until a specified deadline
@@ -87,7 +93,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
   // master is a leader, or if responses have been received from all
   // of the Masters.
   void SingleNodeCallback(const Sockaddr& node_addr,
-                          const ServerEntryPB& resp,
+                          const master::ConnectToMasterResponsePB& resp,
                           const Status& status);
 
   LeaderCallback user_cb_;
@@ -99,9 +105,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
   MonoDelta rpc_timeout_;
 
   // The received responses.
-  //
-  // See also: GetMasterRegistrationRpc above.
-  std::vector<ServerEntryPB> responses_;
+  std::vector<master::ConnectToMasterResponsePB> responses_;
 
   // Number of pending responses.
   int pending_responses_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 1afd3bb..e29aca1 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -3754,6 +3754,7 @@ bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrResp
   CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond( \
       RespClass* resp, RpcContext* rpc)
 
+INITTED_OR_RESPOND(ConnectToMasterResponsePB);
 INITTED_OR_RESPOND(GetMasterRegistrationResponsePB);
 INITTED_OR_RESPOND(TSHeartbeatResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(AlterTableResponsePB);

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/master/master.proto
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index acdf0e9..bc23b91 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -543,6 +543,17 @@ message GetTableSchemaResponsePB {
   optional string table_name = 7;
 }
 
+message ConnectToMasterRequestPB {
+}
+
+message ConnectToMasterResponsePB {
+  // Set if there is an error.
+  optional MasterErrorPB error = 1;
+
+  // The current role of the master.
+  optional consensus.RaftPeerPB.Role role = 2;
+}
+
 // ============================================================================
 //  Administration/monitoring
 // ============================================================================
@@ -601,15 +612,21 @@ enum MasterFeatures {
   RANGE_PARTITION_BOUNDS = 1;
   // The master supports adding and dropping range partitions.
   ADD_DROP_RANGE_PARTITIONS = 2;
+  // The master supports the 'ConnectToMaster' RPC.
+  CONNECT_TO_MASTER = 3;
 }
 
 service MasterService {
-  rpc Ping(PingRequestPB) returns (PingResponsePB);
-
   // TS->Master RPCs
+  // ------------------------------------------------------------
   rpc TSHeartbeat(TSHeartbeatRequestPB) returns (TSHeartbeatResponsePB);
 
   // Client->Master RPCs
+  // ------------------------------------------------------------
+
+  // Used only by Kudu 1.3 and later.
+  rpc ConnectToMaster(ConnectToMasterRequestPB) returns (ConnectToMasterResponsePB);
+
   rpc GetTabletLocations(GetTabletLocationsRequestPB) returns (GetTabletLocationsResponsePB);
 
   rpc CreateTable(CreateTableRequestPB) returns (CreateTableResponsePB);
@@ -625,14 +642,16 @@ service MasterService {
   rpc GetTableSchema(GetTableSchemaRequestPB) returns (GetTableSchemaResponsePB);
 
   // Administrative/monitoring RPCs
+  // ------------------------------------------------------------
   rpc ListTabletServers(ListTabletServersRequestPB) returns (ListTabletServersResponsePB);
   rpc ListMasters(ListMastersRequestPB) returns (ListMastersResponsePB);
+  rpc Ping(PingRequestPB) returns (PingResponsePB);
+
+  // Master->Master RPCs
+  // ------------------------------------------------------------
 
-  // TODO(todd): rename this RPC to ConnectToCluster() or somesuch. It's only used by
-  // the client. However, we need to keep in mind compatibility. We'll probably have to
-  // do this by adding a new RPC which use the existing protos, and having the server
-  // implement it, but the client not send it. After it's been out for a few releases,
-  // we can start sending it from clients?
+  // NOTE: this RPC is also used by Kudu client <= 1.2 when first connecting to the
+  // cluster.
   rpc GetMasterRegistration(GetMasterRegistrationRequestPB) returns
     (GetMasterRegistrationResponsePB);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index c6ec327..13c319c 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -40,6 +40,12 @@ DEFINE_int32(master_inject_latency_on_tablet_lookups_ms, 0,
 TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, unsafe);
 TAG_FLAG(master_inject_latency_on_tablet_lookups_ms, hidden);
 
+DEFINE_bool(master_support_connect_to_master_rpc, true,
+            "Whether to support the ConnectToMaster() RPC. Used for testing "
+            "version compatibility fallback in the client.");
+TAG_FLAG(master_support_connect_to_master_rpc, unsafe);
+TAG_FLAG(master_support_connect_to_master_rpc, hidden);
+
 namespace kudu {
 namespace master {
 
@@ -343,10 +349,24 @@ void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequest
   rpc->RespondSuccess();
 }
 
+void MasterServiceImpl::ConnectToMaster(const ConnectToMasterRequestPB* /*req*/,
+                                        ConnectToMasterResponsePB* resp,
+                                        rpc::RpcContext* rpc) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedOrRespond(resp, rpc)) {
+    return;
+  }
+  resp->set_role(server_->catalog_manager()->Role());
+
+  rpc->RespondSuccess();
+}
+
 bool MasterServiceImpl::SupportsFeature(uint32_t feature) const {
   switch (feature) {
     case MasterFeatures::RANGE_PARTITION_BOUNDS:
     case MasterFeatures::ADD_DROP_RANGE_PARTITIONS: return true;
+    case MasterFeatures::CONNECT_TO_MASTER:
+      return FLAGS_master_support_connect_to_master_rpc;
     default: return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/master/master_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.h b/src/kudu/master/master_service.h
index 89a310a..4aa87eb 100644
--- a/src/kudu/master/master_service.h
+++ b/src/kudu/master/master_service.h
@@ -84,6 +84,10 @@ class MasterServiceImpl : public MasterServiceIf {
                                      GetMasterRegistrationResponsePB* resp,
                                      rpc::RpcContext* rpc) OVERRIDE;
 
+  virtual void ConnectToMaster(const ConnectToMasterRequestPB* req,
+                               ConnectToMasterResponsePB* resp,
+                               rpc::RpcContext* rpc) OVERRIDE;
+
   bool SupportsFeature(uint32_t feature) const override;
 
  private:

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index cfa73e6..adaf5ce 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -53,6 +53,7 @@ void RpcController::Reset() {
     CHECK(finished());
   }
   call_.reset();
+  required_server_features_.clear();
 }
 
 bool RpcController::finished() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/d25430e2/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index fbe94c7..cce1ff2 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -53,6 +53,7 @@ class RpcController {
   void Swap(RpcController* other);
 
   // Reset this controller so it may be used with another call.
+  // Note that this resets the required server features.
   void Reset();
 
   // Return true if the call has finished.