You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2021/09/21 23:10:41 UTC

[kudu] branch master updated (63a3293 -> 41ebabf)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 63a3293  [common] small re-factoring on PartitionPruner
     new 246505b  [util] allow DnsResolver to refresh DNS addresses
     new 41ebabf  [rpc] KUDU-75: refresh DNS entries if proxies hit a network error

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/rpc/CMakeLists.txt            |   1 +
 src/kudu/rpc/connection_id.cc          |   4 +-
 src/kudu/rpc/connection_id.h           |   4 +
 src/kudu/rpc/mt-rpc-test.cc            |   4 +-
 src/kudu/rpc/protoc-gen-krpc.cc        |  19 ++-
 src/kudu/rpc/proxy-test.cc             | 221 +++++++++++++++++++++++++++++++++
 src/kudu/rpc/proxy.cc                  | 156 +++++++++++++++++++++--
 src/kudu/rpc/proxy.h                   |  67 +++++++++-
 src/kudu/rpc/rpc-test-base.h           |  26 ++--
 src/kudu/rpc/rpc-test.cc               |  82 ++++++------
 src/kudu/rpc/rpc_stub-test.cc          |   2 +-
 src/kudu/util/net/dns_resolver-test.cc |  94 +++++++++++++-
 src/kudu/util/net/dns_resolver.cc      |  20 +++
 src/kudu/util/net/dns_resolver.h       |   6 +
 src/kudu/util/net/net_util.cc          |  23 ++++
 src/kudu/util/net/sockaddr.cc          |   3 +
 src/kudu/util/net/sockaddr.h           |   2 +-
 src/kudu/util/ttl_cache.h              |   4 +
 18 files changed, 662 insertions(+), 76 deletions(-)
 create mode 100644 src/kudu/rpc/proxy-test.cc

[kudu] 01/02: [util] allow DnsResolver to refresh DNS addresses

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 246505b538a6449eda7a9936609e9fec4632fe5d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Sep 15 19:45:09 2021 -0700

    [util] allow DnsResolver to refresh DNS addresses
    
    This patch introduces functionality to the DnsResolver to refresh an
    address, rather than looking it up in the cache. It does this by
    removing any cached entry and performing the lookup.
    
    This will be used in a follow-up change to refresh the address on
    certain transient failures.
    
    A new --dns_addr_resolution_override flag is also introduced for testing
    purposes.
    
    Change-Id: I0616f3e6fb50aba271f106b05d1926fc46a53ed0
    Reviewed-on: http://gerrit.cloudera.org:8080/17849
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/util/net/dns_resolver-test.cc | 94 +++++++++++++++++++++++++++++++++-
 src/kudu/util/net/dns_resolver.cc      | 20 ++++++++
 src/kudu/util/net/dns_resolver.h       |  6 +++
 src/kudu/util/net/net_util.cc          | 23 +++++++++
 src/kudu/util/ttl_cache.h              |  4 ++
 5 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/src/kudu/util/net/dns_resolver-test.cc b/src/kudu/util/net/dns_resolver-test.cc
index f53df7d..b861c68 100644
--- a/src/kudu/util/net/dns_resolver-test.cc
+++ b/src/kudu/util/net/dns_resolver-test.cc
@@ -21,9 +21,10 @@
 #include <cstdlib>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <vector>
 
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -34,10 +35,13 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_macros.h"
 
+DECLARE_string(dns_addr_resolution_override);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
 
+using std::thread;
 using std::vector;
 using strings::Substitute;
 
@@ -59,6 +63,92 @@ TEST(DnsResolverTest, AsyncResolution) {
   }
 }
 
+TEST(DnsResolverTest, RefreshCachedEntry) {
+  gflags::FlagSaver saver;
+  vector<Sockaddr> addrs;
+  DnsResolver resolver(1/* max_threads_num */, 1024 * 1024/* cache_capacity_bytes */);
+  ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+  ASSERT_TRUE(!addrs.empty());
+  for (const Sockaddr& addr : addrs) {
+    LOG(INFO) << "Address: " << addr.ToString();
+    EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+    EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+  }
+  // If we override the DNS lookup address, when we refresh the address, the
+  // cached entry gets reset.
+  constexpr const char* kFakeAddr = "1.1.1.1";
+  FLAGS_dns_addr_resolution_override = Substitute("localhost=$0", kFakeAddr);
+  Synchronizer s;
+  resolver.RefreshAddressesAsync(HostPort("localhost", 1111), &addrs,
+                                 s.AsStatusCallback());
+  ASSERT_OK(s.Wait());
+  ASSERT_EQ(1, addrs.size());
+  ASSERT_EQ(Substitute("$0:1111", kFakeAddr), addrs[0].ToString());
+  ASSERT_EQ(1111, addrs[0].port());
+
+  // Once we stop overriding DNS lookups, simply getting the address from the
+  // resolver will read from the cache.
+  FLAGS_dns_addr_resolution_override = "";
+  ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+  ASSERT_EQ(1, addrs.size());
+  ASSERT_EQ(Substitute("$0:12345", kFakeAddr), addrs[0].ToString());
+  ASSERT_EQ(12345, addrs[0].port());
+
+  // But a refresh should return the original address.
+  Synchronizer s2;
+  resolver.RefreshAddressesAsync(HostPort("localhost", 12345), &addrs,
+                                 s2.AsStatusCallback());
+  ASSERT_OK(s2.Wait());
+  EXPECT_FALSE(addrs.empty());
+  for (const Sockaddr& addr : addrs) {
+    LOG(INFO) << "Address: " << addr.ToString();
+    EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+    EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+  }
+}
+
+TEST(DnsResolverTest, ConcurrentRefreshesAndResolutions) {
+  constexpr int kNumThreads = 3;
+  constexpr int kNumResolutionsPerThread = 10;
+  DnsResolver resolver(1/* max_threads_num */, 1024 * 1024/* cache_capacity_bytes */);
+  vector<thread> threads;
+  auto cancel_threads = MakeScopedCleanup([&] {
+    for (auto& t : threads) {
+      t.join();
+    }
+  });
+  const auto validate_addrs = [] (const vector<Sockaddr>& addrs) {
+    ASSERT_FALSE(addrs.empty());
+    for (const Sockaddr& addr : addrs) {
+      EXPECT_TRUE(HasPrefixString(addr.ToString(), "127."));
+      EXPECT_TRUE(HasSuffixString(addr.ToString(), ":12345"));
+    }
+  };
+  for (int i = 0; i < kNumThreads - 1; i++) {
+    threads.emplace_back([&] {
+      for (int r = 0; r < kNumResolutionsPerThread; r++) {
+        vector<Sockaddr> addrs;
+        Synchronizer s;
+        resolver.RefreshAddressesAsync(HostPort("localhost", 12345), &addrs,
+                                       s.AsStatusCallback());
+        ASSERT_OK(s.Wait());
+        NO_FATALS(validate_addrs(addrs));
+      }
+    });
+  }
+  threads.emplace_back([&] {
+    for (int r = 0; r < kNumResolutionsPerThread; r++) {
+      vector<Sockaddr> addrs;
+      ASSERT_OK(resolver.ResolveAddresses(HostPort("localhost", 12345), &addrs));
+      NO_FATALS(validate_addrs(addrs));
+    }
+  });
+  for (auto& t : threads) {
+    t.join();
+  }
+  cancel_threads.cancel();
+}
+
 TEST(DnsResolverTest, CachingVsNonCachingResolver) {
   constexpr const auto kNumIterations = 1000;
   constexpr const auto kIdxNonCached = 0;
@@ -80,7 +170,7 @@ TEST(DnsResolverTest, CachingVsNonCachingResolver) {
       }
       ASSERT_TRUE(!addrs.empty());
       for (const Sockaddr& addr : addrs) {
-        EXPECT_TRUE(HasSuffixString(addr.ToString(), Substitute(":$0", port)));
+        EXPECT_TRUE(HasSuffixString(addr.ToString(), Substitute(":$0", port))) << addr.ToString();
       }
     }
     timings[idx] = MonoTime::Now() - start_time;
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index 6a29811..5eb9f0d 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -92,6 +92,26 @@ void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
   }
 }
 
+void DnsResolver::RefreshAddressesAsync(const HostPort& hostport,
+                                        vector<Sockaddr>* addresses,
+                                        const StatusCallback& cb) {
+  if (PREDICT_TRUE(cache_)) {
+    cache_->Erase(hostport.host());
+  }
+  const auto s = pool_->Submit([=]() {
+    // Before performing the resolution, check if another task has already
+    // resolved it and cached a new entry.
+    if (this->GetCachedAddresses(hostport, addresses)) {
+      cb(Status::OK());
+      return;
+    }
+    this->DoResolutionCb(hostport, addresses, cb);
+  });
+  if (!s.ok()) {
+    cb(s);
+  }
+}
+
 Status DnsResolver::DoResolution(const HostPort& hostport,
                                  vector<Sockaddr>* addresses) {
   vector<Sockaddr> resolved_addresses;
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index 6cde473..3031a6b 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -77,6 +77,12 @@ class DnsResolver {
                              std::vector<Sockaddr>* addresses,
                              const StatusCallback& cb);
 
+  // Like ResolveAddressesAsync(), but initially removes any existing cached
+  // entry, in favor of resolving the address explicitly.
+  void RefreshAddressesAsync(const HostPort& hostport,
+                             std::vector<Sockaddr>* addresses,
+                             const StatusCallback& cb);
+
  private:
   // The cache is keyed by the host part of the HostPort structure, and the
   // entry stores a vector of all Sockaddr structures produced by DNS resolution
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index 2f1c671..aba828e 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -55,6 +55,7 @@
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/string_case.h"
 #include "kudu/util/subprocess.h"
 #include "kudu/util/thread_restrictions.h"
 #include "kudu/util/trace.h"
@@ -71,6 +72,12 @@ DEFINE_string(fail_dns_resolution_hostports, "",
               "dns resolution attempts. Only takes effect if --fail_dns_resolution is 'true'.");
 TAG_FLAG(fail_dns_resolution_hostports, hidden);
 
+DEFINE_string(dns_addr_resolution_override, "",
+              "Comma-separated list of '='-separated pairs of hosts to addresses. The left-hand "
+              "side of the '=' is taken as a host, and will resolve to the right-hand side which "
+              "is expected to be a socket address with no port.");
+TAG_FLAG(dns_addr_resolution_override, hidden);
+
 using std::function;
 using std::string;
 using std::unordered_set;
@@ -193,6 +200,22 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
   TRACE_EVENT1("net", "HostPort::ResolveAddresses",
                "host", host_);
   TRACE_COUNTER_SCOPE_LATENCY_US("dns_us");
+  if (PREDICT_FALSE(!FLAGS_dns_addr_resolution_override.empty())) {
+    vector<string> hosts_and_addrs = Split(FLAGS_dns_addr_resolution_override, ",");
+    for (const auto& ha : hosts_and_addrs) {
+      vector<string> host_and_addr = Split(ha, "=");
+      if (host_and_addr.size() != 2) {
+        return Status::InvalidArgument("failed to parse injected address override");
+      }
+      if (iequals(host_and_addr[0], host_)) {
+        Sockaddr addr;
+        RETURN_NOT_OK_PREPEND(addr.ParseString(host_and_addr[1], port_),
+            "failed to parse injected address override");
+        *addresses = { addr };
+        return Status::OK();
+      }
+    }
+  }
   struct addrinfo hints;
   memset(&hints, 0, sizeof(hints));
   hints.ai_family = AF_INET;
diff --git a/src/kudu/util/ttl_cache.h b/src/kudu/util/ttl_cache.h
index 5ee6233..5c9c459 100644
--- a/src/kudu/util/ttl_cache.h
+++ b/src/kudu/util/ttl_cache.h
@@ -180,6 +180,10 @@ class TTLCache {
     return EntryHandle(DCHECK_NOTNULL(entry_ptr->val_ptr), std::move(h));
   }
 
+  void Erase(const K& key) {
+    cache_->Erase(key);
+  }
+
   // For the specified key, add an entry into the cache or replace already
   // existing one. The 'charge' parameter specifies the charge to associate
   // with the entry with regard to the cache's capacity. This method returns

[kudu] 02/02: [rpc] KUDU-75: refresh DNS entries if proxies hit a network error

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 41ebabf2eb618b33fd30ad1821ccbda9d6390010
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Sat Aug 28 13:55:47 2021 -0700

    [rpc] KUDU-75: refresh DNS entries if proxies hit a network error
    
    This patch aims to tackle the following issues that revolve around
    changes in addresses at runtime.
    - KUDU-1885: master long-lived tserver proxies need to be re-resolved in
      case nodes are assigned different addresses; today we just retry at
      the same location forever.
    - KUDU-1620: tablet consensus long-lived proxies need to be re-resolved
      on failure.
    - C++ clients' usages of RemoteTabletServer also have long-lived proxies
      and are likely to run into similar problems if tservers are restarted
      and assigned new physical addresses.
    
    It addresses this by plumbing a DnsResolver into the rpc::Proxy class,
    and chaining the asynchronous callback to an asynchronous refresh of the
    address with the newly introduced refreshing capabilities of the
    DnsResolver.
    
    The new style of proxy isn't currently used, but a test is added
    exercising the new functionality.
    
    Change-Id: I777d169bd3a461294e5721f05071b726ced70f7e
    Reviewed-on: http://gerrit.cloudera.org:8080/17839
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/rpc/CMakeLists.txt     |   1 +
 src/kudu/rpc/connection_id.cc   |   4 +-
 src/kudu/rpc/connection_id.h    |   4 +
 src/kudu/rpc/mt-rpc-test.cc     |   4 +-
 src/kudu/rpc/protoc-gen-krpc.cc |  19 +++-
 src/kudu/rpc/proxy-test.cc      | 221 ++++++++++++++++++++++++++++++++++++++++
 src/kudu/rpc/proxy.cc           | 156 ++++++++++++++++++++++++++--
 src/kudu/rpc/proxy.h            |  67 +++++++++++-
 src/kudu/rpc/rpc-test-base.h    |  26 ++---
 src/kudu/rpc/rpc-test.cc        |  82 +++++++--------
 src/kudu/rpc/rpc_stub-test.cc   |   2 +-
 src/kudu/util/net/sockaddr.cc   |   3 +
 src/kudu/util/net/sockaddr.h    |   2 +-
 13 files changed, 517 insertions(+), 74 deletions(-)

diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 273842d..c8d831e 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -129,6 +129,7 @@ ADD_KUDU_TEST(exactly_once_rpc-test PROCESSORS 10)
 ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
 ADD_KUDU_TEST(negotiation-test)
 ADD_KUDU_TEST(periodic-test)
+ADD_KUDU_TEST(proxy-test)
 ADD_KUDU_TEST(reactor-test)
 ADD_KUDU_TEST(request_tracker-test)
 ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
diff --git a/src/kudu/rpc/connection_id.cc b/src/kudu/rpc/connection_id.cc
index 9728f01..8a14d83 100644
--- a/src/kudu/rpc/connection_id.cc
+++ b/src/kudu/rpc/connection_id.cc
@@ -20,7 +20,7 @@
 #include <cstddef>
 #include <utility>
 
-#include <boost/functional/hash/hash.hpp>
+#include <boost/container_hash/extensions.hpp>
 #include <glog/logging.h>
 
 #include "kudu/gutil/strings/substitute.h"
@@ -52,7 +52,7 @@ void ConnectionId::set_network_plane(string network_plane) {
 
 string ConnectionId::ToString() const {
   string remote;
-  if (remote_.is_ip() && hostname_ != remote_.host()) {
+  if (remote_.is_initialized() && remote_.is_ip() && hostname_ != remote_.host()) {
     remote = strings::Substitute("$0 ($1)", remote_.ToString(), hostname_);
   } else {
     remote = remote_.ToString();
diff --git a/src/kudu/rpc/connection_id.h b/src/kudu/rpc/connection_id.h
index 6ec98f7..0aed953 100644
--- a/src/kudu/rpc/connection_id.h
+++ b/src/kudu/rpc/connection_id.h
@@ -45,6 +45,10 @@ class ConnectionId {
 
   const std::string& hostname() const { return hostname_; }
 
+  void set_remote(const Sockaddr& remote) {
+    remote_ = remote;
+  }
+
   // The credentials of the user associated with this connection, if any.
   void set_user_credentials(UserCredentials user_credentials);
 
diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc
index 09bda4b..3496e26 100644
--- a/src/kudu/rpc/mt-rpc-test.cc
+++ b/src/kudu/rpc/mt-rpc-test.cc
@@ -71,7 +71,7 @@ class MultiThreadedRpcTest : public RpcTestBase {
     CHECK_OK(CreateMessenger("ClientSC", &client_messenger));
     Proxy p(client_messenger, server_addr, server_addr.host(),
             GenericCalculatorService::static_service_name());
-    *result = DoTestSyncCall(p, method_name);
+    *result = DoTestSyncCall(&p, method_name);
     latch->CountDown();
   }
 
@@ -93,7 +93,7 @@ class MultiThreadedRpcTest : public RpcTestBase {
     int i = 0;
     while (true) {
       i++;
-      Status s = DoTestSyncCall(p, method_name);
+      Status s = DoTestSyncCall(&p, method_name);
       if (!s.ok()) {
         // Return on first failure.
         LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: "
diff --git a/src/kudu/rpc/protoc-gen-krpc.cc b/src/kudu/rpc/protoc-gen-krpc.cc
index 2226160..e8d8e55 100644
--- a/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/src/kudu/rpc/protoc-gen-krpc.cc
@@ -575,6 +575,10 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
           "      std::shared_ptr<::kudu::rpc::Messenger> messenger,\n"
           "      const ::kudu::Sockaddr& sockaddr,\n"
           "      std::string hostname);\n"
+          "  $service_name$Proxy(\n"
+          "      std::shared_ptr<::kudu::rpc::Messenger> messenger,\n"
+          "      const ::kudu::HostPort& hp,\n"
+          "      DnsResolver* dns_resolver);\n"
           "  ~$service_name$Proxy();\n");
 
       for (int method_idx = 0; method_idx < service->method_count();
@@ -639,6 +643,15 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
           "            std::move(hostname),\n"
           "            \"$full_service_name$\") {\n"
           "}\n"
+          "$service_name$Proxy::$service_name$Proxy(\n"
+          "    std::shared_ptr<::kudu::rpc::Messenger> messenger,\n"
+          "    const ::kudu::HostPort& hp,\n"
+          "    DnsResolver* dns_resolver)\n"
+          "    : Proxy(std::move(messenger),\n"
+          "            hp,\n"
+          "            dns_resolver,\n"
+          "            \"$full_service_name$\") {\n"
+          "}\n"
           "\n"
           "$service_name$Proxy::~$service_name$Proxy() {\n"
           "}\n");
@@ -652,7 +665,8 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
             "    const $request$& req,\n"
             "    $response$* resp,\n"
             "    ::kudu::rpc::RpcController* controller) {\n"
-            "  return SyncRequest(\"$rpc_name$\", req, resp, controller);\n"
+            "  static const std::string kRpcName = \"$rpc_name$\";\n"
+            "  return SyncRequest(kRpcName, req, resp, controller);\n"
             "}\n"
             "\n"
             "void $service_name$Proxy::$rpc_name$Async(\n"
@@ -660,7 +674,8 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
             "    $response$* resp,\n"
             "    ::kudu::rpc::RpcController* controller,\n"
             "    const ::kudu::rpc::ResponseCallback& callback) {\n"
-            "  AsyncRequest(\"$rpc_name$\", req, resp, controller, callback);\n"
+            "  static const std::string kRpcName = \"$rpc_name$\";\n"
+            "  AsyncRequest(kRpcName, req, resp, controller, callback);\n"
             "}\n");
         subs->Pop(); // method
       }
diff --git a/src/kudu/rpc/proxy-test.cc b/src/kudu/rpc/proxy-test.cc
new file mode 100644
index 0000000..34b626c
--- /dev/null
+++ b/src/kudu/rpc/proxy-test.cc
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/proxy.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rtest.pb.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/dns_resolver.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_string(dns_addr_resolution_override);
+
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+class Messenger;
+
+namespace {
+
+constexpr uint16_t kPort = 1111;
+constexpr const char* kFakeHost = "fakehost";
+const HostPort kFakeHostPort(kFakeHost, kPort);
+
+Status SendRequest(Proxy* p) {
+  SleepRequestPB req;
+  req.set_sleep_micros(100 * 1000); // 100ms
+  SleepResponsePB resp;
+  RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+  return p->SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &controller);
+}
+
+} // anonymous namespace
+
+class RpcProxyTest : public RpcTestBase {
+};
+
+// Test that proxies initialized with a DnsResolver return errors when
+// receiving a non-transient error.
+TEST_F(RpcProxyTest, TestProxyReturnsOnNonTransientError) {
+  SKIP_IF_SLOW_NOT_ALLOWED();  // This test waits for a timeout.
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init();
+  Status s = SendRequest(&p);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // If we do resolve to an address that turns out to be bogus, we should
+  // time out when negotiating.
+  FLAGS_dns_addr_resolution_override = Substitute("$0=1.1.1.1", kFakeHost);
+  s = SendRequest(&p);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+}
+
+// Test that ensures a proxy initialized with an address will use that address.
+TEST_F(RpcProxyTest, TestProxyUsesInitialAddr) {
+  string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip1, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+
+  // Despite our proxy being configured with a fake host, our request should
+  // still go through since we call Init() with a valid address.
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init(server_addr);
+  ASSERT_OK(SendRequest(&p));
+
+  server_messenger_.reset();
+  service_pool_.reset();
+
+  // With our server down, the request should fail.
+  Status s = SendRequest(&p);
+  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+
+  // Once we bring up a new server and allow our proxy to resolve it, the
+  // request should succeed.
+  string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode);
+  Sockaddr second_addr;
+  ASSERT_OK(second_addr.ParseString(ip2, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString());
+  ASSERT_OK(SendRequest(&p));
+}
+
+TEST_F(RpcProxyTest, TestNonResolvingProxyIgnoresInit) {
+  string ip = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  HostPort hp(ip, kPort);
+  Proxy p(client_messenger, hp, &dns_resolver, CalculatorService::static_service_name());
+
+  // Call Init() with a fake address. Because this proxy isn't configured for
+  // address re-resolution, the new address is ignored.
+  Sockaddr fake_addr;
+  ASSERT_OK(fake_addr.ParseString("1.1.1.1", kPort));
+  p.Init(fake_addr);
+
+  // We should thus have no trouble sending a request.
+  ASSERT_OK(SendRequest(&p));
+}
+
+// Start a proxy with a DNS resolver that maps a hostname to the address bound
+// by the server. Then restart the server but bind to a different address, and
+// update the DNS resolver to map the same hostname to the different address.
+// The proxy should eventually be usable.
+TEST_F(RpcProxyTest, TestProxyReresolvesAddress) {
+  string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip1, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, server_addr.ToString());
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init();
+  ASSERT_OK(SendRequest(&p));
+
+  string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode);
+  Sockaddr second_addr;
+  ASSERT_OK(second_addr.ParseString(ip2, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString());
+  ASSERT_OK(SendRequest(&p));
+}
+
+TEST_F(RpcProxyTest, TestProxyReresolvesAddressFromThreads) {
+  constexpr const int kNumThreads = 4;
+
+  string ip1 = GetBindIpForDaemon(/*index*/1, kDefaultBindMode);
+  Sockaddr server_addr;
+  ASSERT_OK(server_addr.ParseString(ip1, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, server_addr.ToString());
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init();
+  ASSERT_OK(SendRequest(&p));
+
+  string ip2 = GetBindIpForDaemon(/*index*/2, kDefaultBindMode);
+  Sockaddr second_addr;
+  ASSERT_OK(second_addr.ParseString(ip2, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&second_addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, second_addr.ToString());
+
+  vector<Status> errors(kNumThreads);
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      errors[i] = SendRequest(&p);
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  for (const auto& e : errors) {
+    EXPECT_OK(e);
+  }
+}
+
+} // namespace rpc
+} // namespace kudu
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 8bd45fb..8d810b8 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -21,9 +21,11 @@
 #include <iostream>
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include <glog/logging.h>
 
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/outbound_call.h"
@@ -31,6 +33,8 @@
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/user_credentials.h"
+#include "kudu/util/net/dns_resolver.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/notification.h"
 #include "kudu/util/status.h"
@@ -39,6 +43,9 @@
 using google::protobuf::Message;
 using std::string;
 using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace rpc {
@@ -48,6 +55,7 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
              string hostname,
              string service_name)
     : service_name_(std::move(service_name)),
+      dns_resolver_(nullptr),
       messenger_(std::move(messenger)),
       is_started_(false) {
   CHECK(messenger_ != nullptr);
@@ -67,19 +75,72 @@ Proxy::Proxy(std::shared_ptr<Messenger> messenger,
   conn_id_ = ConnectionId(remote, std::move(hostname), std::move(creds));
 }
 
+Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+             HostPort hp,
+             DnsResolver* dns_resolver,
+             string service_name)
+    : service_name_(std::move(service_name)),
+      hp_(std::move(hp)),
+      dns_resolver_(dns_resolver),
+      messenger_(std::move(messenger)),
+      is_started_(false) {
+  CHECK(messenger_ != nullptr);
+  DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
+  DCHECK(hp_.Initialized());
+}
+
+Sockaddr* Proxy::GetSingleSockaddr(std::vector<Sockaddr>* addrs) const {
+  DCHECK(!addrs->empty());
+  if (PREDICT_FALSE(addrs->size() > 1)) {
+    LOG(WARNING) << Substitute(
+        "$0 proxy host/port $1 resolves to $2 different addresses. Using $3",
+        service_name_, hp_.ToString(), addrs->size(), (*addrs)[0].ToString());
+  }
+  return &(*addrs)[0];
+}
+
+void Proxy::Init(Sockaddr addr) {
+  if (!dns_resolver_) {
+    return;
+  }
+  // By default, we set the real user to the currently logged-in user.
+  // Effective user and password remain blank.
+  string real_user;
+  Status s = GetLoggedInUser(&real_user);
+  if (!s.ok()) {
+    LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
+        << s.ToString() << " before connecting to host/port: " << hp_.ToString();
+  }
+  vector<Sockaddr> addrs;
+  if (!addr.is_initialized()) {
+    s = dns_resolver_->ResolveAddresses(hp_, &addrs);
+    if (PREDICT_TRUE(s.ok() && !addrs.empty())) {
+      addr = *GetSingleSockaddr(&addrs);
+      DCHECK(addr.is_initialized());
+      addr.set_port(hp_.port());
+      // NOTE: it's ok to proceed on failure -- the address will remain
+      // uninitialized and be re-resolved when sending the next request.
+    }
+  }
+
+  UserCredentials creds;
+  creds.set_real_user(std::move(real_user));
+  conn_id_ = ConnectionId(addr, hp_.host(), std::move(creds));
+}
+
 Proxy::~Proxy() {
 }
 
-void Proxy::AsyncRequest(const string& method,
-                         const google::protobuf::Message& req,
-                         google::protobuf::Message* response,
-                         RpcController* controller,
-                         const ResponseCallback& callback) const {
-  CHECK(!controller->call_) << "Controller should be reset";
-  base::subtle::NoBarrier_Store(&is_started_, true);
+void Proxy::EnqueueRequest(const string& method,
+                           const google::protobuf::Message& req,
+                           google::protobuf::Message* response,
+                           RpcController* controller,
+                           const ResponseCallback& callback) const {
+  ConnectionId connection = conn_id();
+  DCHECK(connection.remote().is_initialized());
   RemoteMethod remote_method(service_name_, method);
   controller->call_.reset(
-      new OutboundCall(conn_id_, remote_method, response, controller, callback));
+      new OutboundCall(connection, remote_method, response, controller, callback));
   controller->SetRequestParam(req);
   controller->SetMessenger(messenger_.get());
 
@@ -88,11 +149,86 @@ void Proxy::AsyncRequest(const string& method,
   messenger_->QueueOutboundCall(controller->call_);
 }
 
+void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
+                                        const google::protobuf::Message& req,
+                                        google::protobuf::Message* response,
+                                        RpcController* controller,
+                                        const ResponseCallback& callback) {
+  DCHECK(!controller->call_);
+  vector<Sockaddr>* addrs = new vector<Sockaddr>();
+  DCHECK_NOTNULL(dns_resolver_)->RefreshAddressesAsync(hp_, addrs,
+      [this, &req, &method, callback, response, controller, addrs] (const Status& s) {
+    unique_ptr<vector<Sockaddr>> unique_addrs(addrs);
+    // If we fail to resolve the address, treat the call as failed.
+    if (!s.ok() || addrs->empty()) {
+      DCHECK(!controller->call_);
+      // NOTE: we need to keep a reference here because the callback may end up
+      // destructing the controller and the outbound call, _while_ the callback
+      // is running from within the call!
+      auto shared_call = std::make_shared<OutboundCall>(
+          conn_id(), RemoteMethod{service_name_, method}, response, controller, callback);
+      controller->call_ = shared_call;
+      controller->call_->SetFailed(s.CloneAndPrepend("failed to refresh physical address"));
+      return;
+    }
+    auto* addr = GetSingleSockaddr(addrs);
+    DCHECK(addr->is_initialized());
+    addr->set_port(hp_.port());
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      conn_id_.set_remote(*addr);
+    }
+    EnqueueRequest(method, req, response, controller, callback);
+  });
+}
+
+void Proxy::AsyncRequest(const string& method,
+                         const google::protobuf::Message& req,
+                         google::protobuf::Message* response,
+                         RpcController* controller,
+                         const ResponseCallback& callback) {
+  CHECK(!controller->call_) << "Controller should be reset";
+  base::subtle::NoBarrier_Store(&is_started_, true);
+  if (!dns_resolver_) {
+    EnqueueRequest(method, req, response, controller, callback);
+    return;
+  }
+
+  // If we haven't successfully initialized the remote, e.g. because the DNS
+  // lookup failed, refresh the DNS entry and enqueue the request.
+  bool remote_initialized;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    remote_initialized = conn_id_.remote().is_initialized();
+  }
+  if (!remote_initialized) {
+    RefreshDnsAndEnqueueRequest(method, req, response, controller, callback);
+    return;
+  }
+
+  // Otherwise, just enqueue the request, but retry if there's a network error,
+  // since it's possible the physical address of the host was changed. We only
+  // retry once more before calling the callback.
+  auto refresh_dns_and_cb = [this, &req, &method,
+                             callback, response, controller] () {
+    // TODO(awong): we should be more specific here -- consider having the RPC
+    // layer set a flag in the controller that warrants a retry.
+    if (PREDICT_FALSE(!controller->status().ok())) {
+      controller->Reset();
+      RefreshDnsAndEnqueueRequest(method, req, response, controller, callback);
+      return;
+    }
+    // For any other status, OK or otherwise, just run the callback.
+    callback();
+  };
+  EnqueueRequest(method, req, response, controller, refresh_dns_and_cb);
+}
+
 
 Status Proxy::SyncRequest(const string& method,
                           const google::protobuf::Message& req,
                           google::protobuf::Message* resp,
-                          RpcController* controller) const {
+                          RpcController* controller) {
   Notification note;
   AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller,
                [&note]() { note.Notify(); });
@@ -113,7 +249,7 @@ void Proxy::set_network_plane(string network_plane) {
 }
 
 std::string Proxy::ToString() const {
-  return strings::Substitute("$0@$1", service_name_, conn_id_.ToString());
+  return Substitute("$0@$1", service_name_, conn_id_.ToString());
 }
 
 } // namespace rpc
diff --git a/src/kudu/rpc/proxy.h b/src/kudu/rpc/proxy.h
index ccf5f18..b0bc512 100644
--- a/src/kudu/rpc/proxy.h
+++ b/src/kudu/rpc/proxy.h
@@ -17,12 +17,16 @@
 #pragma once
 
 #include <memory>
+#include <mutex>
 #include <string>
+#include <vector>
 
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/rpc/connection_id.h"
 #include "kudu/rpc/response_callback.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/status.h"
 
 namespace google {
@@ -33,6 +37,7 @@ class Message;
 
 namespace kudu {
 
+class DnsResolver;
 class Sockaddr;
 
 namespace rpc {
@@ -65,8 +70,27 @@ class Proxy {
         std::string hostname,
         std::string service_name);
 
+  // TODO(awong): consider a separate auto-resolving proxy class?
+  Proxy(std::shared_ptr<Messenger> messenger,
+        HostPort hp,
+        DnsResolver* dns_resolver,
+        std::string service_name);
+
   ~Proxy();
 
+  // If the proxy is configured for address re-resolution (by supplying a
+  // DnsResolver and HostPort in the constructor), performs an initial
+  // resolution of the address using the HostPort. If 'addr' is supplied, it is
+  // used instead of performing resolution (this is useful to initialize
+  // several proxies with a single external DNS resolution).
+  //
+  // Otherwise, this is a no-op.
+  //
+  // NOTE: it is always OK to skip calling this method -- if this proxy is
+  // configured for address re-resolution and this is skipped, the resolution
+  // will happen upon sending the first request.
+  void Init(Sockaddr addr = {});
+
   // Call a remote method asynchronously.
   //
   // Typically, users will not call this directly, but rather through
@@ -97,14 +121,14 @@ class Proxy {
                     const google::protobuf::Message& req,
                     google::protobuf::Message* resp,
                     RpcController* controller,
-                    const ResponseCallback& callback) const;
+                    const ResponseCallback& callback);
 
   // The same as AsyncRequest(), except that the call blocks until the call
   // finishes. If the call fails, returns a non-OK result.
   Status SyncRequest(const std::string& method,
                      const google::protobuf::Message& req,
                      google::protobuf::Message* resp,
-                     RpcController* controller) const;
+                     RpcController* controller);
 
   // Set the user credentials which should be used to log in.
   void set_user_credentials(UserCredentials user_credentials);
@@ -121,9 +145,48 @@ class Proxy {
   std::string ToString() const;
 
  private:
+  // Asynchronously refreshes the DNS, enqueueing the given request upon
+  // success, or failing the call and calling the callback upon failure.
+  void RefreshDnsAndEnqueueRequest(const std::string& method,
+                                   const google::protobuf::Message& req,
+                                   google::protobuf::Message* response,
+                                   RpcController* controller,
+                                   const ResponseCallback& callback);
+
+  // Queues the given request as an outbound call using the given messenger,
+  // controller, and response.
+  void EnqueueRequest(const std::string& method,
+                      const google::protobuf::Message& req,
+                      google::protobuf::Message* response,
+                      RpcController* controller,
+                      const ResponseCallback& callback) const;
+
+  // Returns a single Sockaddr from the 'addrs', logging a warning if there is
+  // more than one to choose from.
+  Sockaddr* GetSingleSockaddr(std::vector<Sockaddr>* addrs) const;
+
+  ConnectionId conn_id() const {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return conn_id_;
+  }
+
   const std::string service_name_;
+  HostPort hp_;
+  DnsResolver* dns_resolver_;
   std::shared_ptr<Messenger> messenger_;
+
+  // TODO(awong): consider implementing some lock-free list of ConnectionIds
+  // instead of taking a lock every time we want to get the "current"
+  // ConnectionId.
+  //
+  // Connection ID used by this proxy. Once the proxy has started sending
+  // requests, the connection ID may be updated in response to calls (e.g. if
+  // we re-resolved the physical address in response to an invalid DNS entry).
+  // As such, 'conn_id_' is protected by 'lock_', and should be copied and
+  // passed around, rather than used directly
+  mutable simple_spinlock lock_;
   ConnectionId conn_id_;
+
   mutable Atomic32 is_started_;
 
   DISALLOW_COPY_AND_ASSIGN(Proxy);
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 4a87b0f..602fa6e 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -467,8 +467,8 @@ class RpcTestBase : public KuduTest {
     return bld.Build(messenger);
   }
 
-  Status DoTestSyncCall(const Proxy &p, const char *method,
-                        CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) {
+  static Status DoTestSyncCall(Proxy* p, const char *method,
+                               CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) {
     AddRequestPB req;
     req.set_x(rand());
     req.set_y(rand());
@@ -476,13 +476,13 @@ class RpcTestBase : public KuduTest {
     RpcController controller;
     controller.set_timeout(MonoDelta::FromMilliseconds(10000));
     controller.set_credentials_policy(policy);
-    RETURN_NOT_OK(p.SyncRequest(method, req, &resp, &controller));
+    RETURN_NOT_OK(p->SyncRequest(method, req, &resp, &controller));
 
     CHECK_EQ(req.x() + req.y(), resp.result());
     return Status::OK();
   }
 
-  void DoTestSidecar(const Proxy &p, int size1, int size2) {
+static void DoTestSidecar(Proxy* p, int size1, int size2) {
     const uint32_t kSeed = 12345;
 
     SendTwoStringsRequestPB req;
@@ -493,8 +493,8 @@ class RpcTestBase : public KuduTest {
     SendTwoStringsResponsePB resp;
     RpcController controller;
     controller.set_timeout(MonoDelta::FromMilliseconds(10000));
-    CHECK_OK(p.SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName,
-                           req, &resp, &controller));
+    CHECK_OK(p->SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName,
+                            req, &resp, &controller));
 
     Slice first = GetSidecarPointer(controller, resp.sidecar1(), size1);
     Slice second = GetSidecarPointer(controller, resp.sidecar2(), size2);
@@ -510,11 +510,11 @@ class RpcTestBase : public KuduTest {
     CHECK_EQ(Slice(expected), second);
   }
 
-  static Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+  static Status DoTestOutgoingSidecar(Proxy* p, int size1, int size2) {
     return DoTestOutgoingSidecar(p, {std::string(size1, 'a'), std::string(size2, 'b')});
   }
 
-  static Status DoTestOutgoingSidecar(const Proxy& p, const std::vector<std::string>& strings) {
+  static Status DoTestOutgoingSidecar(Proxy* p, const std::vector<std::string>& strings) {
     PushStringsRequestPB request;
     RpcController controller;
 
@@ -525,8 +525,8 @@ class RpcTestBase : public KuduTest {
     }
 
     PushStringsResponsePB resp;
-    KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushStringsMethodName,
-                                     request, &resp, &controller));
+    KUDU_RETURN_NOT_OK(p->SyncRequest(GenericCalculatorService::kPushStringsMethodName,
+                                      request, &resp, &controller));
     for (int i = 0; i < strings.size(); i++) {
       CHECK_EQ(strings[i].size(), resp.sizes(i));
       CHECK_EQ(crc::Crc32c(strings[i].data(), strings[i].size()),
@@ -536,11 +536,11 @@ class RpcTestBase : public KuduTest {
     return Status::OK();
   }
 
-  void DoTestOutgoingSidecarExpectOK(const Proxy &p, int size1, int size2) {
+  static void DoTestOutgoingSidecarExpectOK(Proxy* p, int size1, int size2) {
     CHECK_OK(DoTestOutgoingSidecar(p, size1, size2));
   }
 
-  static void DoTestExpectTimeout(const Proxy& p,
+  static void DoTestExpectTimeout(Proxy* p,
                                   const MonoDelta& timeout,
                                   bool will_be_cancelled = false,
                                   bool* is_negotiaton_error = nullptr) {
@@ -554,7 +554,7 @@ class RpcTestBase : public KuduTest {
     c.set_timeout(timeout);
     Stopwatch sw;
     sw.start();
-    Status s = p.SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c);
+    Status s = p->SyncRequest(GenericCalculatorService::kSleepMethodName, req, &resp, &c);
     sw.stop();
     ASSERT_FALSE(s.ok());
     if (is_negotiaton_error != nullptr) {
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index a3eaa04..7a3c657 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -207,7 +207,7 @@ TEST_P(TestRpc, TestNegotiationDeadlock) {
 
   Proxy p(messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test making successful RPC calls.
@@ -227,7 +227,7 @@ TEST_P(TestRpc, TestCall) {
                                                expected_remote_str(server_addr)));
 
   for (int i = 0; i < 10; i++) {
-    ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+    ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
   }
 }
 
@@ -259,7 +259,7 @@ TEST_P(TestRpc, TestCallWithChainCertAndChainCA) {
                                                             "{remote=$0, user_credentials=",
                                                         expected_remote_str(server_addr)));
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test for KUDU-2041.
@@ -290,7 +290,7 @@ TEST_P(TestRpc, TestCallWithChainCertAndRootCA) {
                                                             "{remote=$0, user_credentials=",
                                                         expected_remote_str(server_addr)));
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test making successful RPC calls while using a TLS certificate with a password protected
@@ -326,7 +326,7 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
                                                             "{remote=$0, user_credentials=",
                                                         expected_remote_str(server_addr)));
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test that using a TLS certificate with a password protected private key and providing
@@ -368,7 +368,7 @@ TEST_P(TestRpc, TestCallToBadServer) {
   // Loop a few calls to make sure that we properly set up and tear down
   // the connections.
   for (int i = 0; i < 5; i++) {
-    Status s = DoTestSyncCall(p, GenericCalculatorService::kAddMethodName);
+    Status s = DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName);
     LOG(INFO) << "Status: " << s.ToString();
     ASSERT_TRUE(s.IsNetworkError()) << "unexpected status: " << s.ToString();
   }
@@ -388,7 +388,7 @@ TEST_P(TestRpc, TestInvalidMethodCall) {
           GenericCalculatorService::static_service_name());
 
   // Call the method which fails.
-  Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  Status s = DoTestSyncCall(&p, "ThisMethodDoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "bad method");
 }
@@ -406,7 +406,7 @@ TEST_P(TestRpc, TestWrongService) {
   Proxy p(client_messenger, server_addr, "localhost", "WrongServiceName");
 
   // Call the method which fails.
-  Status s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  Status s = DoTestSyncCall(&p, "ThisMethodDoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(),
                       "Service unavailable: service WrongServiceName "
@@ -415,7 +415,7 @@ TEST_P(TestRpc, TestWrongService) {
   // If the server has been marked as having registered all services, we should
   // expect a "not found" error instead.
   server_messenger_->SetServicesRegistered();
-  s = DoTestSyncCall(p, "ThisMethodDoesNotExist");
+  s = DoTestSyncCall(&p, "ThisMethodDoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(),
                       "Not found: service WrongServiceName "
@@ -449,7 +449,7 @@ TEST_P(TestRpc, TestHighFDs) {
   ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl()));
   Proxy p(client_messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 }
 
 // Test that connections are kept alive between calls.
@@ -470,7 +470,7 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
   Proxy p(client_messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 
   SleepFor(MonoDelta::FromMilliseconds(5));
 
@@ -514,7 +514,7 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
   Proxy p(client_messenger, server_addr, kRemoteHostName,
           GenericCalculatorService::static_service_name());
 
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 
   ReactorMetrics metrics;
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
@@ -651,7 +651,7 @@ TEST_P(TestRpc, TestReopenOutboundConnections) {
 
   // Run several iterations, just in case.
   for (int i = 0; i < 32; ++i) {
-    ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+    ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
     ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
     ASSERT_EQ(0, metrics.total_client_connections_);
     ASSERT_EQ(i + 1, metrics.total_server_connections_);
@@ -692,7 +692,7 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   ASSERT_EQ(0, metrics.total_server_connections_);
 
   // Make an RPC call with ANY_CREDENTIALS policy.
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(1, metrics.total_server_connections_);
@@ -708,7 +708,7 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   // Make an RPC call with PRIMARY_CREDENTIALS policy. Currently open connection
   // with ANY_CREDENTIALS policy should be closed and a new one established
   // with PRIMARY_CREDENTIALS policy.
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName,
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName,
                            CredentialsPolicy::PRIMARY_CREDENTIALS));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
@@ -723,7 +723,7 @@ TEST_P(TestRpc, TestCredentialsPolicy) {
   // connection with PRIMARY_CREDENTIALS policy should be re-used because
   // the ANY_CREDENTIALS policy satisfies the PRIMARY_CREDENTIALS policy which
   // the currently open connection has been established with.
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(2, metrics.total_server_connections_);
@@ -769,7 +769,7 @@ TEST_P(TestRpc, TestConnectionNetworkPlane) {
   ASSERT_EQ(0, metrics.num_client_connections_);
 
   // Make an RPC call with the default network plane.
-  ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p1, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(1, metrics.total_server_connections_);
@@ -780,7 +780,7 @@ TEST_P(TestRpc, TestConnectionNetworkPlane) {
   ASSERT_EQ(1, metrics.num_client_connections_);
 
   // Make an RPC call with the non-default network plane.
-  ASSERT_OK(DoTestSyncCall(p2, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p2, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(2, metrics.total_server_connections_);
@@ -792,7 +792,7 @@ TEST_P(TestRpc, TestConnectionNetworkPlane) {
 
   // Make an RPC call with the default network plane again and verify that
   // there are no new connections.
-  ASSERT_OK(DoTestSyncCall(p1, GenericCalculatorService::kAddMethodName));
+  ASSERT_OK(DoTestSyncCall(&p1, GenericCalculatorService::kAddMethodName));
   ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
   ASSERT_EQ(0, metrics.total_client_connections_);
   ASSERT_EQ(2, metrics.total_server_connections_);
@@ -870,18 +870,18 @@ TEST_P(TestRpc, TestRpcSidecar) {
           GenericCalculatorService::static_service_name());
 
   // Test a zero-length sidecar
-  DoTestSidecar(p, 0, 0);
+  DoTestSidecar(&p, 0, 0);
 
   // Test some small sidecars
-  DoTestSidecar(p, 123, 456);
+  DoTestSidecar(&p, 123, 456);
 
   // Test some larger sidecars to verify that we properly handle the case where
   // we can't write the whole response to the socket in a single call.
-  DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
+  DoTestSidecar(&p, 3000 * 1024, 2000 * 1024);
 
-  DoTestOutgoingSidecarExpectOK(p, 0, 0);
-  DoTestOutgoingSidecarExpectOK(p, 123, 456);
-  DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
+  DoTestOutgoingSidecarExpectOK(&p, 0, 0);
+  DoTestOutgoingSidecarExpectOK(&p, 123, 456);
+  DoTestOutgoingSidecarExpectOK(&p, 3000 * 1024, 2000 * 1024);
 }
 
 // Test sending the maximum number of sidecars, each of them being a single
@@ -903,7 +903,7 @@ TEST_P(TestRpc, TestMaxSmallSidecars) {
   for (auto& s : strings) {
     s = RandomString(2, &rng);
   }
-  ASSERT_OK(DoTestOutgoingSidecar(p, strings));
+  ASSERT_OK(DoTestOutgoingSidecar(&p, strings));
 }
 
 TEST_P(TestRpc, TestRpcSidecarLimits) {
@@ -1008,15 +1008,15 @@ TEST_P(TestRpc, TestCallTimeout) {
   // Test a very short timeout - we expect this will time out while the
   // call is still trying to connect, or in the send queue. This was triggering ASAN failures
   // before.
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromNanoseconds(1)));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromNanoseconds(1)));
 
   // Test a longer timeout - expect this will time out after we send the request,
   // but shorter than our threshold for two-stage timeout handling.
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(200)));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(200)));
 
   // Test a longer timeout - expect this will trigger the "two-stage timeout"
   // code path.
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1500)));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(1500)));
 }
 
 // Inject 500ms delay in negotiation, and send a call with a short timeout, followed by
@@ -1035,8 +1035,8 @@ TEST_P(TestRpc, TestCallTimeoutDoesntAffectNegotiation) {
           GenericCalculatorService::static_service_name());
 
   FLAGS_rpc_negotiation_inject_delay_ms = 500;
-  NO_FATALS(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(50)));
-  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+  NO_FATALS(DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(50)));
+  ASSERT_OK(DoTestSyncCall(&p, GenericCalculatorService::kAddMethodName));
 
   // Only the second call should have been received by the server, because we
   // don't bother sending an already-timed-out call.
@@ -1079,7 +1079,7 @@ TEST_F(TestRpc, TestNegotiationTimeout) {
 
   bool is_negotiation_error = false;
   NO_FATALS(DoTestExpectTimeout(
-      p, MonoDelta::FromMilliseconds(100), false, &is_negotiation_error));
+      &p, MonoDelta::FromMilliseconds(100), false, &is_negotiation_error));
   EXPECT_TRUE(is_negotiation_error);
 }
 
@@ -1374,14 +1374,14 @@ TEST_P(TestRpc, TestCancellation) {
       case OutboundCall::ON_OUTBOUND_QUEUE:
       case OutboundCall::SENDING:
       case OutboundCall::SENT:
-        ASSERT_TRUE(DoTestOutgoingSidecar(p, 0, 0).IsAborted());
-        ASSERT_TRUE(DoTestOutgoingSidecar(p, 123, 456).IsAborted());
-        ASSERT_TRUE(DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024).IsAborted());
-        DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(timeout_ms), true);
+        ASSERT_TRUE(DoTestOutgoingSidecar(&p, 0, 0).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(&p, 123, 456).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(&p, 3000 * 1024, 2000 * 1024).IsAborted());
+        DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(timeout_ms), true);
         break;
       case OutboundCall::NEGOTIATION_TIMED_OUT:
       case OutboundCall::TIMED_OUT:
-        DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1000));
+        DoTestExpectTimeout(&p, MonoDelta::FromMilliseconds(1000));
         break;
       case OutboundCall::CANCELLED:
         break;
@@ -1399,9 +1399,9 @@ TEST_P(TestRpc, TestCancellation) {
         break;
       }
       case OutboundCall::FINISHED_SUCCESS:
-        DoTestOutgoingSidecarExpectOK(p, 0, 0);
-        DoTestOutgoingSidecarExpectOK(p, 123, 456);
-        DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
+        DoTestOutgoingSidecarExpectOK(&p, 0, 0);
+        DoTestOutgoingSidecarExpectOK(&p, 123, 456);
+        DoTestOutgoingSidecarExpectOK(&p, 3000 * 1024, 2000 * 1024);
         break;
     }
   }
@@ -1568,7 +1568,7 @@ TEST_P(TestRpc, TestPerformanceBySocketType) {
     Stopwatch sw(Stopwatch::ALL_THREADS);
     sw.start();
     for (int i = 0; i < kNumMb / kMbPerRpc; i++) {
-      DoTestOutgoingSidecar(p, sidecars);
+      DoTestOutgoingSidecar(&p, sidecars);
     }
     sw.stop();
     LOG(INFO) << strings::Substitute(
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 59c7667..d446227 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -357,7 +357,7 @@ TEST_F(RpcStubTest, TestCallMissingMethod) {
   Proxy p(client_messenger_, server_addr_, server_addr_.host(),
           CalculatorService::static_service_name());
 
-  Status s = DoTestSyncCall(p, "DoesNotExist");
+  Status s = DoTestSyncCall(&p, "DoesNotExist");
   ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist");
 }
diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc
index 7653617..d34b8ec 100644
--- a/src/kudu/util/net/sockaddr.cc
+++ b/src/kudu/util/net/sockaddr.cc
@@ -227,6 +227,9 @@ const struct sockaddr_in& Sockaddr::ipv4_addr() const {
 }
 
 std::string Sockaddr::ToString() const {
+  if (!is_initialized()) {
+    return "<uninitialized>";
+  }
   switch (family()) {
     case AF_INET:
       return Substitute("$0:$1", host(), port());
diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h
index e473c8b..6fb9e59 100644
--- a/src/kudu/util/net/sockaddr.h
+++ b/src/kudu/util/net/sockaddr.h
@@ -23,7 +23,7 @@
 
 #include <cstdint>
 #include <string>
-#include <type_traits>
+#include <utility>
 #include <vector>
 
 #include <glog/logging.h>