You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/08/22 05:02:05 UTC

kudu git commit: KUDU-2032 (part 2): propagate master hostnames into client

Repository: kudu
Updated Branches:
  refs/heads/master a7d589635 -> 9d12910a4


KUDU-2032 (part 2): propagate master hostnames into client

This changes the client code to remember the user-specified master
addresses and propagate them into the creation of master proxies.

It's not possible to reproduce the necessary DNS configurations in a
minicluster test, but with this patch I am now able to use 'kudu perf
loadgen' against a Kerberized cluster even when my local krb5.conf has
rdns=false.

Change-Id: Ie5838f22c96f757124112b505825a53740468ce1
Reviewed-on: http://gerrit.cloudera.org:8080/7692
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9d12910a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9d12910a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9d12910a

Branch: refs/heads/master
Commit: 9d12910a41c1660275d6cd9366f515c03b0ec5b5
Parents: a7d5896
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Aug 16 17:44:39 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 22 04:32:56 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-internal.cc              | 27 ++++++++-----
 src/kudu/client/client-internal.h               |  3 +-
 src/kudu/client/master_rpc.cc                   | 42 +++++++++++---------
 src/kudu/client/master_rpc.h                    | 11 ++---
 .../integration-tests/external_mini_cluster.cc  | 12 +++---
 5 files changed, 56 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index ef98aaf..2a1b7a3 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -25,6 +25,7 @@
 #include <mutex>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <boost/bind.hpp> // IWYU pragma: keep
@@ -61,6 +62,7 @@
 #include "kudu/util/thread_restrictions.h"
 #include "kudu/rpc/connection.h"
 
+using std::pair;
 using std::set;
 using std::shared_ptr;
 using std::string;
@@ -636,10 +638,13 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
 
 void KuduClient::Data::ConnectedToClusterCb(
     const Status& status,
-    const Sockaddr& leader_addr,
+    const pair<Sockaddr, string>& leader_addr_and_name,
     const master::ConnectToMasterResponsePB& connect_response,
     CredentialsPolicy cred_policy) {
 
+  const auto& leader_addr = leader_addr_and_name.first;
+  const auto& leader_hostname = leader_addr_and_name.second;
+
   // Ensure that all of the CAs reported by the master are trusted
   // in our local TLS configuration.
   if (status.ok()) {
@@ -680,9 +685,8 @@ void KuduClient::Data::ConnectedToClusterCb(
     }
 
     if (status.ok()) {
-      leader_master_hostport_ = HostPort(leader_addr);
-      // TODO(KUDU-2032): retain the original hostname passed by caller.
-      master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_addr.host()));
+      leader_master_hostport_ = HostPort(leader_hostname, leader_addr.port());
+      master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_hostname));
     }
   }
 
@@ -705,12 +709,15 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
                                              CredentialsPolicy creds_policy) {
   DCHECK(deadline.Initialized());
 
-  vector<Sockaddr> master_sockaddrs;
+  vector<pair<Sockaddr, string>> master_addrs_with_names;
   for (const string& master_server_addr : master_server_addrs_) {
     vector<Sockaddr> addrs;
-    Status s;
-    // TODO: Do address resolution asynchronously as well.
-    s = ParseAddressList(master_server_addr, master::Master::kDefaultPort, &addrs);
+    HostPort hp;
+    Status s = hp.ParseString(master_server_addr, master::Master::kDefaultPort);
+    if (s.ok()) {
+      // TODO(todd): Do address resolution asynchronously as well.
+      s = hp.ResolveAddresses(&addrs);
+    }
     if (!s.ok()) {
       cb.Run(s);
       return;
@@ -725,7 +732,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
           << "Specified master server address '" << master_server_addr << "' "
           << "resolved to multiple IPs. Using " << addrs[0].ToString();
     }
-    master_sockaddrs.push_back(addrs[0]);
+    master_addrs_with_names.emplace_back(addrs[0], hp.host());
   }
 
   // This ensures that no more than one ConnectToClusterRpc of each credentials
@@ -772,7 +779,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
             std::placeholders::_2,
             std::placeholders::_3,
             creds_policy),
-        std::move(master_sockaddrs),
+        std::move(master_addrs_with_names),
         deadline,
         client->default_rpc_timeout(),
         messenger_,

http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index d19d579..44da7b9 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -25,6 +25,7 @@
 #include <set>
 #include <string>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
 #include "kudu/client/client.h"
@@ -155,7 +156,7 @@ class KuduClient::Data {
   //
   // See also: ConnectToClusterAsync.
   void ConnectedToClusterCb(const Status& status,
-                            const Sockaddr& leader_addr,
+                            const std::pair<Sockaddr, std::string>& leader_addr_and_name,
                             const master::ConnectToMasterResponsePB& connect_response,
                             rpc::CredentialsPolicy cred_policy);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
index cb2fc2c..6828394 100644
--- a/src/kudu/client/master_rpc.cc
+++ b/src/kudu/client/master_rpc.cc
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <mutex>
 #include <ostream>
+#include <utility>
 
 #include <boost/bind.hpp>
 #include <glog/logging.h>
@@ -40,6 +41,7 @@
 #include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
 
+using std::pair;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -54,6 +56,7 @@ using kudu::master::MasterServiceProxy;
 using kudu::rpc::CredentialsPolicy;
 using kudu::rpc::Messenger;
 using kudu::rpc::Rpc;
+using strings::Substitute;
 
 namespace kudu {
 namespace client {
@@ -74,7 +77,7 @@ class ConnectToMasterRpc : public rpc::Rpc {
   //
   // Invokes 'user_cb' upon failure or success of the RPC call.
   ConnectToMasterRpc(StatusCallback user_cb,
-                     const Sockaddr& addr,
+                     pair<Sockaddr, string> addr_with_name,
                      const MonoTime& deadline,
                      std::shared_ptr<rpc::Messenger> messenger,
                      CredentialsPolicy creds_policy,
@@ -90,7 +93,9 @@ class ConnectToMasterRpc : public rpc::Rpc {
   virtual void SendRpcCb(const Status& status) OVERRIDE;
 
   const StatusCallback user_cb_;
-  const Sockaddr addr_;
+
+  // The resolved address to try to connect to, along with its original specified hostname.
+  const pair<Sockaddr, string> addr_with_name_;
 
   // Owned by the caller of this RPC, not this instance.
   ConnectToMasterResponsePB* out_;
@@ -105,14 +110,14 @@ class ConnectToMasterRpc : public rpc::Rpc {
 
 
 ConnectToMasterRpc::ConnectToMasterRpc(StatusCallback user_cb,
-    const Sockaddr& addr,
+    pair<Sockaddr, string> addr_with_name,
     const MonoTime& deadline,
     shared_ptr<Messenger> messenger,
     rpc::CredentialsPolicy creds_policy,
     ConnectToMasterResponsePB* out)
       : Rpc(deadline, std::move(messenger)),
         user_cb_(std::move(user_cb)),
-        addr_(addr),
+        addr_with_name_(std::move(addr_with_name)),
         out_(DCHECK_NOTNULL(out)) {
   mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy);
 }
@@ -121,8 +126,7 @@ ConnectToMasterRpc::~ConnectToMasterRpc() {
 }
 
 void ConnectToMasterRpc::SendRpc() {
-  // TODO(KUDU-2032): retain the hostname for addr_
-  MasterServiceProxy proxy(retrier().messenger(), addr_, addr_.host());
+  MasterServiceProxy proxy(retrier().messenger(), addr_with_name_.first, addr_with_name_.second);
   rpc::RpcController* controller = mutable_retrier()->mutable_controller();
   // TODO(todd): should this be setting an RPC call deadline based on 'deadline'?
   // it doesn't seem to be.
@@ -143,8 +147,9 @@ void ConnectToMasterRpc::SendRpc() {
 }
 
 string ConnectToMasterRpc::ToString() const {
-  return strings::Substitute("ConnectToMasterRpc(address: $0, num_attempts: $1)",
-                             addr_.ToString(), num_attempts());
+  return strings::Substitute("ConnectToMasterRpc(address: $0:$1, num_attempts: $2)",
+                             addr_with_name_.second, addr_with_name_.first.port(),
+                             num_attempts());
 }
 
 void ConnectToMasterRpc::SendRpcCb(const Status& status) {
@@ -204,21 +209,21 @@ void ConnectToMasterRpc::SendRpcCb(const Status& status) {
 ////////////////////////////////////////////////////////////
 
 ConnectToClusterRpc::ConnectToClusterRpc(LeaderCallback user_cb,
-                                         vector<Sockaddr> addrs,
+                                         vector<pair<Sockaddr, string>> addrs_with_names,
                                          MonoTime deadline,
                                          MonoDelta rpc_timeout,
                                          shared_ptr<Messenger> messenger,
                                          rpc::CredentialsPolicy creds_policy)
     : Rpc(deadline, std::move(messenger)),
       user_cb_(std::move(user_cb)),
-      addrs_(std::move(addrs)),
+      addrs_with_names_(std::move(addrs_with_names)),
       rpc_timeout_(rpc_timeout),
       pending_responses_(0),
       completed_(false) {
   DCHECK(deadline.Initialized());
 
   // Using resize instead of reserve to explicitly initialized the values.
-  responses_.resize(addrs_.size());
+  responses_.resize(addrs_with_names_.size());
   mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy);
 }
 
@@ -226,12 +231,13 @@ ConnectToClusterRpc::~ConnectToClusterRpc() {
 }
 
 string ConnectToClusterRpc::ToString() const {
-  vector<string> sockaddr_str;
-  for (const Sockaddr& addr : addrs_) {
-    sockaddr_str.push_back(addr.ToString());
+  vector<string> addrs_str;
+  for (const auto& addr_with_name : addrs_with_names_) {
+    addrs_str.emplace_back(Substitute(
+        "$0:$1", addr_with_name.second, addr_with_name.first.port()));
   }
   return strings::Substitute("ConnectToClusterRpc(addrs: $0, num_attempts: $1)",
-                             JoinStrings(sockaddr_str, ","),
+                             JoinStrings(addrs_str, ","),
                              num_attempts());
 }
 
@@ -241,10 +247,10 @@ void ConnectToClusterRpc::SendRpc() {
   const MonoTime actual_deadline = std::min(retrier().deadline(), rpc_deadline);
 
   std::lock_guard<simple_spinlock> l(lock_);
-  for (int i = 0; i < addrs_.size(); i++) {
+  for (int i = 0; i < addrs_with_names_.size(); i++) {
     ConnectToMasterRpc* rpc = new ConnectToMasterRpc(
         Bind(&ConnectToClusterRpc::SingleNodeCallback, this, i),
-        addrs_[i],
+        addrs_with_names_[i],
         actual_deadline,
         retrier().messenger(),
         retrier().controller().credentials_policy(),
@@ -284,7 +290,7 @@ void ConnectToClusterRpc::SendRpcCb(const Status& status) {
   // We are not retrying.
   undo_completed.cancel();
   if (leader_idx_ != -1) {
-    user_cb_(status, addrs_[leader_idx_], responses_[leader_idx_]);
+    user_cb_(status, addrs_with_names_[leader_idx_], responses_[leader_idx_]);
   } else {
     user_cb_(status, {}, {});
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h
index 39cec99..c951aa0 100644
--- a/src/kudu/client/master_rpc.h
+++ b/src/kudu/client/master_rpc.h
@@ -20,8 +20,9 @@
 
 #include <functional>
 #include <memory>
-#include <vector>
 #include <string>
+#include <utility>
+#include <vector>
 
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -70,7 +71,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
  public:
   typedef std::function<void(
       const Status& status,
-      const Sockaddr& leader_master,
+      const std::pair<Sockaddr, std::string> leader_master,
       const master::ConnectToMasterResponsePB& connect_response)> LeaderCallback;
   // The host and port of the leader master server is stored in
   // 'leader_master', which must remain valid for the lifetime of this
@@ -80,7 +81,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
   // until 'deadline' passes. Each RPC has 'rpc_timeout' time to complete
   // before it times out and may be retried if 'deadline' has not yet passed.
   ConnectToClusterRpc(LeaderCallback user_cb,
-                      std::vector<Sockaddr> addrs,
+                      std::vector<std::pair<Sockaddr, std::string>> addrs_with_names,
                       MonoTime deadline,
                       MonoDelta rpc_timeout,
                       std::shared_ptr<rpc::Messenger> messenger,
@@ -106,8 +107,8 @@ class ConnectToClusterRpc : public rpc::Rpc,
 
   const LeaderCallback user_cb_;
 
-  // The addresses of the masters.
-  const std::vector<Sockaddr> addrs_;
+  // The addresses of the masters, along with their original specified names.
+  const std::vector<std::pair<Sockaddr, std::string>> addrs_with_names_;
 
   // The amount of time alloted to each GetMasterRegistration RPC.
   const MonoDelta rpc_timeout_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index fd1b8cc..2ffe0e0 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -25,6 +25,7 @@
 #include <memory>
 #include <string>
 #include <unordered_set>
+#include <utility>
 
 #include <gflags/gflags.h>
 #include <gtest/gtest.h>
@@ -79,6 +80,7 @@ using kudu::tserver::ListTabletsRequestPB;
 using kudu::tserver::ListTabletsResponsePB;
 using kudu::tserver::TabletServerServiceProxy;
 using rapidjson::Value;
+using std::pair;
 using std::string;
 using std::unique_ptr;
 using std::unordered_set;
@@ -523,23 +525,23 @@ Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
 Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
   scoped_refptr<ConnectToClusterRpc> rpc;
   Synchronizer sync;
-  vector<Sockaddr> addrs;
+  vector<pair<Sockaddr, string>> addrs_with_names;
   Sockaddr leader_master_addr;
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
 
   for (const scoped_refptr<ExternalMaster>& master : masters_) {
-    addrs.push_back(master->bound_rpc_addr());
+    addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host());
   }
   const auto& cb = [&](const Status& status,
-                       const Sockaddr& leader_master,
+                       const pair<Sockaddr, string>& leader_master,
                        const master::ConnectToMasterResponsePB& resp) {
     if (status.ok()) {
-      leader_master_addr = leader_master;
+      leader_master_addr = leader_master.first;
     }
     sync.StatusCB(status);
   };
   rpc.reset(new ConnectToClusterRpc(cb,
-                                    std::move(addrs),
+                                    std::move(addrs_with_names),
                                     deadline,
                                     MonoDelta::FromSeconds(5),
                                     messenger_));