You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/08/22 05:02:05 UTC
kudu git commit: KUDU-2032 (part 2): propagate master hostnames into client
Repository: kudu
Updated Branches:
refs/heads/master a7d589635 -> 9d12910a4
KUDU-2032 (part 2): propagate master hostnames into client
This changes the client code to remember the user-specified master
addresses and propagate them into the creation of master proxies.
It's not possible to reproduce the necessary DNS configurations in a
minicluster test, but with this patch I am now able to use 'kudu perf
loadgen' against a Kerberized cluster even when my local krb5.conf has
rdns=false.
Change-Id: Ie5838f22c96f757124112b505825a53740468ce1
Reviewed-on: http://gerrit.cloudera.org:8080/7692
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9d12910a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9d12910a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9d12910a
Branch: refs/heads/master
Commit: 9d12910a41c1660275d6cd9366f515c03b0ec5b5
Parents: a7d5896
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Aug 16 17:44:39 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 22 04:32:56 2017 +0000
----------------------------------------------------------------------
src/kudu/client/client-internal.cc | 27 ++++++++-----
src/kudu/client/client-internal.h | 3 +-
src/kudu/client/master_rpc.cc | 42 +++++++++++---------
src/kudu/client/master_rpc.h | 11 ++---
.../integration-tests/external_mini_cluster.cc | 12 +++---
5 files changed, 56 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index ef98aaf..2a1b7a3 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -25,6 +25,7 @@
#include <mutex>
#include <ostream>
#include <string>
+#include <utility>
#include <vector>
#include <boost/bind.hpp> // IWYU pragma: keep
@@ -61,6 +62,7 @@
#include "kudu/util/thread_restrictions.h"
#include "kudu/rpc/connection.h"
+using std::pair;
using std::set;
using std::shared_ptr;
using std::string;
@@ -636,10 +638,13 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
void KuduClient::Data::ConnectedToClusterCb(
const Status& status,
- const Sockaddr& leader_addr,
+ const pair<Sockaddr, string>& leader_addr_and_name,
const master::ConnectToMasterResponsePB& connect_response,
CredentialsPolicy cred_policy) {
+ const auto& leader_addr = leader_addr_and_name.first;
+ const auto& leader_hostname = leader_addr_and_name.second;
+
// Ensure that all of the CAs reported by the master are trusted
// in our local TLS configuration.
if (status.ok()) {
@@ -680,9 +685,8 @@ void KuduClient::Data::ConnectedToClusterCb(
}
if (status.ok()) {
- leader_master_hostport_ = HostPort(leader_addr);
- // TODO(KUDU-2032): retain the original hostname passed by caller.
- master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_addr.host()));
+ leader_master_hostport_ = HostPort(leader_hostname, leader_addr.port());
+ master_proxy_.reset(new MasterServiceProxy(messenger_, leader_addr, leader_hostname));
}
}
@@ -705,12 +709,15 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
CredentialsPolicy creds_policy) {
DCHECK(deadline.Initialized());
- vector<Sockaddr> master_sockaddrs;
+ vector<pair<Sockaddr, string>> master_addrs_with_names;
for (const string& master_server_addr : master_server_addrs_) {
vector<Sockaddr> addrs;
- Status s;
- // TODO: Do address resolution asynchronously as well.
- s = ParseAddressList(master_server_addr, master::Master::kDefaultPort, &addrs);
+ HostPort hp;
+ Status s = hp.ParseString(master_server_addr, master::Master::kDefaultPort);
+ if (s.ok()) {
+ // TODO(todd): Do address resolution asynchronously as well.
+ s = hp.ResolveAddresses(&addrs);
+ }
if (!s.ok()) {
cb.Run(s);
return;
@@ -725,7 +732,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
<< "Specified master server address '" << master_server_addr << "' "
<< "resolved to multiple IPs. Using " << addrs[0].ToString();
}
- master_sockaddrs.push_back(addrs[0]);
+ master_addrs_with_names.emplace_back(addrs[0], hp.host());
}
// This ensures that no more than one ConnectToClusterRpc of each credentials
@@ -772,7 +779,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
std::placeholders::_2,
std::placeholders::_3,
creds_policy),
- std::move(master_sockaddrs),
+ std::move(master_addrs_with_names),
deadline,
client->default_rpc_timeout(),
messenger_,
http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index d19d579..44da7b9 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -25,6 +25,7 @@
#include <set>
#include <string>
#include <unordered_set>
+#include <utility>
#include <vector>
#include "kudu/client/client.h"
@@ -155,7 +156,7 @@ class KuduClient::Data {
//
// See also: ConnectToClusterAsync.
void ConnectedToClusterCb(const Status& status,
- const Sockaddr& leader_addr,
+ const std::pair<Sockaddr, std::string>& leader_addr_and_name,
const master::ConnectToMasterResponsePB& connect_response,
rpc::CredentialsPolicy cred_policy);
http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
index cb2fc2c..6828394 100644
--- a/src/kudu/client/master_rpc.cc
+++ b/src/kudu/client/master_rpc.cc
@@ -22,6 +22,7 @@
#include <algorithm>
#include <mutex>
#include <ostream>
+#include <utility>
#include <boost/bind.hpp>
#include <glog/logging.h>
@@ -40,6 +41,7 @@
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
+using std::pair;
using std::shared_ptr;
using std::string;
using std::vector;
@@ -54,6 +56,7 @@ using kudu::master::MasterServiceProxy;
using kudu::rpc::CredentialsPolicy;
using kudu::rpc::Messenger;
using kudu::rpc::Rpc;
+using strings::Substitute;
namespace kudu {
namespace client {
@@ -74,7 +77,7 @@ class ConnectToMasterRpc : public rpc::Rpc {
//
// Invokes 'user_cb' upon failure or success of the RPC call.
ConnectToMasterRpc(StatusCallback user_cb,
- const Sockaddr& addr,
+ pair<Sockaddr, string> addr_with_name,
const MonoTime& deadline,
std::shared_ptr<rpc::Messenger> messenger,
CredentialsPolicy creds_policy,
@@ -90,7 +93,9 @@ class ConnectToMasterRpc : public rpc::Rpc {
virtual void SendRpcCb(const Status& status) OVERRIDE;
const StatusCallback user_cb_;
- const Sockaddr addr_;
+
+ // The resolved address to try to connect to, along with its original specified hostname.
+ const pair<Sockaddr, string> addr_with_name_;
// Owned by the caller of this RPC, not this instance.
ConnectToMasterResponsePB* out_;
@@ -105,14 +110,14 @@ class ConnectToMasterRpc : public rpc::Rpc {
ConnectToMasterRpc::ConnectToMasterRpc(StatusCallback user_cb,
- const Sockaddr& addr,
+ pair<Sockaddr, string> addr_with_name,
const MonoTime& deadline,
shared_ptr<Messenger> messenger,
rpc::CredentialsPolicy creds_policy,
ConnectToMasterResponsePB* out)
: Rpc(deadline, std::move(messenger)),
user_cb_(std::move(user_cb)),
- addr_(addr),
+ addr_with_name_(std::move(addr_with_name)),
out_(DCHECK_NOTNULL(out)) {
mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy);
}
@@ -121,8 +126,7 @@ ConnectToMasterRpc::~ConnectToMasterRpc() {
}
void ConnectToMasterRpc::SendRpc() {
- // TODO(KUDU-2032): retain the hostname for addr_
- MasterServiceProxy proxy(retrier().messenger(), addr_, addr_.host());
+ MasterServiceProxy proxy(retrier().messenger(), addr_with_name_.first, addr_with_name_.second);
rpc::RpcController* controller = mutable_retrier()->mutable_controller();
// TODO(todd): should this be setting an RPC call deadline based on 'deadline'?
// it doesn't seem to be.
@@ -143,8 +147,9 @@ void ConnectToMasterRpc::SendRpc() {
}
string ConnectToMasterRpc::ToString() const {
- return strings::Substitute("ConnectToMasterRpc(address: $0, num_attempts: $1)",
- addr_.ToString(), num_attempts());
+ return strings::Substitute("ConnectToMasterRpc(address: $0:$1, num_attempts: $2)",
+ addr_with_name_.second, addr_with_name_.first.port(),
+ num_attempts());
}
void ConnectToMasterRpc::SendRpcCb(const Status& status) {
@@ -204,21 +209,21 @@ void ConnectToMasterRpc::SendRpcCb(const Status& status) {
////////////////////////////////////////////////////////////
ConnectToClusterRpc::ConnectToClusterRpc(LeaderCallback user_cb,
- vector<Sockaddr> addrs,
+ vector<pair<Sockaddr, string>> addrs_with_names,
MonoTime deadline,
MonoDelta rpc_timeout,
shared_ptr<Messenger> messenger,
rpc::CredentialsPolicy creds_policy)
: Rpc(deadline, std::move(messenger)),
user_cb_(std::move(user_cb)),
- addrs_(std::move(addrs)),
+ addrs_with_names_(std::move(addrs_with_names)),
rpc_timeout_(rpc_timeout),
pending_responses_(0),
completed_(false) {
DCHECK(deadline.Initialized());
// Using resize instead of reserve to explicitly initialize the values.
- responses_.resize(addrs_.size());
+ responses_.resize(addrs_with_names_.size());
mutable_retrier()->mutable_controller()->set_credentials_policy(creds_policy);
}
@@ -226,12 +231,13 @@ ConnectToClusterRpc::~ConnectToClusterRpc() {
}
string ConnectToClusterRpc::ToString() const {
- vector<string> sockaddr_str;
- for (const Sockaddr& addr : addrs_) {
- sockaddr_str.push_back(addr.ToString());
+ vector<string> addrs_str;
+ for (const auto& addr_with_name : addrs_with_names_) {
+ addrs_str.emplace_back(Substitute(
+ "$0:$1", addr_with_name.second, addr_with_name.first.port()));
}
return strings::Substitute("ConnectToClusterRpc(addrs: $0, num_attempts: $1)",
- JoinStrings(sockaddr_str, ","),
+ JoinStrings(addrs_str, ","),
num_attempts());
}
@@ -241,10 +247,10 @@ void ConnectToClusterRpc::SendRpc() {
const MonoTime actual_deadline = std::min(retrier().deadline(), rpc_deadline);
std::lock_guard<simple_spinlock> l(lock_);
- for (int i = 0; i < addrs_.size(); i++) {
+ for (int i = 0; i < addrs_with_names_.size(); i++) {
ConnectToMasterRpc* rpc = new ConnectToMasterRpc(
Bind(&ConnectToClusterRpc::SingleNodeCallback, this, i),
- addrs_[i],
+ addrs_with_names_[i],
actual_deadline,
retrier().messenger(),
retrier().controller().credentials_policy(),
@@ -284,7 +290,7 @@ void ConnectToClusterRpc::SendRpcCb(const Status& status) {
// We are not retrying.
undo_completed.cancel();
if (leader_idx_ != -1) {
- user_cb_(status, addrs_[leader_idx_], responses_[leader_idx_]);
+ user_cb_(status, addrs_with_names_[leader_idx_], responses_[leader_idx_]);
} else {
user_cb_(status, {}, {});
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/client/master_rpc.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.h b/src/kudu/client/master_rpc.h
index 39cec99..c951aa0 100644
--- a/src/kudu/client/master_rpc.h
+++ b/src/kudu/client/master_rpc.h
@@ -20,8 +20,9 @@
#include <functional>
#include <memory>
-#include <vector>
#include <string>
+#include <utility>
+#include <vector>
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
@@ -70,7 +71,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
public:
typedef std::function<void(
const Status& status,
- const Sockaddr& leader_master,
+ const std::pair<Sockaddr, std::string> leader_master,
const master::ConnectToMasterResponsePB& connect_response)> LeaderCallback;
// The host and port of the leader master server is stored in
// 'leader_master', which must remain valid for the lifetime of this
@@ -80,7 +81,7 @@ class ConnectToClusterRpc : public rpc::Rpc,
// until 'deadline' passes. Each RPC has 'rpc_timeout' time to complete
// before it times out and may be retried if 'deadline' has not yet passed.
ConnectToClusterRpc(LeaderCallback user_cb,
- std::vector<Sockaddr> addrs,
+ std::vector<std::pair<Sockaddr, std::string>> addrs_with_names,
MonoTime deadline,
MonoDelta rpc_timeout,
std::shared_ptr<rpc::Messenger> messenger,
@@ -106,8 +107,8 @@ class ConnectToClusterRpc : public rpc::Rpc,
const LeaderCallback user_cb_;
- // The addresses of the masters.
- const std::vector<Sockaddr> addrs_;
+ // The addresses of the masters, along with their original specified names.
+ const std::vector<std::pair<Sockaddr, std::string>> addrs_with_names_;
// The amount of time allotted to each GetMasterRegistration RPC.
const MonoDelta rpc_timeout_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/9d12910a/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index fd1b8cc..2ffe0e0 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -25,6 +25,7 @@
#include <memory>
#include <string>
#include <unordered_set>
+#include <utility>
#include <gflags/gflags.h>
#include <gtest/gtest.h>
@@ -79,6 +80,7 @@ using kudu::tserver::ListTabletsRequestPB;
using kudu::tserver::ListTabletsResponsePB;
using kudu::tserver::TabletServerServiceProxy;
using rapidjson::Value;
+using std::pair;
using std::string;
using std::unique_ptr;
using std::unordered_set;
@@ -523,23 +525,23 @@ Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
scoped_refptr<ConnectToClusterRpc> rpc;
Synchronizer sync;
- vector<Sockaddr> addrs;
+ vector<pair<Sockaddr, string>> addrs_with_names;
Sockaddr leader_master_addr;
MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
for (const scoped_refptr<ExternalMaster>& master : masters_) {
- addrs.push_back(master->bound_rpc_addr());
+ addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host());
}
const auto& cb = [&](const Status& status,
- const Sockaddr& leader_master,
+ const pair<Sockaddr, string>& leader_master,
const master::ConnectToMasterResponsePB& resp) {
if (status.ok()) {
- leader_master_addr = leader_master;
+ leader_master_addr = leader_master.first;
}
sync.StatusCB(status);
};
rpc.reset(new ConnectToClusterRpc(cb,
- std::move(addrs),
+ std::move(addrs_with_names),
deadline,
MonoDelta::FromSeconds(5),
messenger_));