You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/06/03 04:41:45 UTC
[kudu] branch master updated: KUDU-2791: TTL cache in DNS resolver
(part 2)
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 48467cc KUDU-2791: TTL cache in DNS resolver (part 2)
48467cc is described below
commit 48467ccf4c7a6135453708e3ea5123738fef19b3
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed May 29 23:58:20 2019 -0700
KUDU-2791: TTL cache in DNS resolver (part 2)
This changelist adds necessary plumbing in various places where
it's beneficial to use caching DnsResolver instead of
HostPort::ResolveAddresses().
Change-Id: Ib22e87d28573fdb93ba18cd6d99d8bde7524a4fc
Reviewed-on: http://gerrit.cloudera.org:8080/13469
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
src/kudu/client/client-internal.cc | 7 +--
src/kudu/consensus/consensus_peers.cc | 38 ++++++++------
src/kudu/consensus/consensus_peers.h | 14 +++--
src/kudu/integration-tests/raft_consensus-itest.cc | 5 +-
src/kudu/kserver/kserver.cc | 4 +-
src/kudu/master/master_service.cc | 1 +
src/kudu/master/sys_catalog.cc | 11 ++--
src/kudu/master/ts_descriptor-test.cc | 14 ++---
src/kudu/master/ts_descriptor.cc | 13 +++--
src/kudu/master/ts_descriptor.h | 6 ++-
src/kudu/master/ts_manager.cc | 6 ++-
src/kudu/master/ts_manager.h | 2 +
src/kudu/server/server_base.cc | 8 +++
src/kudu/server/server_base.h | 11 ++--
src/kudu/tablet/tablet_replica-test.cc | 61 ++++++++++++----------
src/kudu/tablet/tablet_replica.cc | 5 +-
src/kudu/tablet/tablet_replica.h | 4 +-
src/kudu/tserver/heartbeater.cc | 45 +++++++---------
.../tserver/tablet_copy_source_session-test.cc | 10 ++--
src/kudu/tserver/tablet_server.cc | 26 +++------
src/kudu/tserver/ts_tablet_manager.cc | 3 +-
src/kudu/util/net/net_util.cc | 2 +-
22 files changed, 169 insertions(+), 127 deletions(-)
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 842ae8f..09e42bb 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -415,8 +415,9 @@ Status KuduClient::Data::InitLocalHostNames() {
}
vector<Sockaddr> addresses;
- RETURN_NOT_OK_PREPEND(HostPort(hostname, 0).ResolveAddresses(&addresses),
- Substitute("Could not resolve local host name '$0'", hostname));
+ RETURN_NOT_OK_PREPEND(dns_resolver_->ResolveAddresses(HostPort(hostname, 0),
+ &addresses),
+ Substitute("Could not resolve local host name '$0'", hostname));
for (const Sockaddr& addr : addresses) {
// Similar to above, ignore local or wildcard addresses.
@@ -592,7 +593,7 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
Status s = hp.ParseString(master_server_addr, master::Master::kDefaultPort);
if (s.ok()) {
// TODO(todd): Do address resolution asynchronously as well.
- s = hp.ResolveAddresses(&addrs);
+ s = dns_resolver_->ResolveAddresses(hp, &addrs);
}
if (!s.ok()) {
cb.Run(s);
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index e3a30be..e6fd67a 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -50,6 +50,7 @@
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
@@ -536,15 +537,17 @@ string RpcPeerProxy::PeerName() const {
namespace {
-Status CreateConsensusServiceProxyForHost(const shared_ptr<Messenger>& messenger,
- const HostPort& hostport,
- gscoped_ptr<ConsensusServiceProxy>* new_proxy) {
+Status CreateConsensusServiceProxyForHost(
+ const HostPort& hostport,
+ const shared_ptr<Messenger>& messenger,
+ DnsResolver* dns_resolver,
+ gscoped_ptr<ConsensusServiceProxy>* new_proxy) {
vector<Sockaddr> addrs;
- RETURN_NOT_OK(hostport.ResolveAddresses(&addrs));
+ RETURN_NOT_OK(dns_resolver->ResolveAddresses(hostport, &addrs));
if (addrs.size() > 1) {
- LOG(WARNING)<< "Peer address '" << hostport.ToString() << "' "
- << "resolves to " << addrs.size() << " different addresses. Using "
- << addrs[0].ToString();
+ LOG(WARNING) << Substitute(
+ "Peer address '$0' resolves to $1 different addresses. "
+ "Using $2", hostport.ToString(), addrs.size(), addrs[0].ToString());
}
new_proxy->reset(new ConsensusServiceProxy(messenger, addrs[0], hostport.host()));
return Status::OK();
@@ -552,28 +555,33 @@ Status CreateConsensusServiceProxyForHost(const shared_ptr<Messenger>& messenger
} // anonymous namespace
-RpcPeerProxyFactory::RpcPeerProxyFactory(shared_ptr<Messenger> messenger)
- : messenger_(std::move(messenger)) {}
+RpcPeerProxyFactory::RpcPeerProxyFactory(shared_ptr<Messenger> messenger,
+ DnsResolver* dns_resolver)
+ : messenger_(std::move(messenger)),
+ dns_resolver_(dns_resolver) {
+}
Status RpcPeerProxyFactory::NewProxy(const RaftPeerPB& peer_pb,
gscoped_ptr<PeerProxy>* proxy) {
gscoped_ptr<HostPort> hostport(new HostPort);
RETURN_NOT_OK(HostPortFromPB(peer_pb.last_known_addr(), hostport.get()));
gscoped_ptr<ConsensusServiceProxy> new_proxy;
- RETURN_NOT_OK(CreateConsensusServiceProxyForHost(messenger_, *hostport, &new_proxy));
+ RETURN_NOT_OK(CreateConsensusServiceProxyForHost(
+ *hostport, messenger_, dns_resolver_, &new_proxy));
proxy->reset(new RpcPeerProxy(std::move(hostport), std::move(new_proxy)));
return Status::OK();
}
-RpcPeerProxyFactory::~RpcPeerProxyFactory() {}
-
-Status SetPermanentUuidForRemotePeer(const shared_ptr<Messenger>& messenger,
- RaftPeerPB* remote_peer) {
+Status SetPermanentUuidForRemotePeer(
+ const shared_ptr<rpc::Messenger>& messenger,
+ DnsResolver* resolver,
+ RaftPeerPB* remote_peer) {
DCHECK(!remote_peer->has_permanent_uuid());
HostPort hostport;
RETURN_NOT_OK(HostPortFromPB(remote_peer->last_known_addr(), &hostport));
gscoped_ptr<ConsensusServiceProxy> proxy;
- RETURN_NOT_OK(CreateConsensusServiceProxyForHost(messenger, hostport, &proxy));
+ RETURN_NOT_OK(CreateConsensusServiceProxyForHost(
+ hostport, messenger, resolver, &proxy));
GetNodeInstanceRequestPB req;
GetNodeInstanceResponsePB resp;
rpc::RpcController controller;
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 3bcdcb5..7379512 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -38,6 +38,7 @@
#include "kudu/util/status.h"
namespace kudu {
+class DnsResolver;
class ThreadPoolToken;
namespace rpc {
@@ -278,26 +279,29 @@ class RpcPeerProxy : public PeerProxy {
// PeerProxyFactory implementation that generates RPCPeerProxies
class RpcPeerProxyFactory : public PeerProxyFactory {
public:
- explicit RpcPeerProxyFactory(std::shared_ptr<rpc::Messenger> messenger);
+ RpcPeerProxyFactory(std::shared_ptr<rpc::Messenger> messenger,
+ DnsResolver* dns_resolver);
+ ~RpcPeerProxyFactory() = default;
Status NewProxy(const RaftPeerPB& peer_pb,
gscoped_ptr<PeerProxy>* proxy) override;
- ~RpcPeerProxyFactory();
-
const std::shared_ptr<rpc::Messenger>& messenger() const override {
return messenger_;
}
private:
std::shared_ptr<rpc::Messenger> messenger_;
+ DnsResolver* dns_resolver_;
};
// Query the consensus service at last known host/port that is
// specified in 'remote_peer' and set the 'permanent_uuid' field based
// on the response.
-Status SetPermanentUuidForRemotePeer(const std::shared_ptr<rpc::Messenger>& messenger,
- RaftPeerPB* remote_peer);
+Status SetPermanentUuidForRemotePeer(
+ const std::shared_ptr<rpc::Messenger>& messenger,
+ DnsResolver* resolver,
+ RaftPeerPB* remote_peer);
} // namespace consensus
} // namespace kudu
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 6593f2b..5068e9c 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -74,6 +74,7 @@
#include "kudu/util/env_util.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
@@ -654,7 +655,9 @@ TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
std::shared_ptr<rpc::Messenger> messenger;
ASSERT_OK(builder.Build(&messenger));
- ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(messenger, &peer));
+ auto resolver = std::make_shared<DnsResolver>();
+ ASSERT_OK(consensus::SetPermanentUuidForRemotePeer(
+ messenger, resolver.get(), &peer));
ASSERT_EQ(expected_uuid, peer.permanent_uuid());
}
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index 512a7fb..985e777 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -55,13 +55,11 @@ static bool ValidateThreadPoolThreadLimit(const char* /*flagname*/, int32_t valu
}
DEFINE_validator(server_thread_pool_max_thread_count, &ValidateThreadPoolThreadLimit);
+using kudu::server::ServerBaseOptions;
using std::string;
using strings::Substitute;
namespace kudu {
-
-using server::ServerBaseOptions;
-
namespace kserver {
METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index fadfae4..eaf5364 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -215,6 +215,7 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
}
Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
req->registration(),
+ server_->dns_resolver(),
&ts_desc);
if (!s.ok()) {
LOG(WARNING) << Substitute("Unable to register tserver ($0): $1",
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 4918f11..e90531f 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -304,10 +304,10 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options,
LOG(INFO) << SecureShortDebugString(peer)
<< " has no permanent_uuid. Determining permanent_uuid...";
RaftPeerPB new_peer = peer;
- RETURN_NOT_OK_PREPEND(consensus::SetPermanentUuidForRemotePeer(master_->messenger(),
- &new_peer),
- Substitute("Unable to resolve UUID for peer $0",
- SecureShortDebugString(peer)));
+ RETURN_NOT_OK_PREPEND(consensus::SetPermanentUuidForRemotePeer(
+ master_->messenger(), master_->dns_resolver(), &new_peer),
+ Substitute("Unable to resolve UUID for peer $0",
+ SecureShortDebugString(peer)));
resolved_config.add_peers()->CopyFrom(new_peer);
}
}
@@ -396,7 +396,8 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
master_->messenger(),
scoped_refptr<rpc::ResultTracker>(),
log,
- master_->tablet_prepare_pool()),
+ master_->tablet_prepare_pool(),
+ master_->dns_resolver()),
"Failed to Start() TabletReplica");
tablet_replica_->RegisterMaintenanceOps(master_->maintenance_manager());
diff --git a/src/kudu/master/ts_descriptor-test.cc b/src/kudu/master/ts_descriptor-test.cc
index b2caf50..dc8acc2 100644
--- a/src/kudu/master/ts_descriptor-test.cc
+++ b/src/kudu/master/ts_descriptor-test.cc
@@ -19,7 +19,6 @@
#include <memory>
#include <string>
-#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
@@ -27,13 +26,12 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/test_macros.h"
using std::shared_ptr;
using std::string;
-using std::vector;
-using strings::Substitute;
+using std::unique_ptr;
namespace kudu {
namespace master {
@@ -69,8 +67,10 @@ TEST(TSDescriptorTest, TestRegistration) {
NodeInstancePB instance;
ServerRegistrationPB registration;
SetupBasicRegistrationInfo(uuid, &instance, &registration);
+ unique_ptr<DnsResolver> dns_resolver(new DnsResolver);
shared_ptr<TSDescriptor> desc;
- ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, {}, &desc));
+ ASSERT_OK(TSDescriptor::RegisterNew(
+ instance, registration, {}, dns_resolver.get(), &desc));
// Spot check some fields and the ToString value.
ASSERT_EQ(uuid, desc->permanent_uuid());
@@ -87,8 +87,10 @@ TEST(TSDescriptorTest, TestLocationCmd) {
NodeInstancePB instance;
ServerRegistrationPB registration;
SetupBasicRegistrationInfo(uuid, &instance, &registration);
+ unique_ptr<DnsResolver> dns_resolver(new DnsResolver);
shared_ptr<TSDescriptor> desc;
- ASSERT_OK(TSDescriptor::RegisterNew(instance, registration, location, &desc));
+ ASSERT_OK(TSDescriptor::RegisterNew(
+ instance, registration, location, dns_resolver.get(), &desc));
ASSERT_EQ(location, desc->location());
}
} // namespace master
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 5912faa..4c1c0a1 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -35,6 +35,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
@@ -58,10 +59,12 @@ namespace master {
Status TSDescriptor::RegisterNew(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
const boost::optional<std::string>& location,
+ DnsResolver* dns_resolver,
shared_ptr<TSDescriptor>* desc) {
shared_ptr<TSDescriptor> ret(TSDescriptor::make_shared(instance.permanent_uuid()));
- RETURN_NOT_OK(ret->Register(instance, registration, location));
- desc->swap(ret);
+ RETURN_NOT_OK(ret->Register(
+ instance, registration, location, dns_resolver));
+ *desc = std::move(ret);
return Status::OK();
}
@@ -97,7 +100,8 @@ static bool HostPortPBsEqual(const google::protobuf::RepeatedPtrField<HostPortPB
Status TSDescriptor::Register(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
- const boost::optional<std::string>& location) {
+ const boost::optional<std::string>& location,
+ DnsResolver* dns_resolver) {
std::lock_guard<rw_spinlock> l(lock_);
CHECK_EQ(instance.permanent_uuid(), permanent_uuid_);
@@ -138,6 +142,7 @@ Status TSDescriptor::Register(const NodeInstancePB& instance,
registration_.reset(new ServerRegistrationPB(registration));
ts_admin_proxy_.reset();
consensus_proxy_.reset();
+ dns_resolver_ = dns_resolver;
location_ = location;
return Status::OK();
}
@@ -217,7 +222,7 @@ Status TSDescriptor::ResolveSockaddr(Sockaddr* addr, string* host) const {
HostPort last_hostport;
vector<Sockaddr> addrs;
for (const HostPort& hostport : hostports) {
- RETURN_NOT_OK(hostport.ResolveAddresses(&addrs));
+ RETURN_NOT_OK(dns_resolver_->ResolveAddresses(hostport, &addrs));
if (!addrs.empty()) {
last_hostport = hostport;
break;
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index ab58d33..0b64019 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -37,6 +37,7 @@
namespace kudu {
+class DnsResolver;
class Sockaddr;
namespace consensus {
@@ -62,6 +63,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
static Status RegisterNew(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
const boost::optional<std::string>& location,
+ DnsResolver* dns_resolver,
std::shared_ptr<TSDescriptor>* desc);
virtual ~TSDescriptor() = default;
@@ -79,7 +81,8 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
// Register this tablet server.
Status Register(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
- const boost::optional<std::string>& location);
+ const boost::optional<std::string>& location,
+ DnsResolver* dns_resolver);
const std::string &permanent_uuid() const { return permanent_uuid_; }
int64_t latest_seqno() const;
@@ -171,6 +174,7 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
std::shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy_;
std::shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;
+ DnsResolver* dns_resolver_;
DISALLOW_COPY_AND_ASSIGN(TSDescriptor);
};
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index 92a9b09..de4a1ac 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -102,6 +102,7 @@ bool TSManager::LookupTSByUUID(const string& uuid,
Status TSManager::RegisterTS(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
+ DnsResolver* dns_resolver,
std::shared_ptr<TSDescriptor>* desc) {
// Pre-condition: registration info should contain at least one RPC end-point.
if (registration.rpc_addresses().empty()) {
@@ -148,10 +149,11 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
auto* descriptor_ptr = FindOrNull(servers_by_id_, uuid);
if (descriptor_ptr) {
descriptor = *descriptor_ptr;
- RETURN_NOT_OK(descriptor->Register(instance, registration, location));
+ RETURN_NOT_OK(descriptor->Register(
+ instance, registration, location, dns_resolver));
} else {
RETURN_NOT_OK(TSDescriptor::RegisterNew(
- instance, registration, location, &descriptor));
+ instance, registration, location, dns_resolver, &descriptor));
InsertOrDie(&servers_by_id_, uuid, descriptor);
new_tserver = true;
}
diff --git a/src/kudu/master/ts_manager.h b/src/kudu/master/ts_manager.h
index bf52a9c..359432a 100644
--- a/src/kudu/master/ts_manager.h
+++ b/src/kudu/master/ts_manager.h
@@ -30,6 +30,7 @@
namespace kudu {
+class DnsResolver;
class NodeInstancePB;
class ServerRegistrationPB;
@@ -74,6 +75,7 @@ class TSManager {
// If successful, *desc reset to the registered descriptor.
Status RegisterTS(const NodeInstancePB& instance,
const ServerRegistrationPB& registration,
+ DnsResolver* dns_resolver,
std::shared_ptr<TSDescriptor>* desc);
// Return all of the currently registered TS descriptors into the provided
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 8894373..7a57f33 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -74,6 +74,7 @@
#include "kudu/util/metrics.h"
#include "kudu/util/minidump.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
@@ -206,6 +207,9 @@ DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
DECLARE_bool(use_hybrid_clock);
+DECLARE_int32(dns_resolver_max_threads_num);
+DECLARE_uint32(dns_resolver_cache_capacity_mb);
+DECLARE_uint32(dns_resolver_cache_ttl_sec);
using kudu::security::RpcAuthentication;
using kudu::security::RpcEncryption;
@@ -355,6 +359,10 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
result_tracker_(new rpc::ResultTracker(shared_ptr<MemTracker>(
MemTracker::CreateTracker(-1, "result-tracker", mem_tracker_)))),
is_first_run_(false),
+ dns_resolver_(new DnsResolver(
+ FLAGS_dns_resolver_max_threads_num,
+ FLAGS_dns_resolver_cache_capacity_mb * 1024 * 1024,
+ MonoDelta::FromSeconds(FLAGS_dns_resolver_cache_ttl_sec))),
options_(options),
stop_background_threads_latch_(1) {
FsManagerOpts fs_opts;
diff --git a/src/kudu/server/server_base.h b/src/kudu/server/server_base.h
index 203dd99..a2aee01 100644
--- a/src/kudu/server/server_base.h
+++ b/src/kudu/server/server_base.h
@@ -14,8 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_SERVER_SERVER_BASE_H
-#define KUDU_SERVER_SERVER_BASE_H
+#pragma once
#include <cstdint>
#include <memory>
@@ -32,6 +31,7 @@
namespace kudu {
+class DnsResolver;
class FsManager;
class MemTracker;
class MetricEntity;
@@ -104,6 +104,8 @@ class ServerBase {
// Returns this server's clock.
clock::Clock* clock() { return clock_.get(); }
+ DnsResolver* dns_resolver() { return dns_resolver_.get(); }
+
// Return a PB describing the status of the server (version info, bound ports, etc)
Status GetStatusPB(ServerStatusPB* status) const;
@@ -193,6 +195,7 @@ class ServerBase {
// The ACL of users who may act as part of the Kudu service.
security::SimpleAcl service_acl_;
+
private:
Status InitAcls();
void GenerateInstanceID();
@@ -209,6 +212,9 @@ class ServerBase {
Status StartExcessLogFileDeleterThread();
void ExcessLogFileDeleterThread();
+ // Utility object for DNS name resolutions.
+ std::unique_ptr<DnsResolver> dns_resolver_;
+
ServerBaseOptions options_;
std::unique_ptr<DiagnosticsLog> diag_log_;
@@ -222,4 +228,3 @@ class ServerBase {
} // namespace server
} // namespace kudu
-#endif /* KUDU_SERVER_SERVER_BASE_H */
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index a9f7382..9cf08d7 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -71,6 +71,7 @@
#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -81,35 +82,35 @@ METRIC_DECLARE_entity(tablet);
DECLARE_int32(flush_threshold_mb);
+using kudu::consensus::CommitMsg;
+using kudu::consensus::ConsensusBootstrapInfo;
+using kudu::consensus::ConsensusMetadata;
+using kudu::consensus::ConsensusMetadataManager;
+using kudu::consensus::OpId;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::log::Log;
+using kudu::log::LogOptions;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::Messenger;
+using kudu::rpc::ResultTracker;
+using kudu::tserver::AlterSchemaRequestPB;
+using kudu::tserver::AlterSchemaResponsePB;
+using kudu::tserver::WriteRequestPB;
+using kudu::tserver::WriteResponsePB;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+
namespace kudu {
class MemTracker;
namespace tablet {
-using consensus::CommitMsg;
-using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusMetadata;
-using consensus::ConsensusMetadataManager;
-using consensus::OpId;
-using consensus::RECEIVED_OPID;
-using consensus::RaftConfigPB;
-using consensus::RaftConsensus;
-using consensus::RaftPeerPB;
-using log::Log;
-using log::LogOptions;
-using pb_util::SecureDebugString;
-using pb_util::SecureShortDebugString;
-using rpc::Messenger;
-using rpc::ResultTracker;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using tserver::AlterSchemaRequestPB;
-using tserver::AlterSchemaResponsePB;
-using tserver::WriteRequestPB;
-using tserver::WriteResponsePB;
-
static Schema GetTestSchema() {
return Schema({ ColumnSchema("key", INT32) }, 1);
}
@@ -117,9 +118,10 @@ static Schema GetTestSchema() {
class TabletReplicaTest : public KuduTabletTest {
public:
TabletReplicaTest()
- : KuduTabletTest(GetTestSchema()),
- insert_counter_(0),
- delete_counter_(0) {
+ : KuduTabletTest(GetTestSchema()),
+ insert_counter_(0),
+ delete_counter_(0),
+ dns_resolver_(new DnsResolver) {
}
void SetUpReplica(bool new_replica = true) {
@@ -184,7 +186,8 @@ class TabletReplicaTest : public KuduTabletTest {
messenger_,
scoped_refptr<ResultTracker>(),
log,
- prepare_pool_.get());
+ prepare_pool_.get(),
+ dns_resolver_.get());
}
Status StartReplicaAndWaitUntilLeader(const ConsensusBootstrapInfo& info) {
@@ -232,7 +235,8 @@ class TabletReplicaTest : public KuduTabletTest {
messenger_,
scoped_refptr<ResultTracker>(),
log,
- prepare_pool_.get()));
+ prepare_pool_.get(),
+ dns_resolver_.get()));
// Wait for the replica to be usable.
const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout));
@@ -383,6 +387,7 @@ class TabletReplicaTest : public KuduTabletTest {
gscoped_ptr<ThreadPool> prepare_pool_;
gscoped_ptr<ThreadPool> apply_pool_;
gscoped_ptr<ThreadPool> raft_pool_;
+ unique_ptr<DnsResolver> dns_resolver_;
scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index a284215..f9c4000 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -164,7 +164,8 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
shared_ptr<Messenger> messenger,
scoped_refptr<ResultTracker> result_tracker,
scoped_refptr<Log> log,
- ThreadPool* prepare_pool) {
+ ThreadPool* prepare_pool,
+ DnsResolver* resolver) {
DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
DCHECK(log) << "A TabletReplica must be provided with a Log";
@@ -210,7 +211,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
- peer_proxy_factory.reset(new RpcPeerProxyFactory(messenger_));
+ peer_proxy_factory.reset(new RpcPeerProxyFactory(messenger_, resolver));
time_manager.reset(new TimeManager(clock_, tablet_->mvcc_manager()->GetCleanTimestamp()));
}
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 50fab2d..1bcffa7 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -47,6 +47,7 @@
#include "kudu/util/status.h"
namespace kudu {
+class DnsResolver;
class MaintenanceManager;
class MaintenanceOp;
class MonoDelta;
@@ -108,7 +109,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
std::shared_ptr<rpc::Messenger> messenger,
scoped_refptr<rpc::ResultTracker> result_tracker,
scoped_refptr<log::Log> log,
- ThreadPool* prepare_pool);
+ ThreadPool* prepare_pool,
+ DnsResolver* resolver);
// Synchronously transition this replica to STOPPED state from any other
// state. This also stops RaftConsensus. If a Stop() operation is already in
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 62856fd..b21a002 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -59,6 +59,7 @@
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
@@ -116,31 +117,8 @@ using strings::Substitute;
namespace kudu {
-namespace rpc {
-class Messenger;
-}
-
namespace tserver {
-namespace {
-
-// Creates a proxy to 'hostport'.
-Status MasterServiceProxyForHostPort(const HostPort& hostport,
- const shared_ptr<rpc::Messenger>& messenger,
- gscoped_ptr<MasterServiceProxy>* proxy) {
- vector<Sockaddr> addrs;
- RETURN_NOT_OK(hostport.ResolveAddresses(&addrs));
- if (addrs.size() > 1) {
- LOG(WARNING) << "Master address '" << hostport.ToString() << "' "
- << "resolves to " << addrs.size() << " different addresses. Using "
- << addrs[0].ToString();
- }
- proxy->reset(new MasterServiceProxy(messenger, addrs[0], hostport.host()));
- return Status::OK();
-}
-
-} // anonymous namespace
-
// Most of the actual logic of the heartbeater is inside this inner class,
// to avoid having too many dependencies from the header itself.
//
@@ -170,6 +148,8 @@ class Heartbeater::Thread {
Status SetupRegistration(ServerRegistrationPB* reg);
void SetupCommonField(master::TSToMasterCommonPB* common);
bool IsCurrentThread() const;
+ // Creates a proxy to 'hostport'.
+ Status MasterServiceProxyForHostPort(gscoped_ptr<MasterServiceProxy>* proxy);
// The host and port of the master that this thread will heartbeat to.
//
@@ -333,8 +313,7 @@ Heartbeater::Thread::Thread(HostPort master_address, TabletServer* server)
Status Heartbeater::Thread::ConnectToMaster() {
gscoped_ptr<MasterServiceProxy> new_proxy;
- RETURN_NOT_OK(MasterServiceProxyForHostPort(master_address_, server_->messenger(), &new_proxy));
-
+ RETURN_NOT_OK(MasterServiceProxyForHostPort(&new_proxy));
// Ping the master to verify that it's alive.
master::PingRequestPB req;
master::PingResponsePB resp;
@@ -727,5 +706,21 @@ void Heartbeater::Thread::GenerateFullTabletReport(TabletReportPB* report) {
server_->tablet_manager()->PopulateFullTabletReport(report);
}
+Status Heartbeater::Thread::MasterServiceProxyForHostPort(
+ gscoped_ptr<MasterServiceProxy>* proxy) {
+ vector<Sockaddr> addrs;
+ RETURN_NOT_OK(server_->dns_resolver()->ResolveAddresses(master_address_,
+ &addrs));
+ CHECK(!addrs.empty());
+ if (addrs.size() > 1) {
+ LOG(WARNING) << Substitute(
+ "Master address '$0' resolves to $1 different addresses. Using $2",
+ master_address_.ToString(), addrs.size(), addrs[0].ToString());
+ }
+ proxy->reset(new MasterServiceProxy(
+ server_->messenger(), addrs[0], master_address_.host()));
+ return Status::OK();
+}
+
} // namespace tserver
} // namespace kudu
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 0633312..fa18b0a 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -66,6 +66,7 @@
#include "kudu/util/faststring.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
@@ -109,8 +110,9 @@ using tablet::WriteTransactionState;
class TabletCopyTest : public KuduTabletTest {
public:
TabletCopyTest()
- : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
- ColumnSchema("val", INT32) }, 1)) {
+ : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
+ ColumnSchema("val", INT32) }, 1)),
+ dns_resolver_(new DnsResolver) {
CHECK_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
@@ -177,7 +179,8 @@ class TabletCopyTest : public KuduTabletTest {
messenger,
scoped_refptr<rpc::ResultTracker>(),
log,
- prepare_pool_.get()));
+ prepare_pool_.get(),
+ dns_resolver_.get()));
ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
}
@@ -258,6 +261,7 @@ class TabletCopyTest : public KuduTabletTest {
gscoped_ptr<ThreadPool> prepare_pool_;
gscoped_ptr<ThreadPool> apply_pool_;
gscoped_ptr<ThreadPool> raft_pool_;
+ unique_ptr<DnsResolver> dns_resolver_;
scoped_refptr<TabletReplica> tablet_replica_;
scoped_refptr<TabletCopySourceSession> session_;
};
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 5924c17..b7a5356 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -38,6 +38,7 @@
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver_path_handlers.h"
#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
@@ -47,21 +48,6 @@ using kudu::fs::ErrorHandlerType;
using kudu::rpc::ServiceIf;
namespace kudu {
-
-namespace {
-
-Status ValidateMasterAddressResolution(const UnorderedHostPortSet& master_addrs) {
- for (const HostPort& addr : master_addrs) {
- RETURN_NOT_OK_PREPEND(addr.ResolveAddresses(nullptr),
- strings::Substitute(
- "Couldn't resolve master service address '$0'",
- addr.ToString()));
- }
- return Status::OK();
-}
-
-} // anonymous namespace
-
namespace tserver {
TabletServer::TabletServer(const TabletServerOptions& opts)
@@ -105,15 +91,19 @@ Status TabletServer::Init() {
// We don't validate that we can connect at this point -- it should
// be allowed to start the TS and the master in whichever order --
// our heartbeat thread will loop until successfully connecting.
- RETURN_NOT_OK(ValidateMasterAddressResolution(master_addrs));
+ for (const auto& addr : master_addrs) {
+ RETURN_NOT_OK_PREPEND(dns_resolver()->ResolveAddresses(addr, nullptr),
+ strings::Substitute("couldn't resolve master service address '$0'",
+ addr.ToString()));
+ }
RETURN_NOT_OK(KuduServer::Init());
if (web_server_) {
RETURN_NOT_OK(path_handlers_->Register(web_server_.get()));
}
- maintenance_manager_.reset(new MaintenanceManager(
- MaintenanceManager::kDefaultOptions, fs_manager_->uuid()));
+ maintenance_manager_ = std::make_shared<MaintenanceManager>(
+ MaintenanceManager::kDefaultOptions, fs_manager_->uuid());
heartbeater_.reset(new Heartbeater(std::move(master_addrs), this));
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 2dc868e..9e83b50 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -1086,7 +1086,8 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica,
server_->messenger(),
server_->result_tracker(),
log,
- server_->tablet_prepare_pool());
+ server_->tablet_prepare_pool(),
+ server_->dns_resolver());
if (!s.ok()) {
LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to start: "
<< s.ToString();
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index b6bb89a..ae35456 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -83,7 +83,7 @@ namespace {
using AddrInfo = unique_ptr<addrinfo, function<void(addrinfo*)>>;
-// An utility wrapper around getaddrinfo() call to convert the return code
+// A utility wrapper around getaddrinfo() call to convert the return code
// of the libc library function into Status.
Status GetAddrInfo(const string& hostname,
const addrinfo& hints,