You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2019/06/01 08:38:13 UTC

[kudu] 03/04: KUDU-2791: TTL cache in DNS resolver (part 1)

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1cf86dfd4b503958daeda94164c4730a697556aa
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed May 29 18:07:07 2019 -0700

    KUDU-2791: TTL cache in DNS resolver (part 1)
    
    Added TTL cache for the results of name resolution in DnsResolver.
    Updated corresponding tests as well.  A follow-up changelist will
    add necessary plumbing in various places where it's beneficial to
    use caching DnsResolver instead of HostPort::ResolveAddresses().
    
    This changelist also introduces runtime flags which map into parameters
    of the TTL cache used in DnsResolver. By default, the cache's capacity
    is 1 MiByte, and its records' TTL is 15 seconds.
    
    1 MiByte capacity is big enough to accommodate thousands of records.
    On the other hand, it's low enough to avoid running an extra thread
    for scrubbing the cache of expired records, allowing the cache to purge
    expired records only when it is at capacity.
    
    Record's TTL of 15 seconds is half of the minimum recommended as stated
    by various 'best practices': 30 seconds for DNS A records
    (see [1] -- [4] below).  Kudu masters and tablet servers are not
    supposed to run behind a load balancer, so 15 seconds TTL for a cached
    DNS A record seems to be good enough in the context of running a Kudu
    cluster.
    
    [1] https://ns1.com/knowledgebase/ttl-best-practices
    [2] https://serverfault.com/questions/7478/recommended-dns-ttl
    [3] https://support.google.com/a/answer/48090?hl=en
    [4] https://ns1.com/blog/what-is-the-lowest-ttl-i-can-get-away-with
    
    Change-Id: Ia1bbd55a8231fd541d2087f9202f24e80bc79f0b
    Reviewed-on: http://gerrit.cloudera.org:8080/13266
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/client/client-internal.cc     |  19 ++++--
 src/kudu/client/client-internal.h      |   3 +-
 src/kudu/client/client.cc              |   3 -
 src/kudu/client/meta_cache.cc          |   6 +-
 src/kudu/util/net/dns_resolver-test.cc |  60 ++++++++++++++----
 src/kudu/util/net/dns_resolver.cc      | 112 ++++++++++++++++++++++++++++-----
 src/kudu/util/net/dns_resolver.h       |  74 +++++++++++++++++-----
 src/kudu/util/net/net_util.cc          |  16 ++---
 8 files changed, 233 insertions(+), 60 deletions(-)

diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index cde72f3..842ae8f 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -32,6 +32,7 @@
 
 #include <boost/bind.hpp> // IWYU pragma: keep
 #include <boost/function.hpp>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/client/authz_token_cache.h"
@@ -67,6 +68,10 @@
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/thread_restrictions.h"
 
+DECLARE_int32(dns_resolver_max_threads_num);
+DECLARE_uint32(dns_resolver_cache_capacity_mb);
+DECLARE_uint32(dns_resolver_cache_ttl_sec);
+
 using std::pair;
 using std::set;
 using std::shared_ptr;
@@ -152,8 +157,13 @@ Status RetryFunc(const MonoTime& deadline,
 }
 
 KuduClient::Data::Data()
-    : hive_metastore_sasl_enabled_(false),
-      latest_observed_timestamp_(KuduClient::kNoTimestamp) {}
+    : dns_resolver_(new DnsResolver(FLAGS_dns_resolver_max_threads_num,
+          // Widen to size_t first: uint32 math wraps at 4096 MiB and above.
+          static_cast<size_t>(FLAGS_dns_resolver_cache_capacity_mb) * 1024 * 1024,
+          MonoDelta::FromSeconds(FLAGS_dns_resolver_cache_ttl_sec))),
+      hive_metastore_sasl_enabled_(false),
+      latest_observed_timestamp_(KuduClient::kNoTimestamp) {
+}
 
 KuduClient::Data::~Data() {
   // Workaround for KUDU-956: the user may close a KuduClient while a flush
@@ -595,8 +605,9 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
     }
     if (addrs.size() > 1) {
       KLOG_EVERY_N_SECS(WARNING, 1)
-          << "Specified master server address '" << master_server_addr << "' "
-          << "resolved to multiple IPs. Using " << addrs[0].ToString();
+          << Substitute("Specified master server address '$0' resolved to "
+                        "multiple IPs. Using $1",
+                        master_server_addr, addrs[0].ToString());
     }
     master_addrs_with_names.emplace_back(addrs[0], hp.host());
   }
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 1f492e2..2098bda 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -30,7 +30,6 @@
 
 #include "kudu/client/authz_token_cache.h"
 #include "kudu/client/client.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/master/master.pb.h"
@@ -242,8 +241,8 @@ class KuduClient::Data {
   // The request tracker for this client.
   scoped_refptr<rpc::RequestTracker> request_tracker_;
 
+  std::unique_ptr<DnsResolver> dns_resolver_;
   std::shared_ptr<rpc::Messenger> messenger_;
-  gscoped_ptr<DnsResolver> dns_resolver_;
   scoped_refptr<internal::MetaCache> meta_cache_;
 
   // Authorization tokens stored for each table, indexed by table ID. Note that
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 5078acb..041625a 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -94,14 +94,12 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/logging_callback.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/version_info.h"
 
 using kudu::master::AlterTableRequestPB;
-using kudu::master::AlterTableRequestPB_Step;
 using kudu::master::AlterTableResponsePB;
 using kudu::master::CreateTableRequestPB;
 using kudu::master::CreateTableResponsePB;
@@ -369,7 +367,6 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>* client) {
                         "Could not connect to the cluster");
 
   c->data_->meta_cache_.reset(new MetaCache(c.get(), data_->replica_visibility_));
-  c->data_->dns_resolver_.reset(new DnsResolver);
 
   // Init local host names used for locality decisions.
   RETURN_NOT_OK_PREPEND(c->data_->InitLocalHostNames(),
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 30c144b..eee0d31 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -136,9 +136,9 @@ void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb)
   }
 
   auto addrs = new vector<Sockaddr>();
-  client->data_->dns_resolver_->ResolveAddresses(
-    hp, addrs, Bind(&RemoteTabletServer::DnsResolutionFinished,
-                    Unretained(this), hp, addrs, client, cb));
+  client->data_->dns_resolver_->ResolveAddressesAsync(
+      hp, addrs, Bind(&RemoteTabletServer::DnsResolutionFinished,
+                      Unretained(this), hp, addrs, client, cb));
 }
 
 void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
diff --git a/src/kudu/util/net/dns_resolver-test.cc b/src/kudu/util/net/dns_resolver-test.cc
index f08b089..f53df7d 100644
--- a/src/kudu/util/net/dns_resolver-test.cc
+++ b/src/kudu/util/net/dns_resolver-test.cc
@@ -17,36 +17,39 @@
 
 #include "kudu/util/net/dns_resolver.h"
 
+#include <cstdint>
+#include <cstdlib>
 #include <ostream>
 #include <string>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/util/async_util.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
+
+DECLARE_uint32(dns_resolver_cache_capacity_mb);
 
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 
-class DnsResolverTest : public KuduTest {
- protected:
-  DnsResolver resolver_;
-};
-
-TEST_F(DnsResolverTest, TestResolution) {
+TEST(DnsResolverTest, AsyncResolution) {
   vector<Sockaddr> addrs;
+  // Non-caching asynchronous DNS resolver.
+  DnsResolver resolver(1/* max_threads_num */);
   Synchronizer s;
-  {
-    HostPort hp("localhost", 12345);
-    resolver_.ResolveAddresses(hp, &addrs, s.AsStatusCallback());
-  }
+  resolver.ResolveAddressesAsync(HostPort("localhost", 12345), &addrs,
+                                 s.AsStatusCallback());
   ASSERT_OK(s.Wait());
   ASSERT_TRUE(!addrs.empty());
   for (const Sockaddr& addr : addrs) {
@@ -56,4 +59,39 @@ TEST_F(DnsResolverTest, TestResolution) {
   }
 }
 
+TEST(DnsResolverTest, CachingVsNonCachingResolver) {
+  constexpr const auto kNumIterations = 1000;
+  constexpr const auto kIdxNonCached = 0;
+  constexpr const auto kIdxCached = 1;
+  constexpr const char* const kHost = "localhost";
+
+  // Time the same lookups twice: without a cache (idx 0), then with one (idx 1).
+  MonoDelta timings[2];
+  for (size_t idx = 0; idx < ARRAYSIZE(timings); ++idx) {
+    // DNS resolver's cache capacity of 0 means the results are not cached.
+    const size_t capacity_bytes = idx * 1024 * 1024;
+    DnsResolver resolver(1, capacity_bytes, MonoDelta::FromSeconds(10));
+    const auto start_time = MonoTime::Now();
+    for (auto i = 0; i < kNumIterations; ++i) {
+      vector<Sockaddr> addrs;
+      // The cache is keyed by host name only, so varying the port must not
+      // prevent cache hits; the returned addresses still carry this port.
+      uint16_t port = rand() % kNumIterations + kNumIterations;
+      ASSERT_OK(resolver.ResolveAddresses(HostPort(kHost, port), &addrs));
+      ASSERT_TRUE(!addrs.empty());
+      for (const Sockaddr& addr : addrs) {
+        EXPECT_TRUE(HasSuffixString(addr.ToString(), Substitute(":$0", port)));
+      }
+    }
+    timings[idx] = MonoTime::Now() - start_time;
+  }
+  LOG(INFO) << Substitute("$0 non-cached resolutions of '$1' took $2",
+                          kNumIterations, kHost,
+                          timings[kIdxNonCached].ToString());
+  LOG(INFO) << Substitute("$0     cached resolutions of '$1' took $2",
+                          kNumIterations, kHost,
+                          timings[kIdxCached].ToString());
+  ASSERT_GT(timings[kIdxNonCached], timings[kIdxCached]);
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index 4803688..5038c2f 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -17,49 +17,131 @@
 
 #include "kudu/util/net/dns_resolver.h"
 
+#include <functional>
+#include <memory>
+#include <utility>
 #include <vector>
 
-#include <boost/bind.hpp> // IWYU pragma: keep
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/callback.h"
+#include "kudu/gutil/port.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/threadpool.h"
+#include "kudu/util/malloc.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/ttl_cache.h"
+
+DEFINE_int32(dns_resolver_max_threads_num, 1,
+             "The maximum number of threads to use for async DNS resolution");
+TAG_FLAG(dns_resolver_max_threads_num, advanced);
 
-DEFINE_int32(dns_num_resolver_threads, 1, "The number of threads to use for DNS resolution");
-TAG_FLAG(dns_num_resolver_threads, advanced);
+DEFINE_uint32(dns_resolver_cache_capacity_mb, 1,
+              "Capacity of DNS resolver cache, in MiBytes. For each key, the "
+              "cache stores records returned by getaddrinfo(). A value of 0 "
+              "means the results of DNS name resolution are not cached.");
+TAG_FLAG(dns_resolver_cache_capacity_mb, advanced);
 
+DEFINE_uint32(dns_resolver_cache_ttl_sec, 15,
+              "TTL of records in the DNS resolver cache, in seconds.");
+TAG_FLAG(dns_resolver_cache_ttl_sec, advanced);
+
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
 
-DnsResolver::DnsResolver() {
+DnsResolver::DnsResolver(int max_threads_num,
+                         size_t cache_capacity_bytes,
+                         MonoDelta cache_ttl) {
   CHECK_OK(ThreadPoolBuilder("dns-resolver")
-           .set_max_threads(FLAGS_dns_num_resolver_threads)
+           .set_max_threads(max_threads_num)
            .Build(&pool_));
+  if (cache_capacity_bytes > 0) {
+    // Cache TTL should be a valid time interval if cache is enabled.
+    CHECK(cache_ttl.Initialized() && cache_ttl.ToNanoseconds() > 0);
+    cache_.reset(new HostRecordCache(cache_capacity_bytes, cache_ttl));
+  }
 }
 
 DnsResolver::~DnsResolver() {
   pool_->Shutdown();
 }
 
-namespace {
-void DoResolution(const HostPort &hostport, vector<Sockaddr>* addresses,
-                  const StatusCallback& cb) {
-  cb.Run(hostport.ResolveAddresses(addresses));
+Status DnsResolver::ResolveAddresses(const HostPort& hostport,
+                                     vector<Sockaddr>* addresses) {
+  if (GetCachedAddresses(hostport, addresses)) {
+    return Status::OK();
+  }
+  return DoResolution(hostport, addresses);
 }
-} // anonymous namespace
 
-void DnsResolver::ResolveAddresses(const HostPort& hostport,
-                                   vector<Sockaddr>* addresses,
-                                   const StatusCallback& cb) {
-  Status s = pool_->SubmitFunc(boost::bind(&DoResolution, hostport, addresses, cb));
+void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
+                                        vector<Sockaddr>* addresses,
+                                        const StatusCallback& cb) {
+  if (GetCachedAddresses(hostport, addresses)) {
+    return cb.Run(Status::OK());
+  }
+  const auto s = pool_->SubmitFunc(std::bind(&DnsResolver::DoResolutionCb,
+                                             this, hostport, addresses, cb));
   if (!s.ok()) {
     cb.Run(s);
   }
 }
 
+Status DnsResolver::DoResolution(const HostPort& hostport,
+                                 vector<Sockaddr>* addresses) {
+  vector<Sockaddr> resolved_addresses;
+  RETURN_NOT_OK(hostport.ResolveAddresses(&resolved_addresses));
+  // Only successful lookups get here; cache the result before returning it.
+  if (PREDICT_TRUE(cache_)) {
+    unique_ptr<vector<Sockaddr>> cached_addresses(
+        new vector<Sockaddr>(resolved_addresses));
+    // Charge = vector object + element buffer. Parenthesize the conditional:
+    // '?:' binds looser than '+' and '>', so it would swallow the whole sum.
+    const auto entry_charge = kudu_malloc_usable_size(cached_addresses.get()) +
+        (cached_addresses->capacity() > 0
+         ? kudu_malloc_usable_size(cached_addresses->data()) : 0);
+#ifndef NDEBUG
+    // Debug builds zero the stored port: lookups set it again on retrieval.
+    for (auto& addr : *cached_addresses) {
+      addr.set_port(0);
+    }
+#endif
+    cache_->Put(hostport.host(), std::move(cached_addresses), entry_charge);
+  }
+  if (addresses) {
+    *addresses = std::move(resolved_addresses);
+  }
+  return Status::OK();
+}
+
+void DnsResolver::DoResolutionCb(
+    const HostPort& hostport, vector<Sockaddr>* addresses, const StatusCallback& cb) {
+  // Submitted to the thread pool by ResolveAddressesAsync(): resolve, then notify.
+  cb.Run(DoResolution(hostport, addresses));
+}
+
+bool DnsResolver::GetCachedAddresses(const HostPort& hostport,
+                                     vector<Sockaddr>* addresses) {
+  if (PREDICT_FALSE(!cache_)) {
+    return false;
+  }
+  auto handle = cache_->Get(hostport.host());
+  if (!handle) {
+    return false;
+  }
+  if (addresses) {
+    // Entries are keyed by host only, so stamp the requested port on a copy.
+    vector<Sockaddr> result_addresses(handle.value());
+    for (auto& addr : result_addresses) { addr.set_port(hostport.port()); }
+    *addresses = std::move(result_addresses);
+  }
+  return true;
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index 06dfa48..8526385 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -14,49 +14,93 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_UTIL_NET_DNS_RESOLVER_H
-#define KUDU_UTIL_NET_DNS_RESOLVER_H
 
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
 #include <vector>
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
 #include "kudu/util/status_callback.h"
+#include "kudu/util/ttl_cache.h"
 
 namespace kudu {
 
 class HostPort;
-class Sockaddr;
 class ThreadPool;
 
-// DNS Resolver which supports async address resolution.
+// A utility class for DNS resolution. The resolver supports both synchronous
+// and asynchronous address resolution. Optionally, the resolved entries are
+// cached and re-used when not yet expired. The constructor's arguments define
+// parameters for the TTL cache containing the results of prior DNS lookups.
+// The cache doesn't store negative results, i.e. nothing is stored in the cache
+// when DNS resolution fails for the specified HostPort.
 class DnsResolver {
  public:
-  DnsResolver();
+  // The 'max_threads_num' parameter specifies the maximum number of threads in
+  // the pool used for asynchronous DNS resolution. The 'cache_capacity_bytes'
+  // parameter defines the capacity of the underlying TTL cache for resolved
+  // DNS records. If set to 0, resolved DNS entries are not cached. The
+  // 'cache_ttl' parameter defines the TTL for cache's entries.
+  explicit DnsResolver(int max_threads_num = 1,
+                       size_t cache_capacity_bytes = 0,
+                       MonoDelta cache_ttl = MonoDelta::FromSeconds(60));
   ~DnsResolver();
 
-  // Resolve any addresses corresponding to this host:port pair.
-  // Note that a host may resolve to more than one IP address.
+  // Synchronously resolve addresses corresponding to the specified host:port
+  // pair in 'hostport'. Note that a host may resolve to more than one IP
+  // address.
   //
-  // 'addresses' may be NULL, in which case this function simply checks that
-  // the host/port pair can be resolved, without returning anything.
+  // The 'addresses' output parameter may be nullptr, in which case this method
+  // simply checks that the host/port pair can be resolved, without returning
+  // the actual results.
+  Status ResolveAddresses(const HostPort& hostport,
+                          std::vector<Sockaddr>* addresses);
+
+  // The asynchronous version of ResolveAddresses() method.
+  // See ResolveAddresses() for information on 'hostport' and 'addresses'
+  // parameters.
   //
-  // When the result is available, or an error occurred, 'cb' is called
-  // with the result Status.
+  // When the result is available, or an error occurred, 'cb' is called with
+  // the result Status.
   //
   // NOTE: the callback should be fast since it is called by the DNS
   // resolution thread.
   // NOTE: in some rare cases, the callback may also be called inline
   // from this function call, on the caller's thread.
-  void ResolveAddresses(const HostPort& hostport,
-                        std::vector<Sockaddr>* addresses,
-                        const StatusCallback& cb);
+  void ResolveAddressesAsync(const HostPort& hostport,
+                             std::vector<Sockaddr>* addresses,
+                             const StatusCallback& cb);
 
  private:
+  // The cache is keyed by the host part of the HostPort structure, and the
+  // entry stores a vector of all Sockaddr structures produced by DNS resolution
+  // of the key. The port number is stored as a part of Sockaddr structure:
+  // it's not relevant for any lookup and re-written upon retrieval
+  // of the corresponding entry for the specified key.
+  typedef TTLCache<std::string, std::vector<Sockaddr>> HostRecordCache;
+
+  Status DoResolution(const HostPort& hostport,
+                      std::vector<Sockaddr>* addresses);
+
+  void DoResolutionCb(const HostPort& hostport,
+                      std::vector<Sockaddr>* addresses,
+                      const StatusCallback& cb);
+
+  bool GetCachedAddresses(const HostPort& hostport,
+                          std::vector<Sockaddr>* addresses);
+
   gscoped_ptr<ThreadPool> pool_;
+  std::unique_ptr<HostRecordCache> cache_;
 
   DISALLOW_COPY_AND_ASSIGN(DnsResolver);
 };
 
 } // namespace kudu
-#endif /* KUDU_UTIL_NET_DNS_RESOLVER_H */
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index ecb5bd4..b6bb89a 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -191,20 +191,22 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
   LOG_SLOW_EXECUTION(WARNING, 200, op_description) {
     RETURN_NOT_OK(GetAddrInfo(host_, hints, op_description, &result));
   }
+  vector<Sockaddr> result_addresses;
   for (const addrinfo* ai = result.get(); ai != nullptr; ai = ai->ai_next) {
-    CHECK_EQ(ai->ai_family, AF_INET);
-    struct sockaddr_in* addr = reinterpret_cast<struct sockaddr_in*>(ai->ai_addr);
+    CHECK_EQ(AF_INET, ai->ai_family);
+    sockaddr_in* addr = reinterpret_cast<sockaddr_in*>(ai->ai_addr);
     addr->sin_port = htons(port_);
     Sockaddr sockaddr(*addr);
-    if (addresses) {
-      addresses->push_back(sockaddr);
-    }
-    VLOG(2) << "Resolved address " << sockaddr.ToString()
-            << " for host/port " << ToString();
+    VLOG(2) << Substitute("resolved address $0 for host/port $1",
+                          sockaddr.ToString(), ToString());
+    result_addresses.emplace_back(sockaddr);
   }
   if (PREDICT_FALSE(FLAGS_fail_dns_resolution)) {
     return Status::NetworkError("injected DNS resolution failure");
   }
+  if (addresses) {
+    *addresses = std::move(result_addresses);
+  }
   return Status::OK();
 }