You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/09/21 23:10:42 UTC

[kudu] 01/02: [util] allow DnsResolver to refresh DNS addresses

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 246505b538a6449eda7a9936609e9fec4632fe5d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Sep 15 19:45:09 2021 -0700

    [util] allow DnsResolver to refresh DNS addresses
    
    This patch introduces functionality to the DnsResolver to refresh an
    address, rather than looking it up in the cache. It does this by
    removing any cached entry and performing the lookup.
    
    This will be used in a follow-up change to refresh the address on
    certain transient failures.
    
    A new --dns_addr_resolution_override flag is also introduced for testing
    purposes.
    
    Change-Id: I0616f3e6fb50aba271f106b05d1926fc46a53ed0
    Reviewed-on: http://gerrit.cloudera.org:8080/17849
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/util/net/dns_resolver-test.cc | 94 +++++++++++++++++++++++++++++++++-
 src/kudu/util/net/dns_resolver.cc      | 20 ++++++++
 src/kudu/util/net/dns_resolver.h       |  6 +++
 src/kudu/util/net/net_util.cc          | 23 +++++++++
 src/kudu/util/ttl_cache.h              |  4 ++
 5 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/src/kudu/util/net/dns_resolver-test.cc b/src/kudu/util/net/dns_resolver-test.cc
index f53df7d..b861c68 100644
--- a/src/kudu/util/net/dns_resolver-test.cc
+++ b/src/kudu/util/net/dns_resolver-test.cc
@@ -21,9 +21,10 @@
 #include <cstdlib>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <vector>
 
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -34,10 +35,13 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_macros.h"
 
+DECLARE_string(dns_addr_resolution_override);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
 
+using std::thread;
 using std::vector;
 using strings::Substitute;
 
@@ -59,6 +63,92 @@ TEST(DnsResolverTest, AsyncResolution) {
   }
 }
 
+TEST(DnsResolverTest, RefreshCachedEntry) {
+  gflags::FlagSaver saver;
+  vector<Sockaddr> addrs;
+  DnsResolver resolver(1/* max_threads_num */, 1024 * 1024/* cache_capacity_bytes */);
+  ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+  ASSERT_TRUE(!addrs.empty());
+  for (const Sockaddr& addr : addrs) {
+    LOG(INFO) << "Address: " << addr.ToString();
+    EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+    EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+  }
+  // If we override the DNS lookup address, when we refresh the address, the
+  // cached entry gets reset.
+  constexpr const char* kFakeAddr = "1.1.1.1";
+  FLAGS_dns_addr_resolution_override = Substitute("localhost=$0", kFakeAddr);
+  Synchronizer s;
+  resolver.RefreshAddressesAsync(HostPort("localhost", 1111), &addrs,
+                                 s.AsStatusCallback());
+  ASSERT_OK(s.Wait());
+  ASSERT_EQ(1, addrs.size());
+  ASSERT_EQ(Substitute("$0:1111", kFakeAddr), addrs[0].ToString());
+  ASSERT_EQ(1111, addrs[0].port());
+
+  // Once we stop overriding DNS lookups, simply getting the address from the
+  // resolver will read from the cache.
+  FLAGS_dns_addr_resolution_override = "";
+  ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+  ASSERT_EQ(1, addrs.size());
+  ASSERT_EQ(Substitute("$0:12345", kFakeAddr), addrs[0].ToString());
+  ASSERT_EQ(12345, addrs[0].port());
+
+  // But a refresh should return the original address.
+  Synchronizer s2;
+  resolver.RefreshAddressesAsync(HostPort("localhost", 12345), &addrs,
+                                 s2.AsStatusCallback());
+  ASSERT_OK(s2.Wait());
+  EXPECT_FALSE(addrs.empty());
+  for (const Sockaddr& addr : addrs) {
+    LOG(INFO) << "Address: " << addr.ToString();
+    EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+    EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+  }
+}
+
+TEST(DnsResolverTest, ConcurrentRefreshesAndResolutions) {
+  constexpr int kNumThreads = 3;
+  constexpr int kNumResolutionsPerThread = 10;
+  DnsResolver resolver(1/* max_threads_num */, 1024 * 1024/* cache_capacity_bytes */);
+  vector<thread> threads;
+  auto cancel_threads = MakeScopedCleanup([&] {
+    for (auto& t : threads) {
+      t.join();
+    }
+  });
+  const auto validate_addrs = [] (const vector<Sockaddr>& addrs) {
+    ASSERT_FALSE(addrs.empty());
+    for (const Sockaddr& addr : addrs) {
+      EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+      EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+    }
+  };
+  for (int i = 0; i < kNumThreads - 1; i++) {
+    threads.emplace_back([&] {
+      for (int r = 0; r < kNumResolutionsPerThread; r++) {
+        vector<Sockaddr> addrs;
+        Synchronizer s;
+        resolver.RefreshAddressesAsync(HostPort("localhost", 12345), &addrs,
+                                       s.AsStatusCallback());
+        ASSERT_OK(s.Wait());
+        NO_FATALS(validate_addrs(addrs));
+      }
+    });
+  }
+  threads.emplace_back([&] {
+    for (int r = 0; r < kNumResolutionsPerThread; r++) {
+      vector<Sockaddr> addrs;
+      ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+      NO_FATALS(validate_addrs(addrs));
+    }
+  });
+  for (auto& t : threads) {
+    t.join();
+  }
+  cancel_threads.cancel();
+}
+
 TEST(DnsResolverTest, CachingVsNonCachingResolver) {
   constexpr const auto kNumIterations = 1000;
   constexpr const auto kIdxNonCached = 0;
@@ -80,7 +170,7 @@ TEST(DnsResolverTest, CachingVsNonCachingResolver) {
       }
       ASSERT_TRUE(!addrs.empty());
       for (const Sockaddr& addr : addrs) {
-        EXPECT_TRUE(HasSuffixString(addr.ToString(), Substitute(":$0", port)));
+        EXPECT_TRUE(HasSuffixString(addr.ToString(), Substitute(":$0", port))) << addr.ToString();
       }
     }
     timings[idx] = MonoTime::Now() - start_time;
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index 6a29811..5eb9f0d 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -92,6 +92,26 @@ void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
   }
 }
 
+void DnsResolver::RefreshAddressesAsync(const HostPort& hostport,
+                                        vector<Sockaddr>* addresses,
+                                        const StatusCallback& cb) {
+  if (PREDICT_TRUE(cache_)) {
+    cache_->Erase(hostport.host());
+  }
+  const auto s = pool_->Submit([=]() {
+    // Before performing the resolution, check if another task has already
+    // resolved it and cached a new entry.
+    if (this->GetCachedAddresses(hostport, addresses)) {
+      cb(Status::OK());
+      return;
+    }
+    this->DoResolutionCb(hostport, addresses, cb);
+  });
+  if (!s.ok()) {
+    cb(s);
+  }
+}
+
 Status DnsResolver::DoResolution(const HostPort& hostport,
                                  vector<Sockaddr>* addresses) {
   vector<Sockaddr> resolved_addresses;
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index 6cde473..3031a6b 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -77,6 +77,12 @@ class DnsResolver {
                              std::vector<Sockaddr>* addresses,
                              const StatusCallback& cb);
 
+  // Like ResolveAddressesAsync(), but initially removes any existing cached
+  // entry, in favor of resolving the address explicitly.
+  void RefreshAddressesAsync(const HostPort& hostport,
+                             std::vector<Sockaddr>* addresses,
+                             const StatusCallback& cb);
+
  private:
   // The cache is keyed by the host part of the HostPort structure, and the
   // entry stores a vector of all Sockaddr structures produced by DNS resolution
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index 2f1c671..aba828e 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -55,6 +55,7 @@
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/string_case.h"
 #include "kudu/util/subprocess.h"
 #include "kudu/util/thread_restrictions.h"
 #include "kudu/util/trace.h"
@@ -71,6 +72,12 @@ DEFINE_string(fail_dns_resolution_hostports, "",
               "dns resolution attempts. Only takes effect if --fail_dns_resolution is 'true'.");
 TAG_FLAG(fail_dns_resolution_hostports, hidden);
 
+DEFINE_string(dns_addr_resolution_override, "",
+              "Comma-separated list of '='-separated pairs of hosts to addresses. The left-hand "
+              "side of the '=' is taken as a host, and will resolve to the right-hand side which "
+              "is expected to be a socket address with no port.");
+TAG_FLAG(dns_addr_resolution_override, hidden);
+
 using std::function;
 using std::string;
 using std::unordered_set;
@@ -193,6 +200,22 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
   TRACE_EVENT1("net", "HostPort::ResolveAddresses",
                "host", host_);
   TRACE_COUNTER_SCOPE_LATENCY_US("dns_us");
+  if (PREDICT_FALSE(!FLAGS_dns_addr_resolution_override.empty())) {
+    vector<string> hosts_and_addrs = Split(FLAGS_dns_addr_resolution_override, ",");
+    for (const auto& ha : hosts_and_addrs) {
+      vector<string> host_and_addr = Split(ha, "=");
+      if (host_and_addr.size() != 2) {
+        return Status::InvalidArgument("failed to parse injected address override");
+      }
+      if (iequals(host_and_addr[0], host_)) {
+        Sockaddr addr;
+        RETURN_NOT_OK_PREPEND(addr.ParseString(host_and_addr[1], port_),
+            "failed to parse injected address override");
+        *addresses = { addr };
+        return Status::OK();
+      }
+    }
+  }
   struct addrinfo hints;
   memset(&hints, 0, sizeof(hints));
   hints.ai_family = AF_INET;
diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h
index 5ee6233..5c9c459 100644
--- a/src/kudu/util/ttl_cache.h
+++ b/src/kudu/util/ttl_cache.h
@@ -180,6 +180,10 @@ class TTLCache {
     return EntryHandle(DCHECK_NOTNULL(entry_ptr->val_ptr), std::move(h));
   }
 
+  void Erase(const K& key) {
+    cache_->Erase(key);
+  }
+
   // For the specified key, add an entry into the cache or replace already
   // existing one. The 'charge' parameter specifies the charge to associate
   // with the entry with regard to the cache's capacity. This method returns