Posted to commits@kudu.apache.org by al...@apache.org on 2022/12/20 19:01:40 UTC

[kudu] branch master updated: KUDU-3357 endpoints for proxied RPCs

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f29b5da5 KUDU-3357 endpoints for proxied RPCs
3f29b5da5 is described below

commit 3f29b5da5f59ea96cfec0608226d5c35740884a6
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Oct 14 14:06:05 2022 -0700

    KUDU-3357 endpoints for proxied RPCs
    
    This patch introduces a solution to the problem outlined in KUDU-3357.
    
    The idea is to establish separate RPC endpoint(s) for Kudu servers to
    handle traffic proxied from external network(s).  This way, when a Kudu
    server receives an RPC request, it has enough information to decide
    whether the request arrived from the internal network or from an
    external one.  All communications between Kudu components in the cluster
    should be routed through the standard RPC endpoints, while requests
    proxied from external networks should be routed through the dedicated
    RPC endpoints.  When a Kudu server receives an RPC through such an
    endpoint, it can substitute the internal RPC addresses of Kudu servers
    with corresponding RPC addresses reachable by the client through a
    TCP proxy.
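
    For illustration, here is a condensed sketch of that server-side logic
    (based on the changes to master_service.cc and master.cc below; the
    wrapper function name is hypothetical):

      // Decide whether to expose external (proxy-advertised) addresses based
      // on which RPC endpoint the request arrived on.
      void FillRegistration(const Master* master,
                            rpc::RpcContext* rpc,
                            ServerRegistrationPB* reg) {
        const bool use_external_addr = IsAddrOneOf(
            rpc->local_address(), master->rpc_proxied_addresses());
        // With 'use_external_addr' set, the RPC addresses in 'reg' are the
        // ones from --rpc_proxy_advertised_addresses, not the internal ones.
        WARN_NOT_OK(master->GetMasterRegistration(reg, use_external_addr),
                    "unable to get master registration");
      }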
    
    With that, the following new flags have been introduced, both accepting
    a comma-separated list of strings of the form '<hostname>:<port>':
    
    --rpc_proxy_advertised_addresses
    
      That's to set the server's RPC endpoints exposed to the external
      network via a TCP proxy.
    
    --rpc_proxied_addresses
    
      That's to define RPC endpoints in the internal network to handle
      RPC requests forwarded/proxied from external networks.  It's possible
      to use a wildcard for the IP address (i.e. 0.0.0.0)
      and for the port number (i.e. 0) in the elements of this address list.
    
    The newly introduced --rpc_proxy_advertised_addresses is orthogonal
    to the already existing --rpc_advertised_addresses, so it's possible to
    use both simultaneously if the network environment for Docker containers
    in the private internal network is configured in a funny way.
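
    A minimal configuration sketch of wiring both new flags into a cluster
    (mirroring the new client-proxied-rpc-test.cc added below; the proxy
    host name and port numbers are placeholders):

      ExternalMiniClusterOptions opts;
      opts.extra_master_flags = {
        "--rpc_proxy_advertised_addresses=kudu.proxy.example.com:7051",
        "--rpc_proxied_addresses=0.0.0.0:0",
      };
      opts.extra_tserver_flags = {
        "--rpc_proxy_advertised_addresses=kudu.proxy.example.com:7050",
        "--rpc_proxied_addresses=0.0.0.0:0",
      };
      ExternalMiniCluster cluster(std::move(opts));
      ASSERT_OK(cluster.Start());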
    
    This approach allows for separating the internal and the external
    traffic while providing connectivity for Kudu clients running in
    external networks: the internal traffic is never routed through
    a proxy's or a load balancer's endpoint.  The approach of having only
    --rpc_advertised_addresses for public cloud deployments (referred to
    by KUDU-3357) routes _all_ the Kudu traffic through the endpoints
    exposed by the proxy/load balancer, and that's the problem this
    patch addresses.
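
    From the client's perspective, only the proxy-advertised address is ever
    specified; a minimal sketch, assuming the proxy forwards the placeholder
    address kudu.proxy.example.com:7051 to the master's proxied endpoint:

      client::KuduClientBuilder builder;
      builder.add_master_server_addr("kudu.proxy.example.com:7051");
      client::sp::shared_ptr<client::KuduClient> client;
      ASSERT_OK(builder.Build(&client));
      // Tablet locations returned to this client carry the tablet servers'
      // proxy-advertised addresses, so scans and writes go through the
      // proxy as well.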
    
    I verified this patch works as expected in a k8s environment running in
    the AWS/EC2 cloud, where a Kudu cluster was deployed in a containerized
    manner using Kudu Docker images.  In particular, RPC calls from a client
    running in an external network (I was running it from my laptop behind
    a firewall) were forwarded/proxied via a TCP proxy (NGINX) to Kudu
    servers running in an AWS cluster deployed behind a load balancer.
    I used the "kudu perf loadgen" CLI tool to create tables and write
    data, and "kudu perf table_scan" to read data.  A test Kudu Java client
    application worked as well.
    
    NOTE: even though the "kudu cluster ksck" tool also worked, it's not yet
          a goal to be able to run "kudu cluster ksck" and other
          administrative CLI tools from the outside; those tasks are
          expected to be performed from within the Kudu cluster's internal
          network.
    
    Follow-up patches should also add:
      * proper advertising of a proxy/loadbalancer endpoint to be forwarded
        to the embedded web server's endpoint for master and tablet servers
      * support for multi-master configurations when forwarding RPCs
        from external networks
    
    Change-Id: Ic300250556d3f6e522a71923bed6aa5cd45375ea
    Reviewed-on: http://gerrit.cloudera.org:8080/19231
    Tested-by: Kudu Jenkins
    Reviewed-by: Attila Bukor <ab...@apache.org>
---
 src/kudu/client/client-test.cc                     |   3 +-
 src/kudu/common/wire_protocol.proto                |  15 ++
 src/kudu/integration-tests/CMakeLists.txt          |   1 +
 .../integration-tests/client-proxied-rpc-test.cc   | 299 +++++++++++++++++++++
 src/kudu/integration-tests/consistency-itest.cc    |   5 +-
 src/kudu/integration-tests/create-table-itest.cc   | 134 ++++++++-
 .../integration-tests/create-table-stress-test.cc  |  16 +-
 src/kudu/integration-tests/registration-test.cc    |  15 +-
 .../integration-tests/table_locations-itest.cc     |   4 +-
 src/kudu/master/auto_leader_rebalancer.cc          |   7 +-
 src/kudu/master/auto_rebalancer.cc                 |   4 +-
 src/kudu/master/catalog_manager.cc                 |  86 ++++--
 src/kudu/master/catalog_manager.h                  |   6 +-
 src/kudu/master/master-test-util.h                 |   3 +-
 src/kudu/master/master-test.cc                     |   4 +-
 src/kudu/master/master.cc                          |  26 +-
 src/kudu/master/master.h                           |  15 +-
 src/kudu/master/master_path_handlers.cc            |  31 ++-
 src/kudu/master/master_service.cc                  |  75 ++++--
 src/kudu/master/ts_descriptor.cc                   |  34 ++-
 src/kudu/master/ts_descriptor.h                    |  13 +-
 src/kudu/server/rpc_server-test.cc                 | 235 +++++++++++++---
 src/kudu/server/rpc_server.cc                      | 170 +++++++++---
 src/kudu/server/rpc_server.h                       |  24 +-
 src/kudu/tserver/heartbeater.cc                    |  33 ++-
 src/kudu/util/net/net_util.cc                      |   9 +-
 src/kudu/util/net/net_util.h                       |  11 +-
 27 files changed, 1107 insertions(+), 171 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index bd7673258..b9f9c2226 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -386,7 +386,8 @@ class ClientTest : public KuduTest {
         cluster_->mini_master()->master()->catalog_manager();
     CatalogManager::ScopedLeaderSharedLock l(catalog);
     CHECK_OK(l.first_failed_status());
-    CHECK_OK(catalog->GetTableLocations(&req, &resp, /*user=*/nullopt));
+    CHECK_OK(catalog->GetTableLocations(
+        &req, &resp, /*use_external_addr=*/false, /*user=*/nullopt));
     CHECK(resp.tablet_locations_size() > 0);
     return resp.tablet_locations(0).tablet_id();
   }
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index c019eab9c..29feeaaa9 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -87,6 +87,21 @@ message NodeInstancePB {
 message ServerRegistrationPB {
   repeated HostPortPB rpc_addresses = 1;
   repeated HostPortPB http_addresses = 2;
+
+  // Addresses of this server's RPC endpoints advertised at a TCP proxy.
+  // It's assumed the proxy forwards RPC requests from the specified addresses
+  // to a dedicated RPC endpoint so that the server knows the requests came
+  // from the outside and can process them accordingly (e.g., transforming
+  // the internal addresses to be reachable from the outside via the proxied
+  // endpoints, etc.).
+  repeated HostPortPB rpc_proxy_addresses = 7;
+
+  // Addresses of this server's embedded web server HTTP/HTTPS endpoints
+  // advertised at a TCP (HTTP?) proxy. It's assumed the proxy forwards HTTP
+  // requests from the specified addresses to the addresses specified in the
+  // 'http_addresses' field.
+  repeated HostPortPB http_proxy_addresses = 8;
+
   optional string software_version = 3;
 
   // True if HTTPS has been enabled for the web interface.
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 0738ea66a..39b76ac41 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -67,6 +67,7 @@ ADD_KUDU_TEST(catalog_manager_tsk-itest PROCESSORS 2)
 ADD_KUDU_TEST(varchar-itest)
 ADD_KUDU_TEST(client_failover-itest)
 ADD_KUDU_TEST(client-negotiation-failover-itest)
+ADD_KUDU_TEST(client-proxied-rpc-test)
 ADD_KUDU_TEST(consensus_peer_health_status-itest)
 ADD_KUDU_TEST(consistency-itest PROCESSORS 5)
 ADD_KUDU_TEST(create-table-itest PROCESSORS 3)
diff --git a/src/kudu/integration-tests/client-proxied-rpc-test.cc b/src/kudu/integration-tests/client-proxied-rpc-test.cc
new file mode 100644
index 000000000..932a8b853
--- /dev/null
+++ b/src/kudu/integration-tests/client-proxied-rpc-test.cc
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <csignal>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/write_op.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/data_gen_util.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using kudu::Fifo;
+using kudu::client::KuduClient;
+using kudu::client::KuduInsert;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableAlterer;
+using kudu::client::KuduTabletServer;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+class ClientProxiedRpcTest : public KuduTest {
+ public:
+  void SetUp() override {
+
+    KuduTest::SetUp();
+
+    ASSERT_OK(GetRandomPort(kIpAddr, &m_proxy_advertised_port_));
+    m_proxy_advertised_addr_ = HostPort(kIpAddr, m_proxy_advertised_port_);
+    ASSERT_OK(GetRandomPort(kIpAddr, &m_proxied_port_));
+    m_proxied_addr_ = HostPort(kIpAddr, m_proxied_port_);
+
+    ASSERT_OK(GetRandomPort(kIpAddr, &t_proxy_advertised_port_));
+    t_proxy_advertised_addr_ = HostPort(kIpAddr, t_proxy_advertised_port_);
+    ASSERT_OK(GetRandomPort(kIpAddr, &t_proxied_port_));
+    t_proxied_addr_ = HostPort(kIpAddr, t_proxied_port_);
+
+    ExternalMiniClusterOptions opts;
+    opts.extra_master_flags = {
+      Substitute("--rpc_proxy_advertised_addresses=$0",
+          m_proxy_advertised_addr_.ToString()),
+      Substitute("--rpc_proxied_addresses=$0",
+          m_proxied_addr_.ToString()),
+    };
+    opts.extra_tserver_flags = {
+      Substitute("--rpc_proxy_advertised_addresses=$0",
+          t_proxy_advertised_addr_.ToString()),
+      Substitute("--rpc_proxied_addresses=$0",
+          t_proxied_addr_.ToString()),
+    };
+
+    cluster_.reset(new ExternalMiniCluster(std::move(opts)));
+    ASSERT_OK(cluster_->Start());
+  }
+
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+
+ protected:
+  static constexpr const char* const kIpAddr = "127.0.0.1";
+
+  uint16_t m_proxied_port_;
+  HostPort m_proxied_addr_;
+  uint16_t m_proxy_advertised_port_;
+  HostPort m_proxy_advertised_addr_;
+
+  uint16_t t_proxied_port_;
+  HostPort t_proxied_addr_;
+  uint16_t t_proxy_advertised_port_;
+  HostPort t_proxy_advertised_addr_;
+
+  unique_ptr<ExternalMiniCluster> cluster_;
+};
+
+// Verify basic functionality when RPC connections to Kudu master and tablet
+// server are forwarded via a simple TCP proxy.
+TEST_F(ClientProxiedRpcTest, Basic) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  string nc;
+  {
+    auto s = FindExecutable("nc", {"/bin", "/usr/bin", "/usr/local/bin"}, &nc);
+    if (s.IsNotFound()) {
+      LOG(WARNING) << "test is skipped: could not find netcat utility (nc)";
+      GTEST_SKIP();
+    }
+    ASSERT_OK(s);
+  }
+  ASSERT_FALSE(nc.empty());
+
+  const auto kTimeout = MonoDelta::FromSeconds(5);
+  constexpr const char* const kTableName = "proxied_rpc_test";
+  constexpr const char* const kProxyCmdPattern =
+      "trap \"kill %1\" EXIT; $0 -knv -l $1 $2 <$4 | $0 -nv $1 $3 >$4";
+  const auto schema = KuduSchema::FromSchema(GetSimpleTestSchema());
+  TestWorkload w(cluster_.get());
+  w.set_schema(schema);
+  w.set_table_name(kTableName);
+  w.set_num_replicas(1);
+  w.Setup();
+
+  unique_ptr<Fifo> m_fifo;
+  ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, "m.fifo"), &m_fifo));
+
+  unique_ptr<Fifo> t_fifo;
+  ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, "t.fifo"), &t_fifo));
+
+  // Run TCP proxy for Kudu master connections.
+  const auto m_proxy_cmd_str = Substitute(
+      kProxyCmdPattern, nc,
+      kIpAddr, m_proxy_advertised_port_, m_proxied_port_, m_fifo->filename());
+  Subprocess m_proxy({"/bin/bash", "-c", m_proxy_cmd_str});
+  ASSERT_OK(m_proxy.Start());
+  auto m_proxy_cleanup = MakeScopedCleanup([&] {
+    m_proxy.KillAndWait(SIGTERM);
+  });
+
+  // Run TCP proxy for Kudu tablet server connections.
+  const auto t_proxy_cmd_str = Substitute(
+      kProxyCmdPattern, nc,
+      kIpAddr, t_proxy_advertised_port_, t_proxied_port_, t_fifo->filename());
+  Subprocess t_proxy({"/bin/bash", "-c", t_proxy_cmd_str});
+  ASSERT_OK(t_proxy.Start());
+  auto t_proxy_cleanup = MakeScopedCleanup([&] {
+    t_proxy.KillAndWait(SIGTERM);
+  });
+
+  // Wait for the TCP proxies to start up.
+  ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, m_proxy_advertised_port_, kTimeout));
+  ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, t_proxy_advertised_port_, kTimeout));
+
+  // Build a client to send requests via the proxied RPC endpoint.
+  client::sp::shared_ptr<client::KuduClient> client;
+  {
+    client::KuduClientBuilder b;
+    b.add_master_server_addr(m_proxy_advertised_addr_.ToString());
+    b.default_admin_operation_timeout(kTimeout);
+    b.default_rpc_timeout(kTimeout);
+    ASSERT_OK(b.Build(&client));
+  }
+
+  // Make sure the client receives proxy advertised addresses since the request
+  // came to the proxied RPC address.
+  const auto& master_addresses = client->GetMasterAddresses();
+  ASSERT_EQ(m_proxy_advertised_addr_.ToString(), master_addresses);
+  // Just a sanity check: multiple RPC endpoints shouldn't be treated as
+  // a presence of multiple masters in the cluster.
+  ASSERT_FALSE(client->IsMultiMaster());
+
+  // Check that client sees RPC addresses advertised by TCP proxy for the
+  // tablet server.
+  vector<KuduTabletServer*> tss;
+  ElementDeleter deleter(&tss);
+  ASSERT_OK(client->ListTabletServers(&tss));
+  ASSERT_EQ(1, tss.size());
+  ASSERT_EQ(kIpAddr, tss[0]->hostname());
+  ASSERT_EQ(t_proxy_advertised_port_, tss[0]->port());
+
+  client::sp::shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+
+  // Create a session and explicitly set the flush mode to AUTO_FLUSH_SYNC
+  // to send every operation when calling Apply().
+  client::sp::shared_ptr<KuduSession> session(client->NewSession());
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+  ThreadSafeRandom rng(SeedRandom());
+  for (auto i = 0; i < 10; ++i) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    auto* row = insert->mutable_row();
+    GenerateDataForRow(schema, i, &rng, row);
+    ASSERT_OK(session->Apply(insert.release()));
+  }
+  // Call Flush() just in case, though it's effectively a no-op given the
+  // chosen session flush mode.
+  ASSERT_OK(session->Flush());
+
+  // Read the data back.
+  {
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetFaultTolerant());
+    ASSERT_OK(scanner.Open());
+    ASSERT_TRUE(scanner.HasMoreRows());
+    KuduScanBatch batch;
+
+    int32_t idx = 0;
+    while (scanner.HasMoreRows()) {
+      ASSERT_OK(scanner.NextBatch(&batch));
+      for (const auto& row : batch) {
+        int32_t value;
+        ASSERT_OK(row.GetInt32(0, &value));
+        ASSERT_EQ(idx++, value);
+      }
+    }
+    ASSERT_EQ(10, idx);
+  }
+
+  // Make sure the client indeed works through the RPC address advertised by
+  // the proxy: stop it and verify the client can no longer write to the table.
+  t_proxy_cleanup.cancel();
+  ASSERT_OK(t_proxy.KillAndWait(SIGTERM));
+  {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    auto* row = insert->mutable_row();
+    GenerateDataForRow(schema, 100, &rng, row);
+    const auto s = session->Apply(insert.release());
+    ASSERT_TRUE(s.IsIOError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
+  }
+
+  // Try reading the data now: this is expected to fail since the client works
+  // only through the advertised addresses.
+  {
+    KuduScanner scanner(table.get());
+    ASSERT_OK(scanner.SetFaultTolerant());
+    ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
+    const auto s = scanner.Open();
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    ASSERT_STR_MATCHES(s.ToString(),
+        "(timed out after deadline expired|exceeded configured scan timeout)");
+  }
+
+  // Meanwhile, DDL operations should still be possible: connections to master
+  // are still being proxied as needed, and master and tablet servers
+  // communicate via standard, non-proxied RPC endpoints.
+  {
+    unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
+    alt->AlterColumn("string_val")->RenameTo("str_val");
+    ASSERT_OK(alt->Alter());
+  }
+
+  // Make sure the client communicates with master via the advertised addresses:
+  // once the TCP proxy is shut down, client should not be able to reach master
+  // to perform a DDL operation.
+  m_proxy_cleanup.cancel();
+  ASSERT_OK(m_proxy.KillAndWait(SIGTERM));
+  {
+    unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
+    alt->AlterColumn("str_val")->RenameTo("string_val");
+    const auto s = alt->Alter();
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "AlterTable passed its deadline");
+    ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index 0ed345179..16e8d4a7a 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -21,6 +21,7 @@
 #include <optional>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -40,7 +41,6 @@
 #include "kudu/clock/clock.h"
 #include "kudu/clock/hybrid_clock.h"
 #include "kudu/clock/mock_ntp.h"
-#include "kudu/clock/time_service.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/ref_counted.h"
@@ -248,7 +248,8 @@ class ConsistencyITest : public MiniClusterITestBase {
     GetTableLocationsResponsePB resp;
     CatalogManager::ScopedLeaderSharedLock l(catalog);
     RETURN_NOT_OK(l.first_failed_status());
-    RETURN_NOT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/std::nullopt));
+    RETURN_NOT_OK(catalog->GetTableLocations(
+        &req, &resp, /*use_external_addr=*/false, /*user=*/std::nullopt));
     if (resp.tablet_locations_size() < 1) {
       return Status::NotFound(Substitute("$0: no tablets for key", key_value));
     }
diff --git a/src/kudu/integration-tests/create-table-itest.cc b/src/kudu/integration-tests/create-table-itest.cc
index 71a1b48e0..b0969ae47 100644
--- a/src/kudu/integration-tests/create-table-itest.cc
+++ b/src/kudu/integration-tests/create-table-itest.cc
@@ -26,6 +26,7 @@
 #include <set>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -53,21 +54,20 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-namespace kudu {
-class HostPort;
-}  // namespace kudu
-
 using std::multimap;
 using std::set;
 using std::string;
 using std::thread;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 METRIC_DECLARE_entity(server);
 METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerAdminService_CreateTablet);
@@ -162,7 +162,7 @@ TEST_F(CreateTableITest, TestCreateWhenMajorityOfReplicasFailCreation) {
         tablet_num_replica_map[tablet_id]++;
       }
     }
-    LOG(INFO) << strings::Substitute(
+    LOG(INFO) << Substitute(
         "Waiting for only one tablet to be left with $0 replicas. Currently have: $1 tablet(s)",
         kNumReplicas, tablet_num_replica_map.size());
     ASSERT_EQ(1, tablet_num_replica_map.size());
@@ -501,4 +501,128 @@ TEST_F(CreateTableITest, TestCreateTableWithDeadTServers) {
   }
 }
 
+// Make sure it's possible to create a table when using proxied RPC addresses,
+// and that the resulting table locations point to the addresses set to be
+// advertised by a TCP proxy.
+TEST_F(CreateTableITest, ProxyAdvertisedAddresses) {
+  constexpr const char* const kIpAddr = "127.0.0.1";
+  constexpr int kNumServers = 1;
+  constexpr int kNumTablets = 2;
+  const MonoDelta timeout = MonoDelta::FromSeconds(10);
+
+  uint16_t m_port = 0;
+  ASSERT_OK(GetRandomPort(kIpAddr, &m_port));
+  const HostPort m_proxied_addr(kIpAddr, m_port);
+
+  uint16_t t_port = 0;
+  ASSERT_OK(GetRandomPort(kIpAddr, &t_port));
+  const HostPort t_proxied_addr(kIpAddr, t_port);
+
+  const string m_proxy_advertised_address = "kudu.proxy.io:333";
+
+  const uint16_t t_proxy_advertised_port = 888;
+  const string t_proxy_advertised_host = "kudu.proxy.io";
+  const string t_proxy_advertised_addr = Substitute(
+      "$0:$1", t_proxy_advertised_host, t_proxy_advertised_port);
+
+  uint16_t t_bind_port = 0;
+  ASSERT_OK(GetRandomPort(kIpAddr, &t_bind_port));
+  const HostPort t_bind_addr(kIpAddr, t_bind_port);
+
+  const vector<string> master_flags = {
+    Substitute("--rpc_proxy_advertised_addresses=$0", m_proxy_advertised_address),
+    Substitute("--rpc_proxied_addresses=$0", m_proxied_addr.ToString()),
+  };
+  const vector<string> ts_flags = {
+    Substitute("--rpc_proxy_advertised_addresses=$0", t_proxy_advertised_addr),
+    Substitute("--rpc_proxied_addresses=$0", t_proxied_addr.ToString()),
+    Substitute("--rpc_bind_addresses=$0", t_bind_addr.ToString()),
+  };
+  NO_FATALS(StartCluster(ts_flags, master_flags, kNumServers));
+
+  // Build a client to send requests via the proxied RPC endpoint.
+  client::KuduClientBuilder builder;
+  builder.add_master_server_addr(m_proxied_addr.ToString());
+  client::sp::shared_ptr<client::KuduClient> c_ext;
+  ASSERT_OK(builder.Build(&c_ext));
+
+  // Create table using Kudu client sending requests as if they were proxied
+  // from outside.
+  unique_ptr<client::KuduTableCreator> table_creator(c_ext->NewTableCreator());
+  auto client_schema = KuduSchema::FromSchema(GetSimpleTestSchema());
+  ASSERT_OK(table_creator->table_name(kTableName)
+            .schema(&client_schema)
+            .set_range_partition_columns({ "key" })
+            .num_replicas(1)
+            .add_hash_partitions({ "key" }, kNumTablets)
+            .Create());
+
+  // Make sure the client receives proxy advertised addresses since the request
+  // came to the proxied RPC address.
+  const auto& master_addresses = c_ext->GetMasterAddresses();
+  ASSERT_EQ(m_proxy_advertised_address, master_addresses);
+
+  // Get information on table's locations via standard RPC endpoint.
+  {
+    master::GetTableLocationsRequestPB req;
+    req.set_intern_ts_infos_in_response(true);
+    req.mutable_table()->set_table_name(kTableName);
+
+    rpc::RpcController rpc;
+    rpc.set_timeout(timeout);
+    master::GetTableLocationsResponsePB resp;
+    auto mp = cluster_->master_proxy();
+    ASSERT_OK(mp->GetTableLocations(req, &resp, &rpc));
+
+    ASSERT_EQ(kNumTablets, resp.tablet_locations().size());
+    for (const auto& loc : resp.tablet_locations()) {
+      ASSERT_EQ(kNumServers, loc.interned_replicas_size());
+    }
+
+    ASSERT_EQ(1, resp.ts_infos_size());
+    const auto& ts_info = resp.ts_infos(0);
+    ASSERT_EQ(1, ts_info.rpc_addresses_size());
+    const auto& hp = ts_info.rpc_addresses(0);
+    ASSERT_TRUE(hp.has_host());
+    ASSERT_EQ(t_bind_addr.host(), hp.host());
+    ASSERT_TRUE(hp.has_port());
+    ASSERT_EQ(t_bind_addr.port(), hp.port());
+  }
+
+  // Get information on table's locations via endpoint for proxied RPCs.
+  {
+    Sockaddr ma;
+    ma.ParseFromNumericHostPort(m_proxied_addr);
+    auto mp = std::make_shared<master::MasterServiceProxy>(
+        cluster_->messenger(), ma, ma.host());
+
+    master::GetTableLocationsRequestPB req;
+    req.set_intern_ts_infos_in_response(true);
+    req.mutable_table()->set_table_name(kTableName);
+
+    rpc::RpcController rpc;
+    rpc.set_timeout(timeout);
+    master::GetTableLocationsResponsePB resp;
+    ASSERT_OK(mp->GetTableLocations(req, &resp, &rpc));
+
+    ASSERT_EQ(kNumTablets, resp.tablet_locations().size());
+    for (const auto& loc : resp.tablet_locations()) {
+      ASSERT_EQ(kNumServers, loc.interned_replicas_size());
+    }
+
+    ASSERT_EQ(1, resp.ts_infos_size());
+    const auto& ts_info = resp.ts_infos(0);
+    ASSERT_EQ(1, ts_info.rpc_addresses_size());
+    const auto& hp = ts_info.rpc_addresses(0);
+    ASSERT_TRUE(hp.has_host());
+    ASSERT_EQ(t_proxy_advertised_host, hp.host());
+    ASSERT_TRUE(hp.has_port());
+    ASSERT_EQ(t_proxy_advertised_port, hp.port());
+  }
+
+  // Delete the created table using the client instance communicating with
+  // the cluster through regular RPC endpoints.
+  ASSERT_OK(client_->DeleteTable(kTableName));
+}
+
 } // namespace kudu
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc b/src/kudu/integration-tests/create-table-stress-test.cc
index 069ab0a63..f1572ddcf 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -21,6 +21,7 @@
 #include <optional>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -256,7 +257,8 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(0);
-    Status s = catalog->GetTableLocations(&req, &resp, /*user=*/nullopt);
+    Status s = catalog->GetTableLocations(
+        &req, &resp, /*use_external_addr=*/false, /*user=*/nullopt);
     ASSERT_STR_CONTAINS(
         s.ToString(),
         "max_returned_locations must be greater than 0 if specified");
@@ -269,7 +271,8 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(1);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/nullopt));
+    ASSERT_OK(catalog->GetTableLocations(
+        &req, &resp, /*use_external_addr=*/false, /*user=*/nullopt));
     ASSERT_EQ(resp.tablet_locations_size(), 1);
     // empty since it's the first
     ASSERT_EQ(resp.tablet_locations(0).partition().partition_key_start(), "");
@@ -284,7 +287,8 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(half_tablets);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/nullopt));
+    ASSERT_OK(catalog->GetTableLocations(
+        &req, &resp, /*use_external_addr=*/false, /*user=*/nullopt));
     ASSERT_EQ(half_tablets, resp.tablet_locations_size());
   }
 
@@ -295,7 +299,8 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     resp.Clear();
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(FLAGS_num_test_tablets);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/nullopt));
+    ASSERT_OK(catalog->GetTableLocations(
+        &req, &resp, /*use_external_addr=*/false, /*user=*/nullopt));
     ASSERT_EQ(FLAGS_num_test_tablets, resp.tablet_locations_size());
   }
 
@@ -339,7 +344,8 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
     req.mutable_table()->set_table_name(table_name);
     req.set_max_returned_locations(1);
     req.set_partition_key_start(start_key_middle);
-    ASSERT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/nullopt));
+    ASSERT_OK(catalog->GetTableLocations(
+        &req, &resp, /*use_external_addr=*/false, /*user=*/nullopt));
     ASSERT_EQ(1, resp.tablet_locations_size())
         << "Response: [" << pb_util::SecureDebugString(resp) << "]";
     ASSERT_EQ(start_key_middle, resp.tablet_locations(0).partition().partition_key_start());
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index 2f598a163..9ed909a25 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -22,6 +22,7 @@
 #include <optional>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -41,7 +42,7 @@
 #include "kudu/master/master.pb.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/master/sys_catalog.h"
-#include "kudu/master/ts_descriptor.h"
+#include "kudu/master/ts_descriptor.h"  // IWYU pragma: keep
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/security/test/test_certs.h"
 #include "kudu/security/tls_context.h"
@@ -203,6 +204,7 @@ class RegistrationTest : public KuduTest {
         RETURN_NOT_OK(ls);
         s = catalog->GetTabletLocations(tablet_id,
                                         master::VOTER_REPLICA,
+                                        /*use_external_addr=*/false,
                                         &loc,
                                         &infos_dict,
                                         /*user=*/nullopt);
@@ -232,7 +234,7 @@ TEST_F(RegistrationTest, TestTSRegisters) {
 
   // Verify that the registration is sane.
   ServerRegistrationPB reg;
-  descs[0]->GetRegistration(&reg);
+  ASSERT_OK(descs[0]->GetRegistration(&reg));
   {
     SCOPED_TRACE(SecureShortDebugString(reg));
     ASSERT_EQ(string::npos, SecureShortDebugString(reg).find("0.0.0.0"))
@@ -266,12 +268,12 @@ TEST_F(RegistrationTest, TestTSRegisters) {
 TEST_F(RegistrationTest, TestMasterSoftwareVersion) {
   // Verify that the master's software version exists.
   ServerRegistrationPB reg;
-  cluster_->mini_master()->master()->GetMasterRegistration(&reg);
+  cluster_->mini_master()->master()->GetMasterRegistration(
+      &reg, /*use_external_addr=*/false);
   {
     SCOPED_TRACE(SecureShortDebugString(reg));
     ASSERT_TRUE(reg.has_software_version());
-    ASSERT_STR_CONTAINS(reg.software_version(),
-                        VersionInfo::GetVersionInfo());
+    ASSERT_STR_CONTAINS(reg.software_version(), VersionInfo::GetVersionInfo());
     ASSERT_LE(setup_time_, reg.start_time());
     ASSERT_LE(reg.start_time(), WallTime_Now());
   }
@@ -279,7 +281,8 @@ TEST_F(RegistrationTest, TestMasterSoftwareVersion) {
 
 TEST_F(RegistrationTest, TestServerStartWallTime) {
   ServerRegistrationPB reg;
-  cluster_->mini_master()->master()->GetMasterRegistration(&reg);
+  cluster_->mini_master()->master()->GetMasterRegistration(
+      &reg, /*use_external_addr=*/false);
   ASSERT_LE(setup_time_, reg.start_time());
   ASSERT_LE(reg.start_time(), WallTime_Now());
 
diff --git a/src/kudu/integration-tests/table_locations-itest.cc b/src/kudu/integration-tests/table_locations-itest.cc
index 4d6829d7a..12f304b77 100644
--- a/src/kudu/integration-tests/table_locations-itest.cc
+++ b/src/kudu/integration-tests/table_locations-itest.cc
@@ -27,6 +27,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -717,7 +718,8 @@ TEST_F(TableLocationsTest, GetTableLocationsBenchmarkFunctionCall) {
         ++req_counters[idx];
         {
           CatalogManager::ScopedLeaderSharedLock l(cm);
-          auto s = cm->GetTableLocations(&req, resp, username);
+          auto s = cm->GetTableLocations(
+              &req, resp, /*use_external_addr=*/false, username);
           if (!s.ok()) {
             ++err_counters[idx];
           }
diff --git a/src/kudu/master/auto_leader_rebalancer.cc b/src/kudu/master/auto_leader_rebalancer.cc
index b56ca737a..1a5adf143 100644
--- a/src/kudu/master/auto_leader_rebalancer.cc
+++ b/src/kudu/master/auto_leader_rebalancer.cc
@@ -155,7 +155,12 @@ Status AutoLeaderRebalancerTask::RunLeaderRebalanceForTable(
       // This will only return tablet replicas in the RUNNING state, and
       // filter to only retrieve voter replicas.
       RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
-          tablet->id(), ReplicaTypeFilter::VOTER_REPLICA, &locs_pb, &ts_infos_dict, nullopt));
+          tablet->id(),
+          ReplicaTypeFilter::VOTER_REPLICA,
+          /*use_external_addr=*/false,
+          &locs_pb,
+          &ts_infos_dict,
+          nullopt));
     }
 
     // Build a summary for each replica of the tablet.
diff --git a/src/kudu/master/auto_rebalancer.cc b/src/kudu/master/auto_rebalancer.cc
index faa9a2c65..aa3d1a7fc 100644
--- a/src/kudu/master/auto_rebalancer.cc
+++ b/src/kudu/master/auto_rebalancer.cc
@@ -386,6 +386,7 @@ Status AutoRebalancerTask::GetTabletLeader(
     RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
         tablet_id,
         ReplicaTypeFilter::VOTER_REPLICA,
+        /*use_external_addr=*/false,
         &locs_pb,
         &ts_infos_dict,
         nullopt));
@@ -434,7 +435,7 @@ Status AutoRebalancerTask::ExecuteMoves(
         return Status::NotFound("Could not find destination tserver");
       }
       ServerRegistrationPB dest_reg;
-      dest_desc->GetRegistration(&dest_reg);
+      RETURN_NOT_OK(dest_desc->GetRegistration(&dest_reg));
 
       auto* add_peer_change = req.add_config_changes();
       add_peer_change->set_type(ADD_PEER);
@@ -541,6 +542,7 @@ Status AutoRebalancerTask::BuildClusterRawInfo(
         RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
             tablet_summary.id,
             ReplicaTypeFilter::VOTER_REPLICA,
+            /*use_external_addr=*/false,
             &locs_pb,
             &ts_infos_dict,
             nullopt));
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 48f5924cf..c2ff76a92 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -5037,7 +5037,7 @@ bool AsyncAddReplicaTask::SendRequest(int attempt) {
     peer->mutable_attrs()->set_promote(true);
   }
   ServerRegistrationPB peer_reg;
-  extra_replica->GetRegistration(&peer_reg);
+  DCHECK_OK(extra_replica->GetRegistration(&peer_reg));
   CHECK_GT(peer_reg.rpc_addresses_size(), 0);
   *peer->mutable_last_known_addr() = peer_reg.rpc_addresses(0);
   peer->set_member_type(member_type_);
@@ -6023,7 +6023,7 @@ Status CatalogManager::SelectReplicasForTablet(const PlacementPolicy& policy,
                                    tablet->id(), table_guard.data().name()));
   for (const auto& desc : descriptors) {
     ServerRegistrationPB reg;
-    desc->GetRegistration(&reg);
+    RETURN_NOT_OK(desc->GetRegistration(&reg));
 
     RaftPeerPB* peer = config->add_peers();
     peer->set_member_type(RaftPeerPB::VOTER);
@@ -6119,6 +6119,7 @@ Status CatalogManager::ProcessDeletedTables(const vector<scoped_refptr<TableInfo
 Status CatalogManager::BuildLocationsForTablet(
     const scoped_refptr<TabletInfo>& tablet,
     ReplicaTypeFilter filter,
+    bool use_external_addr,
     TabletLocationsPB* locs_pb,
     TSInfosDict* ts_infos_dict) {
   TabletMetadataLock l_tablet(tablet.get(), LockMode::READ);
@@ -6163,37 +6164,57 @@ Status CatalogManager::BuildLocationsForTablet(
     }
 
     // Helper function to create a TSInfoPB.
-    auto fill_tsinfo_pb = [this, &peer](TSInfoPB* tsinfo_pb) {
+    auto fill_tsinfo_pb = [this, &peer, use_external_addr](TSInfoPB* tsinfo_pb) {
       tsinfo_pb->set_permanent_uuid(peer.permanent_uuid());
       shared_ptr<TSDescriptor> ts_desc;
       if (master_->ts_manager()->LookupTSByUUID(peer.permanent_uuid(), &ts_desc)) {
-        ts_desc->GetTSInfoPB(tsinfo_pb);
-      } else {
-        // If we've never received a heartbeat from the tserver, we'll fall back
-        // to the last known RPC address in the RaftPeerPB.
-        //
-        // TODO(wdberkeley): We should track these RPC addresses in the master table itself.
+        ts_desc->GetTSInfoPB(tsinfo_pb, use_external_addr);
+        return true;
+      }
+      // If we've never received a heartbeat from the tserver, we'll fall back
+      // to the last known RPC address in the RaftPeerPB.
+      //
+      // TODO(wdberkeley): We should track these RPC addresses in the master table itself.
+      //
+      if (!use_external_addr) {
         tsinfo_pb->add_rpc_addresses()->CopyFrom(peer.last_known_addr());
+        return true;
       }
+      return false;
     };
 
     const auto role = GetParticipantRole(peer, cstate);
     const optional<string> dimension = l_tablet.data().pb.has_dimension_label()
         ? make_optional(l_tablet.data().pb.dimension_label()) : nullopt;
+
+    // Don't even add a TSInfo entry when using external addresses if the
+    // proxy-advertised address for the peer isn't yet known at this point.
+    // That's to avoid exposing internal addresses to clients running outside
+    // of the cluster: since they cannot make requests to internal addresses
+    // anyway, from their standpoint a replica without such an address
+    // doesn't even exist.
     if (ts_infos_dict) {
       const auto idx = ts_infos_dict->LookupOrAdd(peer.permanent_uuid(), fill_tsinfo_pb);
-      auto* interned_replica_pb = locs_pb->add_interned_replicas();
-      interned_replica_pb->set_ts_info_idx(idx);
-      interned_replica_pb->set_role(role);
-      if (dimension) {
-        interned_replica_pb->set_dimension_label(*dimension);
+      if (idx >= 0) {
+        auto* interned_replica_pb = locs_pb->add_interned_replicas();
+        interned_replica_pb->set_ts_info_idx(idx);
+        interned_replica_pb->set_role(role);
+        if (dimension) {
+          interned_replica_pb->set_dimension_label(*dimension);
+        }
       }
     } else {
-      TabletLocationsPB_DEPRECATED_ReplicaPB* replica_pb = locs_pb->add_deprecated_replicas();
-      TSInfoPB* tsi = google::protobuf::Arena::CreateMessage<TSInfoPB>(locs_pb->GetArena());
-      fill_tsinfo_pb(tsi);
-      replica_pb->set_allocated_ts_info(tsi);
-      replica_pb->set_role(role);
+      auto* replica_pb = locs_pb->add_deprecated_replicas();
+      auto* arena = locs_pb->GetArena();
+      TSInfoPB* tsi = google::protobuf::Arena::CreateMessage<TSInfoPB>(arena);
+      if (fill_tsinfo_pb(tsi)) {
+        replica_pb->set_allocated_ts_info(tsi);
+        replica_pb->set_role(role);
+      } else {
+        if (!arena) {
+          delete tsi;
+        }
+      }
     }
   }
 
@@ -6208,6 +6229,7 @@ Status CatalogManager::BuildLocationsForTablet(
 
 Status CatalogManager::GetTabletLocations(const string& tablet_id,
                                           ReplicaTypeFilter filter,
+                                          bool use_external_addr,
                                           TabletLocationsPB* locs_pb,
                                           TSInfosDict* ts_infos_dict,
                                           const optional<string>& user) {
@@ -6234,7 +6256,8 @@ Status CatalogManager::GetTabletLocations(const string& tablet_id,
         NormalizeTableName(table_lock.data().name()), *user, *user == table_lock.data().owner()));
   }
 
-  return BuildLocationsForTablet(tablet_info, filter, locs_pb, ts_infos_dict);
+  return BuildLocationsForTablet(
+      tablet_info, filter, use_external_addr, locs_pb, ts_infos_dict);
 }
 
 Status CatalogManager::ReplaceTablet(const string& tablet_id, ReplaceTabletResponsePB* resp) {
@@ -6328,6 +6351,7 @@ Status CatalogManager::ReplaceTablet(const string& tablet_id, ReplaceTabletRespo
 
 Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
                                          GetTableLocationsResponsePB* resp,
+                                         bool use_external_addr,
                                          const optional<string>& user) {
   // If start-key is > end-key report an error instead of swapping the two
   // since probably there is something wrong app-side.
@@ -6383,7 +6407,10 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
   bool consistent_locations = true;
   for (const auto& tablet : tablets_in_range) {
     const auto s = BuildLocationsForTablet(
-        tablet, req->replica_type_filter(), resp->add_tablet_locations(),
+        tablet,
+        req->replica_type_filter(),
+        use_external_addr,
+        resp->add_tablet_locations(),
         req->intern_ts_infos_in_response() ? &infos_dict : nullptr);
     if (PREDICT_TRUE(s.ok())) {
       continue;
@@ -6736,14 +6763,19 @@ CatalogManager::TSInfosDict::~TSInfosDict() {
 }
 
 int CatalogManager::TSInfosDict::LookupOrAdd(const string& uuid,
-                                             const std::function<void(TSInfoPB*)>& creator) {
+                                             const std::function<bool(TSInfoPB*)>& creator) {
   return *ComputePairIfAbsent(&uuid_to_idx_, uuid, [&]() -> pair<StringPiece, int> {
-    auto idx = ts_info_pbs_.size();
     auto* pb = google::protobuf::Arena::CreateMessage<TSInfoPB>(arena_);
-    ts_info_pbs_.push_back(pb);
-    creator(pb);
-    DCHECK_EQ(pb->permanent_uuid(), uuid);
-    return {pb->permanent_uuid(), idx};
+    if (creator(pb)) {
+      DCHECK_EQ(pb->permanent_uuid(), uuid);
+      auto idx = ts_info_pbs_.size();
+      ts_info_pbs_.push_back(pb);
+      return {pb->permanent_uuid(), idx};
+    }
+    if (!arena_) {
+      delete pb;
+    }
+    return {"", -1};
   });
 }
 
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 3baa8bfdf..b17c29ddd 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -737,6 +737,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Returns an error if any of the tablets are not running.
   Status GetTableLocations(const GetTableLocationsRequestPB* req,
                            GetTableLocationsResponsePB* resp,
+                           bool use_external_addr,
                            const std::optional<std::string>& user);
 
   // Dictionary mapping tablet servers to indexes, so that when a GetTableLocations
@@ -755,7 +756,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
     // Lookup the index for the given tablet server UUID. If that UUID has not been
     // added yet, allocates a new TSInfoPB and calls 'creator(pb)' to fill it in,
     // returning the index of the newly-added TS.
-    int LookupOrAdd(const std::string& uuid, const std::function<void(TSInfoPB*)>& creator);
+    int LookupOrAdd(const std::string& uuid,
+                    const std::function<bool(TSInfoPB*)>& creator);
 
     const std::vector<TSInfoPB*>& ts_info_pbs() const { return ts_info_pbs_; }
 
@@ -781,6 +783,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // This only returns tablets which are in RUNNING state.
   Status GetTabletLocations(const std::string& tablet_id,
                             master::ReplicaTypeFilter filter,
+                            bool use_external_addr,
                             TabletLocationsPB* locs_pb,
                             TSInfosDict* ts_infos_dict,
                             const std::optional<std::string>& user);
@@ -1122,6 +1125,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Returns Status::ServiceUnavailable if tablet is not running.
   Status BuildLocationsForTablet(const scoped_refptr<TabletInfo>& tablet,
                                  master::ReplicaTypeFilter filter,
+                                 bool use_external_addr,
                                  TabletLocationsPB* locs_pb,
                                  TSInfosDict* ts_infos_dict);
 
diff --git a/src/kudu/master/master-test-util.h b/src/kudu/master/master-test-util.h
index e77392222..db4842603 100644
--- a/src/kudu/master/master-test-util.h
+++ b/src/kudu/master/master-test-util.h
@@ -52,7 +52,8 @@ Status WaitForRunningTabletCount(MiniMaster* mini_master,
     {
       CatalogManager::ScopedLeaderSharedLock l(catalog);
       RETURN_NOT_OK(l.first_failed_status());
-      RETURN_NOT_OK(catalog->GetTableLocations(&req, resp, /*user=*/std::nullopt));
+      RETURN_NOT_OK(catalog->GetTableLocations(
+          &req, resp, /*use_external_addr=*/false, /*user=*/std::nullopt));
     }
     if (resp->tablet_locations_size() >= expected_count) {
       return Status::OK();
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 2b2d73505..5fdd2c664 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -505,7 +505,7 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
   master_->ts_manager()->GetAllDescriptors(&descs);
   ASSERT_EQ(1, descs.size()) << "Should have registered the TS";
   ServerRegistrationPB reg;
-  descs[0]->GetRegistration(&reg);
+  ASSERT_OK(descs[0]->GetRegistration(&reg));
   ASSERT_EQ(SecureDebugString(fake_reg), SecureDebugString(reg))
       << "Master got different registration";
 
@@ -3232,7 +3232,7 @@ TEST_F(MasterTest, TestDuplicateRequest) {
   master_->ts_manager()->GetAllDescriptors(&descs);
   ASSERT_EQ(1, descs.size()) << "Should have registered the TS";
   ServerRegistrationPB reg;
-  descs[0]->GetRegistration(&reg);
+  ASSERT_OK(descs[0]->GetRegistration(&reg));
   ASSERT_EQ(SecureDebugString(fake_reg), SecureDebugString(reg))
       << "Master got different registration";
   shared_ptr<TSDescriptor> ts_desc;
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index d54a53732..296765b21 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -235,6 +235,7 @@ Status Master::Init() {
       FLAGS_authz_token_validity_seconds,
       FLAGS_tsk_rotation_seconds,
       messenger_->shared_token_verifier()));
+
   state_ = kInitialized;
   return Status::OK();
 }
@@ -392,11 +393,27 @@ Status Master::WaitUntilCatalogManagerIsLeaderAndReadyForTests(const MonoDelta&
                           s.ToString());
 }
 
-Status Master::GetMasterRegistration(ServerRegistrationPB* reg) const {
+Status Master::GetMasterRegistration(ServerRegistrationPB* reg,
+                                     bool use_external_addr) const {
   if (!registration_initialized_.load(std::memory_order_acquire)) {
     return Status::ServiceUnavailable("Master startup not complete");
   }
   reg->CopyFrom(registration_);
+  if (use_external_addr) {
+    DCHECK_GT(reg->rpc_proxy_addresses_size(), 0);
+    if (reg->rpc_proxy_addresses_size() > 0) {
+      reg->clear_rpc_addresses();
+      reg->mutable_rpc_addresses()->CopyFrom(reg->rpc_proxy_addresses());
+    }
+
+    // TODO(aserbin): uncomment once the webserver proxy advertised addresses
+    //                are properly propagated in the code
+    //DCHECK_GT(reg->http_proxy_addresses_size(), 0);
+    if (reg->http_proxy_addresses_size() > 0) {
+      reg->clear_http_addresses();
+      reg->mutable_http_addresses()->CopyFrom(reg->http_proxy_addresses());
+    }
+  }
   return Status::OK();
 }
 
@@ -503,7 +520,8 @@ Status Master::DeleteExpiredReservedTables() {
   return Status::OK();
 }
 
-Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
+Status Master::ListMasters(vector<ServerEntryPB>* masters,
+                           bool use_external_addr) const {
   auto consensus = catalog_manager_->master_consensus();
   if (!consensus) {
     return Status::IllegalState("consensus not running");
@@ -516,7 +534,8 @@ Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
   if (config.peers_size() == 1) {
     ServerEntryPB local_entry;
     local_entry.mutable_instance_id()->CopyFrom(catalog_manager_->NodeInstance());
-    RETURN_NOT_OK(GetMasterRegistration(local_entry.mutable_registration()));
+    RETURN_NOT_OK(GetMasterRegistration(local_entry.mutable_registration(),
+                                        use_external_addr));
     local_entry.set_role(RaftPeerPB::LEADER);
     local_entry.set_cluster_id(catalog_manager_->GetClusterId());
     local_entry.set_member_type(RaftPeerPB::VOTER);
@@ -525,6 +544,7 @@ Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
   }
 
   // For distributed master configuration.
+  // TODO(aserbin): update this to work with proxied RPCs
   for (const auto& peer : config.peers()) {
     HostPort hp = HostPortFromPB(peer.last_known_addr());
     ServerEntryPB peer_entry;
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index dd451a908..49b6541fc 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -28,6 +28,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/kserver/kserver.h"
 #include "kudu/master/master_options.h"
+#include "kudu/server/rpc_server.h"
 #include "kudu/util/promise.h"
 #include "kudu/util/status.h"
 
@@ -37,6 +38,7 @@ class HostPort;
 class MaintenanceManager;
 class MonoDelta;
 class MonoTime;
+class Sockaddr;
 class Thread;
 class ThreadPool;
 
@@ -102,7 +104,9 @@ class Master : public kserver::KuduServer {
   LocationCache* location_cache() { return location_cache_.get(); }
 
   // Get the RPC and HTTP addresses for this master instance.
-  Status GetMasterRegistration(ServerRegistrationPB* registration) const;
+  // 'use_external_addr' selects the external (proxy-advertised) addresses.
+  Status GetMasterRegistration(ServerRegistrationPB* registration,
+                               bool use_external_addr) const;
 
   // Get node instance, Raft role, RPC and HTTP addresses for all
   // masters.
@@ -114,7 +118,8 @@ class Master : public kserver::KuduServer {
   // client; cache this information with a TTL (possibly in another
   // SysTable), so that we don't have to perform an RPC call on every
   // request.
-  Status ListMasters(std::vector<ServerEntryPB>* masters) const;
+  Status ListMasters(std::vector<ServerEntryPB>* masters,
+                     bool use_external_addr) const;
 
   enum MasterType {
     ALL,
@@ -144,6 +149,12 @@ class Master : public kserver::KuduServer {
     return maintenance_manager_.get();
   }
 
+  // A shortcut to get addresses this master server is configured with
+  // for processing RPCs proxied from an external network.
+  const std::vector<Sockaddr>& rpc_proxied_addresses() const {
+    return rpc_server()->GetRpcProxiedAddresses();
+  }
+
  private:
   friend class MasterTest;
   friend class CatalogManager;
diff --git a/src/kudu/master/master_path_handlers.cc b/src/kudu/master/master_path_handlers.cc
index fbff516d7..c7bc02816 100644
--- a/src/kudu/master/master_path_handlers.cc
+++ b/src/kudu/master/master_path_handlers.cc
@@ -147,6 +147,12 @@ void MasterPathHandlers::HandleTabletServers(const Webserver::WebRequest& /*req*
   }
   // Process the registered tservers.
   for (const auto& desc : descs) {
+    ServerRegistrationPB reg;
+    if (auto s = desc->GetRegistration(&reg); PREDICT_FALSE(!s.ok())) {
+      LOG(WARNING) << s.ToString();
+      continue;
+    }
+
     const string& ts_uuid = desc->permanent_uuid();
     string ts_key;
     auto* ts_state_info = FindOrNull(ts_state_infos, ts_uuid);
@@ -164,8 +170,6 @@ void MasterPathHandlers::HandleTabletServers(const Webserver::WebRequest& /*req*
     }
     EasyJson ts_json = (*output)[ts_key].PushBack(EasyJson::kObject);
 
-    ServerRegistrationPB reg;
-    desc->GetRegistration(&reg);
     ts_json["uuid"] = ts_uuid;
     if (!reg.http_addresses().empty()) {
       string webserver = Substitute("$0://$1:$2",
@@ -556,9 +560,10 @@ void MasterPathHandlers::HandleTablePage(const Webserver::WebRequest& req,
 
 void MasterPathHandlers::HandleMasters(const Webserver::WebRequest& /*req*/,
                                        Webserver::WebResponse* resp) {
+  // TODO(aserbin): update this to work with proxied RPCs
   EasyJson* output = &resp->output;
   vector<ServerEntryPB> masters;
-  Status s = master_->ListMasters(&masters);
+  Status s = master_->ListMasters(&masters, /*use_external_addr=*/false);
   if (!s.ok()) {
     string msg = s.CloneAndPrepend("Unable to list Masters").ToString();
     LOG(WARNING) << msg;
@@ -737,14 +742,17 @@ void MasterPathHandlers::HandleDumpEntities(const Webserver::WebRequest& /*req*/
   vector<shared_ptr<TSDescriptor> > descs;
   master_->ts_manager()->GetAllDescriptors(&descs);
   for (const auto& desc : descs) {
+    ServerRegistrationPB reg;
+    if (auto s = desc->GetRegistration(&reg); PREDICT_FALSE(!s.ok())) {
+      LOG(WARNING) << s.ToString();
+      continue;
+    }
+
     jw.StartObject();
 
     jw.String("uuid");
     jw.String(desc->permanent_uuid());
 
-    ServerRegistrationPB reg;
-    desc->GetRegistration(&reg);
-
     jw.String("rpc_addrs");
     jw.StartArray();
     for (const HostPortPB& host_port : reg.rpc_addresses()) {
@@ -819,8 +827,10 @@ Status MasterPathHandlers::Register(Webserver* server) {
 pair<string, string> MasterPathHandlers::TSDescToLinkPair(const TSDescriptor& desc,
                                                           const string& tablet_id) const {
   ServerRegistrationPB reg;
-  desc.GetRegistration(&reg);
-  if (reg.http_addresses().empty()) {
+  if (auto s = desc.GetRegistration(&reg); !s.ok() || reg.http_addresses().empty()) {
+    if (PREDICT_FALSE(!s.ok())) {
+      LOG(WARNING) << s.ToString();
+    }
     return std::make_pair(desc.permanent_uuid(), "");
   }
   string text = Substitute("$0:$1", reg.http_addresses(0).host(), reg.http_addresses(0).port());
@@ -857,8 +867,11 @@ string MasterPathHandlers::MasterAddrsToCsv() const {
 }
 
 Status MasterPathHandlers::GetLeaderMasterHttpAddr(string* leader_http_addr) const {
+  // TODO(aserbin): update this to work with proxied RPCs
   vector<ServerEntryPB> masters;
-  RETURN_NOT_OK_PREPEND(master_->ListMasters(&masters), "unable to list masters");
+  RETURN_NOT_OK_PREPEND(master_->ListMasters(&masters,
+                                             /*use_external_addr=*/false),
+                        "unable to list masters");
   for (const auto& master : masters) {
     if (master.has_error()) {
       continue;
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 6e2cfcf19..7af0bd754 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -51,6 +51,7 @@
 #include "kudu/security/token.pb.h"
 #include "kudu/security/token_signer.h"
 #include "kudu/security/token_verifier.h"
+#include "kudu/server/rpc_server.h"
 #include "kudu/server/server_base.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
@@ -517,12 +518,17 @@ void MasterServiceImpl::GetTabletLocations(const GetTabletLocationsRequestPB* re
     SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
   }
 
+  const auto use_external_addr = IsAddrOneOf(
+      rpc->local_address(), server_->rpc_proxied_addresses());
+
   CatalogManager::TSInfosDict infos_dict(resp->GetArena());
   for (const string& tablet_id : req->tablet_ids()) {
     // TODO(todd): once we have catalog data. ACL checks would also go here, probably.
     TabletLocationsPB* locs_pb = resp->add_tablet_locations();
     Status s = server_->catalog_manager()->GetTabletLocations(
-        tablet_id, req->replica_type_filter(),
+        tablet_id,
+        req->replica_type_filter(),
+        use_external_addr,
         locs_pb,
         req->intern_ts_infos_in_response() ? &infos_dict : nullptr,
         rpc->remote_user().username());
@@ -675,8 +681,11 @@ void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
     if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
       SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
     }
+
+    const auto use_external_addr = IsAddrOneOf(
+        rpc->local_address(), server_->rpc_proxied_addresses());
     s = server_->catalog_manager()->GetTableLocations(
-        req, resp, rpc->remote_user().username());
+        req, resp, use_external_addr, rpc->remote_user().username());
   }
 
   CheckRespErrorOrSetUnknown(s, resp);
@@ -705,6 +714,9 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
 void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
                                           ListTabletServersResponsePB* resp,
                                           rpc::RpcContext* rpc) {
+  const auto use_external_addr = IsAddrOneOf(
+      rpc->local_address(), server_->rpc_proxied_addresses());
+
   TSManager* ts_manager = server_->ts_manager();
   TServerStateMap states = req->include_states() ?
       ts_manager->GetTServerStates() : TServerStateMap();
@@ -712,8 +724,13 @@ void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
   server_->ts_manager()->GetAllDescriptors(&descs);
   for (const std::shared_ptr<TSDescriptor>& desc : descs) {
     ListTabletServersResponsePB::Entry* entry = resp->add_servers();
+    if (auto s = desc->GetRegistration(entry->mutable_registration(),
+                                       use_external_addr); !s.ok()) {
+      LOG(WARNING) << s.ToString();
+      resp->mutable_servers()->RemoveLast();
+      continue;
+    }
     desc->GetNodeInstancePB(entry->mutable_instance_id());
-    desc->GetRegistration(entry->mutable_registration());
     entry->set_millis_since_heartbeat(desc->TimeSinceHeartbeat().ToMilliseconds());
     if (desc->location()) {
       entry->set_location(*desc->location());
@@ -742,12 +759,13 @@ void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
   rpc->RespondSuccess();
 }
 
-void MasterServiceImpl::ListMasters(const ListMastersRequestPB* req,
+void MasterServiceImpl::ListMasters(const ListMastersRequestPB* /*req*/,
                                     ListMastersResponsePB* resp,
                                     rpc::RpcContext* rpc) {
+  const auto use_external_addr = IsAddrOneOf(
+      rpc->local_address(), server_->rpc_proxied_addresses());
   vector<ServerEntryPB> masters;
-  Status s = server_->ListMasters(&masters);
-  if (!s.ok()) {
+  if (const auto s = server_->ListMasters(&masters, use_external_addr); !s.ok()) {
     StatusToPB(s, resp->mutable_error()->mutable_status());
     resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR);
 
@@ -761,9 +779,10 @@ void MasterServiceImpl::ListMasters(const ListMastersRequestPB* req,
   rpc->RespondSuccess();
 }
 
-void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequestPB* req,
-                                              GetMasterRegistrationResponsePB* resp,
-                                              rpc::RpcContext* rpc) {
+void MasterServiceImpl::GetMasterRegistration(
+    const GetMasterRegistrationRequestPB* /*req*/,
+    GetMasterRegistrationResponsePB* resp,
+    rpc::RpcContext* rpc) {
   // instance_id must always be set in order for status pages to be useful.
   resp->mutable_instance_id()->CopyFrom(server_->instance_pb());
 
@@ -772,7 +791,10 @@ void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequest
     return;
   }
 
-  Status s = server_->GetMasterRegistration(resp->mutable_registration());
+  const auto use_external_addr = IsAddrOneOf(
+      rpc->local_address(), server_->rpc_proxied_addresses());
+  const auto s = server_->GetMasterRegistration(
+      resp->mutable_registration(), use_external_addr);
   CheckRespErrorOrSetUnknown(s, resp);
   const auto& role_and_member = server_->catalog_manager()->GetRoleAndMemberType();
   resp->set_role(role_and_member.first);
@@ -791,17 +813,28 @@ void MasterServiceImpl::ConnectToMaster(const ConnectToMasterRequestPB* /*req*/,
 
   // Set the info about the other masters, so that the client can verify
   // it has the full set of info.
-  vector<HostPort> addresses;
-  Status s = server_->GetMasterHostPorts(&addresses);
-  if (!s.ok()) {
-    StatusToPB(s, resp->mutable_error()->mutable_status());
-    resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR);
-    rpc->RespondSuccess();
-    return;
-  }
-  resp->mutable_master_addrs()->Reserve(addresses.size());
-  for (const auto& hp : addresses) {
-    *resp->add_master_addrs() = HostPortToPB(hp);
+  {
+    // Check whether the request came in on a dedicated RPC endpoint, meaning
+    // it has been proxied from an outside network.
+    if (IsAddrOneOf(rpc->local_address(), server_->rpc_proxied_addresses())) {
+      // TODO(aserbin): adapt this to multi-master configuration
+      for (const auto& hp : server_->rpc_server()->GetProxyAdvertisedHostPorts()) {
+        *resp->add_master_addrs() = HostPortToPB(hp);
+      }
+    } else {
+      vector<HostPort> addresses;
+      if (const auto s = server_->GetMasterHostPorts(&addresses);
+          PREDICT_FALSE(!s.ok())) {
+        StatusToPB(s, resp->mutable_error()->mutable_status());
+        resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR);
+        rpc->RespondSuccess();
+        return;
+      }
+      resp->mutable_master_addrs()->Reserve(addresses.size());
+      for (const auto& hp : addresses) {
+        *resp->add_master_addrs() = HostPortToPB(hp);
+      }
+    }
   }
 
   const bool is_leader = l.leader_status().ok();
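
The handlers above all derive 'use_external_addr' from the local address of
the connection a request arrived on. As a minimal sketch (not part of the
patch; the helper name is illustrative), that check could be factored out as:

    #include <vector>

    #include "kudu/rpc/rpc_context.h"
    #include "kudu/util/net/net_util.h"
    #include "kudu/util/net/sockaddr.h"

    namespace kudu {

    // Return true if the request arrived on one of the endpoints configured
    // via --rpc_proxied_addresses, i.e. it was proxied from an external
    // network and the response should carry externally reachable addresses.
    bool IsProxiedRequest(const rpc::RpcContext& rpc,
                          const std::vector<Sockaddr>& proxied_addrs) {
      return IsAddrOneOf(rpc.local_address(), proxied_addrs);
    }

    } // namespace kudu
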
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 75eb27e4d..f9152a564 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -32,6 +32,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
@@ -210,17 +211,44 @@ double TSDescriptor::RecentReplicaCreations() {
   return recent_replica_creations_;
 }
 
-void TSDescriptor::GetRegistration(ServerRegistrationPB* reg) const {
+Status TSDescriptor::GetRegistration(ServerRegistrationPB* reg,
+                                     bool use_external_addr) const {
   shared_lock<rw_spinlock> l(lock_);
   CHECK(registration_) << "No registration";
   CHECK_NOTNULL(reg)->CopyFrom(*registration_);
+
+  if (use_external_addr) {
+    // For a tablet server to be reachable from the outside, its registration
+    // should have at least one address in rpc_proxy_addresses and one element
+    // in http_proxy_addresses; otherwise it's likely a misconfiguration.
+    //
+    // The internal addresses are not exposed to the outside, and addresses
+    // advertised by a proxy to outside clients are reported as the only
+    // available ones for a tablet server.
+    // TODO(aserbin): add condition on reg->http_proxy_addresses_size()
+    //                once http_proxy_addresses flag is introduced
+    if (PREDICT_FALSE(reg->rpc_proxy_addresses_size() <= 0)) {
+      const auto msg = Substitute(
+          "server $0 lacks proxy advertised addresses", ToString());
+      DCHECK(false) << msg;
+      return Status::IllegalState(msg);
+    }
+    reg->mutable_rpc_addresses()->Swap(reg->mutable_rpc_proxy_addresses());
+    reg->clear_rpc_proxy_addresses();
+
+    // TODO(aserbin): uncomment once http_proxy_addresses flag is introduced
+    //reg->mutable_http_addresses()->Swap(reg->mutable_http_proxy_addresses());
+    //reg->clear_http_proxy_addresses();
+  }
+  return Status::OK();
 }
 
-void TSDescriptor::GetTSInfoPB(TSInfoPB* tsinfo_pb) const {
+void TSDescriptor::GetTSInfoPB(TSInfoPB* tsinfo_pb, bool use_external_addr) const {
   shared_lock<rw_spinlock> l(lock_);
   CHECK(registration_);
   const auto& reg = *registration_;
-  tsinfo_pb->mutable_rpc_addresses()->CopyFrom(reg.rpc_addresses());
+  tsinfo_pb->mutable_rpc_addresses()->CopyFrom(
+      use_external_addr ? reg.rpc_proxy_addresses() : reg.rpc_addresses());
   if (reg.has_unix_domain_socket_path()) {
     tsinfo_pb->set_unix_domain_socket_path(reg.unix_domain_socket_path());
   }
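
With 'use_external_addr' set, the registration handed back to the caller
contains only the proxy-advertised endpoints in 'rpc_addresses'. A minimal
sketch of the calling pattern (mirroring the callers in master_service.cc;
the error handling shown is illustrative):

    ServerRegistrationPB reg;
    if (auto s = desc->GetRegistration(&reg, /*use_external_addr=*/true);
        PREDICT_FALSE(!s.ok())) {
      // Most likely the tablet server isn't configured with
      // --rpc_proxy_advertised_addresses, so it has nothing to expose
      // to external clients.
      LOG(WARNING) << s.ToString();
    } else {
      // reg.rpc_addresses() now holds the endpoints the tablet server
      // reported in rpc_proxy_addresses; rpc_proxy_addresses itself
      // has been cleared.
    }
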
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index 4bc862075..ca78eb7b3 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -104,9 +104,16 @@ class TSDescriptor : public enable_make_shared<TSDescriptor> {
   // Copy the current registration info into the given PB object.
   // A safe copy is returned because the internal Registration object
   // may be mutated at any point if the tablet server re-registers.
-  void GetRegistration(ServerRegistrationPB* reg) const;
-
-  void GetTSInfoPB(TSInfoPB* tsinfo_pb) const;
+  // If 'use_external_addr' is 'true', return information targeted at clients
+  // in external networks from which RPCs are proxied into the cluster's
+  // internal network.
+  // Return Status::OK() if the registration has been filled in successfully,
+  // or a non-OK status if a misconfiguration related to the use of external
+  // addresses has been detected.
+  Status GetRegistration(ServerRegistrationPB* reg,
+                         bool use_external_addr = false) const;
+
+  void GetTSInfoPB(TSInfoPB* tsinfo_pb, bool use_external_addr) const;
 
   void GetNodeInstancePB(NodeInstancePB* instance_pb) const;
 
diff --git a/src/kudu/server/rpc_server-test.cc b/src/kudu/server/rpc_server-test.cc
index 8f7e4c7d6..4b5fd9f08 100644
--- a/src/kudu/server/rpc_server-test.cc
+++ b/src/kudu/server/rpc_server-test.cc
@@ -15,18 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <string>
+#include "kudu/server/rpc_server.h"
+
+#include <cstdint>
 #include <memory>
+#include <string>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
-#include "kudu/server/rpc_server.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
@@ -34,6 +40,7 @@ using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 DECLARE_bool(rpc_server_allow_ephemeral_ports);
 
@@ -47,18 +54,28 @@ class RpcServerAdvertisedAddressesTest : public KuduTest {
     FLAGS_rpc_server_allow_ephemeral_ports = true;
 
     RpcServerOptions opts;
-    string bind = use_bind_addresses();
-    if (!bind.empty()) {
-      opts.rpc_bind_addresses = bind;
-    } else {
-      opts.rpc_bind_addresses = "127.0.0.1";
-    }
-    string advertised = use_advertised_addresses();
-    if (!advertised.empty()) {
-      opts.rpc_advertised_addresses = advertised;
+    {
+      string bind = use_bind_addresses();
+      if (!bind.empty()) {
+        opts.rpc_bind_addresses = std::move(bind);
+      } else {
+        opts.rpc_bind_addresses = "127.0.0.1";
+      }
+      string advertised = use_advertised_addresses();
+      if (!advertised.empty()) {
+        opts.rpc_advertised_addresses = std::move(advertised);
+      }
+      string proxy_advertised = use_proxy_advertised_addresses();
+      if (!proxy_advertised.empty()) {
+        opts.rpc_proxy_advertised_addresses = std::move(proxy_advertised);
+      }
+      string proxied = use_proxied_addresses();
+      if (!proxied.empty()) {
+        opts.rpc_proxied_addresses = std::move(proxied);
+      }
     }
     server_.reset(new RpcServer(opts));
-    unique_ptr<MetricRegistry> metric_registry(new MetricRegistry());
+    unique_ptr<MetricRegistry> metric_registry(new MetricRegistry);
     scoped_refptr<MetricEntity> metric_entity =
         METRIC_ENTITY_server.Instantiate(metric_registry.get(), "test");
     rpc::MessengerBuilder builder("test");
@@ -73,11 +90,19 @@ class RpcServerAdvertisedAddressesTest : public KuduTest {
   // Overridden by subclasses.
   virtual string use_bind_addresses() const { return ""; }
   virtual string use_advertised_addresses() const { return ""; }
+  virtual string use_proxy_advertised_addresses() const { return ""; }
+  virtual string use_proxied_addresses() const { return ""; }
+
+  Status GetAddresses(vector<Sockaddr>* bound_addrs,
+                      vector<Sockaddr>* advertised_addrs) {
+    RETURN_NOT_OK(server_->GetBoundAddresses(bound_addrs));
+    return server_->GetAdvertisedAddresses(advertised_addrs);
+  }
 
-  void GetAddresses(vector<Sockaddr>* bound_addrs,
-                    vector<Sockaddr>* advertised_addrs) {
-    ASSERT_OK(server_->GetBoundAddresses(bound_addrs));
-    ASSERT_OK(server_->GetAdvertisedAddresses(advertised_addrs));
+  void GetProxyAddresses(vector<Sockaddr>* proxied_addrs,
+                         vector<HostPort>* proxy_advertised_addrs) {
+    *proxied_addrs = server_->GetRpcProxiedAddresses();
+    *proxy_advertised_addrs = server_->GetProxyAdvertisedHostPorts();
   }
 
   unique_ptr<RpcServer> server_;
@@ -88,47 +113,197 @@ class AdvertisedOnlyWebserverTest : public RpcServerAdvertisedAddressesTest {
   string use_advertised_addresses() const override { return "1.2.3.4:1234"; }
 };
 
+TEST_F(AdvertisedOnlyWebserverTest, OnlyAdvertisedAddresses) {
+  vector<Sockaddr> bound_addrs;
+  vector<Sockaddr> advertised_addrs;
+  ASSERT_OK(GetAddresses(&bound_addrs, &advertised_addrs));
+
+  ASSERT_EQ(1, advertised_addrs.size());
+  ASSERT_EQ(1, bound_addrs.size());
+  ASSERT_EQ("1.2.3.4", advertised_addrs[0].host());
+  ASSERT_EQ(1234, advertised_addrs[0].port());
+}
+
 class BoundOnlyWebserverTest : public RpcServerAdvertisedAddressesTest {
  protected:
   string use_bind_addresses() const override { return "127.0.0.1"; }
 };
 
+TEST_F(BoundOnlyWebserverTest, OnlyBoundAddresses) {
+  vector<Sockaddr> bound_addrs;
+  vector<Sockaddr> advertised_addrs;
+  ASSERT_OK(GetAddresses(&bound_addrs, &advertised_addrs));
+
+  ASSERT_EQ(1, advertised_addrs.size());
+  ASSERT_EQ(1, bound_addrs.size());
+  ASSERT_EQ("127.0.0.1", advertised_addrs[0].host());
+  ASSERT_EQ("127.0.0.1", bound_addrs[0].host());
+  ASSERT_EQ(advertised_addrs[0].port(), bound_addrs[0].port());
+}
+
 class BothBoundAndAdvertisedWebserverTest : public RpcServerAdvertisedAddressesTest {
  protected:
   string use_advertised_addresses() const override { return "1.2.3.4:1234"; }
   string use_bind_addresses() const override { return "127.0.0.1"; }
 };
 
-TEST_F(AdvertisedOnlyWebserverTest, OnlyAdvertisedAddresses) {
-  vector<Sockaddr> bound_addrs, advertised_addrs;
-  NO_FATALS(GetAddresses(&bound_addrs, &advertised_addrs));
+TEST_F(BothBoundAndAdvertisedWebserverTest, BothBoundAndAdvertisedAddresses) {
+  vector<Sockaddr> bound_addrs;
+  vector<Sockaddr> advertised_addrs;
+  ASSERT_OK(GetAddresses(&bound_addrs, &advertised_addrs));
 
   ASSERT_EQ(1, advertised_addrs.size());
   ASSERT_EQ(1, bound_addrs.size());
   ASSERT_EQ("1.2.3.4", advertised_addrs[0].host());
   ASSERT_EQ(1234, advertised_addrs[0].port());
+  ASSERT_EQ("127.0.0.1", bound_addrs[0].host());
 }
 
-TEST_F(BoundOnlyWebserverTest, OnlyBoundAddresses) {
-  vector<Sockaddr> bound_addrs, advertised_addrs;
-  NO_FATALS(GetAddresses(&bound_addrs, &advertised_addrs));
+class ProxiedRpcAddressesTest : public RpcServerAdvertisedAddressesTest {
+ public:
+  void SetUp() override {
+    ASSERT_OK(GetRandomPort("127.0.0.1", &rpc_bind_port_));
+    ASSERT_OK(GetRandomPort("127.0.0.1", &rpc_proxied_port_));
+    RpcServerAdvertisedAddressesTest::SetUp();
+  }
+
+ protected:
+  string use_bind_addresses() const override {
+    return Substitute("127.0.0.1:$0", rpc_bind_port_);
+  }
+  string use_proxy_advertised_addresses() const override {
+    return "1.2.3.4:888";
+  }
+  string use_proxied_addresses() const override {
+    return Substitute("127.0.0.1:$0", rpc_proxied_port_);
+  }
+  uint16_t rpc_bind_port() const {
+    return rpc_bind_port_;
+  }
+  uint16_t rpc_proxied_port() const {
+    return rpc_proxied_port_;
+  }
+
+ private:
+  uint16_t rpc_bind_port_ = 0;
+  uint16_t rpc_proxied_port_ = 0;
+};
+
+TEST_F(ProxiedRpcAddressesTest, Basic) {
+  vector<Sockaddr> proxied_addrs;
+  vector<HostPort> proxy_advertised_addrs;
+  GetProxyAddresses(&proxied_addrs, &proxy_advertised_addrs);
+
+  ASSERT_EQ(1, proxy_advertised_addrs.size());
+  ASSERT_EQ("1.2.3.4", proxy_advertised_addrs[0].host());
+  ASSERT_EQ(888, proxy_advertised_addrs[0].port());
+
+  ASSERT_EQ(1, proxied_addrs.size());
+  ASSERT_EQ("127.0.0.1", proxied_addrs[0].host());
+  ASSERT_EQ(rpc_proxied_port(), proxied_addrs[0].port());
+
+  vector<Sockaddr> bound_addrs;
+  vector<Sockaddr> advertised_addrs;
+  ASSERT_OK(GetAddresses(&bound_addrs, &advertised_addrs));
 
   ASSERT_EQ(1, advertised_addrs.size());
-  ASSERT_EQ(1, bound_addrs.size());
   ASSERT_EQ("127.0.0.1", advertised_addrs[0].host());
+  ASSERT_EQ(rpc_bind_port(), advertised_addrs[0].port());
+
+  ASSERT_EQ(2, bound_addrs.size());
   ASSERT_EQ("127.0.0.1", bound_addrs[0].host());
-  ASSERT_EQ(advertised_addrs[0].port(), bound_addrs[0].port());
+  ASSERT_EQ(rpc_bind_port(), bound_addrs[0].port());
+  ASSERT_EQ("127.0.0.1", bound_addrs[1].host());
+  ASSERT_EQ(rpc_proxied_port(), bound_addrs[1].port());
 }
 
-TEST_F(BothBoundAndAdvertisedWebserverTest, BothBoundAndAdvertisedAddresses) {
-  vector<Sockaddr> bound_addrs, advertised_addrs;
-  NO_FATALS(GetAddresses(&bound_addrs, &advertised_addrs));
+// The advertised and proxy advertised addresses are independent and
+// both can be set for an RPC server.
+class ProxiedAndAdvertisedRpcAddressesTest : public ProxiedRpcAddressesTest {
+ public:
+  string use_advertised_addresses() const override {
+      return "2.3.4.5:2345";
+  }
+};
+
+TEST_F(ProxiedAndAdvertisedRpcAddressesTest, Basic) {
+  vector<Sockaddr> proxied_addrs;
+  vector<HostPort> proxy_advertised_addrs;
+  GetProxyAddresses(&proxied_addrs, &proxy_advertised_addrs);
+
+  ASSERT_EQ(1, proxy_advertised_addrs.size());
+  ASSERT_EQ("1.2.3.4", proxy_advertised_addrs[0].host());
+  ASSERT_EQ(888, proxy_advertised_addrs[0].port());
+
+  ASSERT_EQ(1, proxied_addrs.size());
+  ASSERT_EQ("127.0.0.1", proxied_addrs[0].host());
+  ASSERT_EQ(rpc_proxied_port(), proxied_addrs[0].port());
+
+  vector<Sockaddr> bound_addrs;
+  vector<Sockaddr> advertised_addrs;
+  ASSERT_OK(GetAddresses(&bound_addrs, &advertised_addrs));
 
   ASSERT_EQ(1, advertised_addrs.size());
-  ASSERT_EQ(1, bound_addrs.size());
-  ASSERT_EQ("1.2.3.4", advertised_addrs[0].host());
-  ASSERT_EQ(1234, advertised_addrs[0].port());
+  ASSERT_EQ("2.3.4.5", advertised_addrs[0].host());
+  ASSERT_EQ(2345, advertised_addrs[0].port());
+
+  ASSERT_EQ(2, bound_addrs.size());
   ASSERT_EQ("127.0.0.1", bound_addrs[0].host());
+  ASSERT_EQ(rpc_bind_port(), bound_addrs[0].port());
+  ASSERT_EQ("127.0.0.1", bound_addrs[1].host());
+  ASSERT_EQ(rpc_proxied_port(), bound_addrs[1].port());
+}
+
+// This is similar to ProxiedRpcAddressesTest, but it binds to ephemeral
+// ports by leaving the port numbers at 0 (i.e. the wildcard port).
+class ProxiedRpcAddressesWildcardPortTest : public ProxiedRpcAddressesTest {
+ public:
+  void SetUp() override {
+    RpcServerAdvertisedAddressesTest::SetUp();
+  }
+};
+
+TEST_F(ProxiedRpcAddressesWildcardPortTest, Basic) {
+  vector<Sockaddr> proxied_addrs;
+  vector<HostPort> proxy_advertised_addrs;
+  GetProxyAddresses(&proxied_addrs, &proxy_advertised_addrs);
+
+  ASSERT_EQ(1, proxy_advertised_addrs.size());
+  ASSERT_EQ("1.2.3.4", proxy_advertised_addrs[0].host());
+  ASSERT_EQ(888, proxy_advertised_addrs[0].port());
+
+  ASSERT_EQ(1, proxied_addrs.size());
+  ASSERT_EQ("127.0.0.1", proxied_addrs[0].host());
+  ASSERT_NE(0, proxied_addrs[0].port());
+
+  vector<Sockaddr> bound_addrs;
+  vector<Sockaddr> advertised_addrs;
+  ASSERT_OK(GetAddresses(&bound_addrs, &advertised_addrs));
+
+  ASSERT_EQ(1, advertised_addrs.size());
+  ASSERT_EQ("127.0.0.1", advertised_addrs[0].host());
+  ASSERT_NE(0, advertised_addrs[0].port());
+
+  ASSERT_EQ(2, bound_addrs.size());
+  ASSERT_EQ("127.0.0.1", bound_addrs[0].host());
+  ASSERT_NE(0, bound_addrs[0].port());
+  ASSERT_EQ("127.0.0.1", bound_addrs[1].host());
+  ASSERT_NE(0, bound_addrs[1].port());
+
+  // The bound endpoints include the proxied RPC endpoints as well.
+  // Not comparing the IP addresses since they are both loopbacks (127.0.0.1).
+  ASSERT_TRUE(
+      proxied_addrs[0].port() == bound_addrs[0].port() ||
+      proxied_addrs[0].port() == bound_addrs[1].port());
+
+  // The bound endpoints include the advertised endpoints as well.
+  // Not comparing the IP addresses since they are both loopbacks (127.0.0.1).
+  ASSERT_TRUE(
+      advertised_addrs[0].port() == bound_addrs[0].port() ||
+      advertised_addrs[0].port() == bound_addrs[1].port());
+
+  // The advertised endpoints and proxied endpoints are different.
+  ASSERT_NE(advertised_addrs[0].port(), proxied_addrs[0].port());
 }
 
 } // namespace kudu
diff --git a/src/kudu/server/rpc_server.cc b/src/kudu/server/rpc_server.cc
index c2842f2e4..5d5791268 100644
--- a/src/kudu/server/rpc_server.cc
+++ b/src/kudu/server/rpc_server.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -32,12 +33,13 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/acceptor_pool.h"
-#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/acceptor_pool.h" // IWYU pragma: keep
+#include "kudu/rpc/messenger.h"     // IWYU pragma: keep
 #include "kudu/rpc/rpc_service.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_pool.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
@@ -64,6 +66,25 @@ DEFINE_string(rpc_advertised_addresses, "",
               "for example, if Kudu is deployed in a container.");
 TAG_FLAG(rpc_advertised_addresses, advanced);
 
+DEFINE_string(rpc_proxied_addresses, "",
+              "Comma-separated list of addresses to accept RPC requests "
+              "forwarded from external networks (possibly, via a TCP proxy). "
+              "These are RPC endpoints in the inner network to handle RPC "
+              "requests forwarded/proxied from outside networks; "
+              "they are orthogonal to --rpc_advertised_addresses, so these "
+              "can be used in containerized environments behind a firewall.");
+TAG_FLAG(rpc_proxied_addresses, advanced);
+TAG_FLAG(rpc_proxied_addresses, experimental);
+
+DEFINE_string(rpc_proxy_advertised_addresses, "",
+              "This server's RPC endpoints exposed to the external network via "
+              "a TCP proxy. It's assumed that RPCs sent by a Kudu client from "
+              "the external network are forwarded/proxied to the RPC endpoint "
+              "in the inner cluster's network, where the latter is specified "
+              "by the --rpc_proxied_addresses flag.");
+TAG_FLAG(rpc_proxy_advertised_addresses, advanced);
+TAG_FLAG(rpc_proxy_advertised_addresses, experimental);
+
 DEFINE_int32(rpc_num_acceptors_per_address, 1,
              "Number of RPC acceptor threads for each bound address");
 TAG_FLAG(rpc_num_acceptors_per_address, advanced);
@@ -85,16 +106,37 @@ DEFINE_bool(rpc_reuseport, false,
             "Whether to set the SO_REUSEPORT option on listening RPC sockets.");
 TAG_FLAG(rpc_reuseport, experimental);
 
+namespace {
+
+bool ValidateProxyAddresses() {
+  if ((FLAGS_rpc_proxied_addresses.empty() &&
+       !FLAGS_rpc_proxy_advertised_addresses.empty()) ||
+      (!FLAGS_rpc_proxied_addresses.empty() &&
+       FLAGS_rpc_proxy_advertised_addresses.empty())) {
+    LOG(ERROR) << Substitute(
+        "--rpc_proxy_advertised_addresses and --rpc_proxied_addresses should "
+        "be either both set or both unset");
+    return false;
+  }
+  return true;
+}
+GROUP_FLAG_VALIDATOR(proxy_addresses, ValidateProxyAddresses);
+
+} // anonymous namespace
+
+
 namespace kudu {
 
 RpcServerOptions::RpcServerOptions()
-  : rpc_bind_addresses(FLAGS_rpc_bind_addresses),
-    rpc_advertised_addresses(FLAGS_rpc_advertised_addresses),
-    num_acceptors_per_address(FLAGS_rpc_num_acceptors_per_address),
-    num_service_threads(FLAGS_rpc_num_service_threads),
-    default_port(0),
-    service_queue_length(FLAGS_rpc_service_queue_length),
-    rpc_reuseport(FLAGS_rpc_reuseport) {
+    : rpc_bind_addresses(FLAGS_rpc_bind_addresses),
+      rpc_advertised_addresses(FLAGS_rpc_advertised_addresses),
+      rpc_proxied_addresses(FLAGS_rpc_proxied_addresses),
+      rpc_proxy_advertised_addresses(FLAGS_rpc_proxy_advertised_addresses),
+      num_acceptors_per_address(FLAGS_rpc_num_acceptors_per_address),
+      num_service_threads(FLAGS_rpc_num_service_threads),
+      default_port(0),
+      service_queue_length(FLAGS_rpc_service_queue_length),
+      rpc_reuseport(FLAGS_rpc_reuseport) {
 }
 
 RpcServer::RpcServer(RpcServerOptions opts)
@@ -118,26 +160,35 @@ Status RpcServer::Init(const shared_ptr<Messenger>& messenger) {
   CHECK_EQ(server_state_, UNINITIALIZED);
   messenger_ = messenger;
 
-  RETURN_NOT_OK(ParseAddressList(options_.rpc_bind_addresses,
-                                 options_.default_port,
-                                 &rpc_bind_addresses_));
+  constexpr auto addr_check_fn = [] (const vector<Sockaddr>& addresses,
+                                     bool allow_ephemeral_ports) {
+    for (const Sockaddr& addr : addresses) {
+      if (!addr.is_ip()) {
+        continue;
+      }
 
-  for (const Sockaddr& addr : rpc_bind_addresses_) {
-    if (!addr.is_ip()) continue;
+      if (IsPrivilegedPort(addr.port())) {
+        LOG(WARNING) << Substitute(
+            "may be unable to bind to privileged port for address $0",
+            addr.ToString());
+      }
 
-    if (IsPrivilegedPort(addr.port())) {
-      LOG(WARNING) << "May be unable to bind to privileged port for address "
-                   << addr.ToString();
+      // Currently, we can't support binding to ephemeral ports outside of
+      // unit tests, because consensus caches RPC ports of other servers
+      // across restarts. See KUDU-334.
+      if (addr.port() == 0 && !allow_ephemeral_ports) {
+        LOG(FATAL) << Substitute(
+            "binding to ephemeral ports not supported "
+            "(RPC address configured to $0)", addr.ToString());
+      }
     }
+  };
 
-    // Currently, we can't support binding to ephemeral ports outside of
-    // unit tests, because consensus caches RPC ports of other servers
-    // across restarts. See KUDU-334.
-    if (addr.port() == 0 && !FLAGS_rpc_server_allow_ephemeral_ports) {
-      LOG(FATAL) << "Binding to ephemeral ports not supported (RPC address "
-                 << "configured to " << addr.ToString() << ")";
-    }
-  }
+  const bool allow_ephemeral_ports = FLAGS_rpc_server_allow_ephemeral_ports;
+  RETURN_NOT_OK(ParseAddressList(options_.rpc_bind_addresses,
+                                 options_.default_port,
+                                 &rpc_bind_addresses_));
+  addr_check_fn(rpc_bind_addresses_, allow_ephemeral_ports);
 
   if (!options_.rpc_advertised_addresses.empty()) {
     RETURN_NOT_OK(ParseAddressList(options_.rpc_advertised_addresses,
@@ -152,6 +203,23 @@ Status RpcServer::Init(const shared_ptr<Messenger>& messenger) {
     }
   }
 
+  if (!options_.rpc_proxied_addresses.empty()) {
+    RETURN_NOT_OK(ParseAddressList(options_.rpc_proxied_addresses,
+                                   options_.default_port,
+                                   &rpc_proxied_addresses_));
+    addr_check_fn(rpc_proxied_addresses_, allow_ephemeral_ports);
+  }
+
+  if (!options_.rpc_proxy_advertised_addresses.empty()) {
+    vector<HostPort> host_ports;
+    RETURN_NOT_OK(HostPort::ParseStrings(
+        options_.rpc_proxy_advertised_addresses, options_.default_port, &host_ports));
+    if (host_ports.empty()) {
+      return Status::InvalidArgument("no proxy advertised address specified");
+    }
+    rpc_proxy_advertised_hostports_ = std::move(host_ports);
+  }
+
   server_state_ = INITIALIZED;
   return Status::OK();
 }
@@ -186,12 +254,27 @@ Status RpcServer::Bind() {
   // Create the AcceptorPool for each bind address.
   for (const Sockaddr& bind_addr : rpc_bind_addresses_) {
     shared_ptr<rpc::AcceptorPool> pool;
-    RETURN_NOT_OK(messenger_->AddAcceptorPool(
-                    bind_addr,
-                    &pool));
+    RETURN_NOT_OK(messenger_->AddAcceptorPool(bind_addr, &pool));
     new_acceptor_pools.emplace_back(std::move(pool));
   }
 
+  // Create the AcceptorPool for each address for proxied RPCs.
+  {
+    vector<Sockaddr> bound_rpc_proxied_addresses;
+    bound_rpc_proxied_addresses.reserve(rpc_proxied_addresses_.size());
+    for (const Sockaddr& bind_addr : rpc_proxied_addresses_) {
+      shared_ptr<rpc::AcceptorPool> pool;
+      RETURN_NOT_OK(messenger_->AddAcceptorPool(bind_addr, &pool));
+      new_acceptor_pools.emplace_back(std::move(pool));
+      Sockaddr bound_addr;
+      RETURN_NOT_OK(new_acceptor_pools.back()->GetBoundAddress(&bound_addr));
+      bound_rpc_proxied_addresses.emplace_back(std::move(bound_addr));
+    }
+    // The proxied addresses might be specified with a wildcard port, so store
+    // the bound addresses with the actual port numbers assigned upon binding.
+    rpc_proxied_addresses_ = std::move(bound_rpc_proxied_addresses);
+  }
+
   acceptor_pools_.swap(new_acceptor_pools);
 
   server_state_ = BOUND;
@@ -242,7 +325,7 @@ Status RpcServer::GetBoundAddresses(vector<Sockaddr>* addresses) const {
     Sockaddr bound_addr;
     RETURN_NOT_OK_PREPEND(pool->GetBoundAddress(&bound_addr),
                           "Unable to get bound address from AcceptorPool");
-    addresses->push_back(bound_addr);
+    addresses->emplace_back(std::move(bound_addr));
   }
   return Status::OK();
 }
@@ -258,10 +341,23 @@ Status RpcServer::GetAdvertisedAddresses(vector<Sockaddr>* addresses) const {
       server_state_ != STARTED) {
     return Status::ServiceUnavailable(Substitute("bad state: $0", server_state_));
   }
-  if (rpc_advertised_addresses_.empty()) {
-    return GetBoundAddresses(addresses);
+
+  if (!rpc_advertised_addresses_.empty()) {
+    *addresses = rpc_advertised_addresses_;
+    return Status::OK();
+  }
+
+  vector<Sockaddr> tmp;
+  RETURN_NOT_OK(GetBoundAddresses(&tmp));
+  // Remove addresses dedicated to serving proxied RPCs. Those should not be
+  // advertised to clients: a client isn't supposed to send RPCs to those
+  // endpoints directly; they only receive requests forwarded by the proxy.
+  addresses->reserve(tmp.size());
+  for (auto& addr : tmp) {
+    if (!IsAddrOneOf(addr, rpc_proxied_addresses_)) {
+      addresses->emplace_back(std::move(addr));
+    }
   }
-  *addresses = rpc_advertised_addresses_;
   return Status::OK();
 }
 
@@ -271,6 +367,16 @@ Status RpcServer::GetAdvertisedHostPorts(vector<HostPort>* hostports) const {
   return HostPortsFromAddrs(addrs, hostports);
 }
 
+const vector<HostPort>& RpcServer::GetProxyAdvertisedHostPorts() const {
+  DCHECK_NE(UNINITIALIZED, server_state_);
+  return rpc_proxy_advertised_hostports_;
+}
+
+const vector<Sockaddr>& RpcServer::GetRpcProxiedAddresses() const {
+  DCHECK_NE(UNINITIALIZED, server_state_);
+  return rpc_proxied_addresses_;
+}
+
 const rpc::ServicePool* RpcServer::service_pool(const string& service_name) const {
   return down_cast<rpc::ServicePool*>(messenger_->rpc_service(service_name).get());
 }
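
For illustration only (the host name and port numbers below are made up, not
part of the patch), a server whose external traffic arrives through a TCP
proxy might be started with both new flags:

    --rpc_proxied_addresses=0.0.0.0:30051
    --rpc_proxy_advertised_addresses=tserver-0.proxy.example.com:30051

Per the group validator above, the two flags must be either both set or both
unset; the proxy is expected to forward connections arriving at the advertised
endpoint to the proxied endpoint in the inner network.
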
diff --git a/src/kudu/server/rpc_server.h b/src/kudu/server/rpc_server.h
index c23e97e19..91a4b6758 100644
--- a/src/kudu/server/rpc_server.h
+++ b/src/kudu/server/rpc_server.h
@@ -28,13 +28,13 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
 
 template <class T> class scoped_refptr;
 
 namespace kudu {
-class HostPort;
 
 namespace rpc {
 class AcceptorPool;
@@ -48,6 +48,10 @@ struct RpcServerOptions {
 
   std::string rpc_bind_addresses;
   std::string rpc_advertised_addresses;
+
+  std::string rpc_proxied_addresses;
+  std::string rpc_proxy_advertised_addresses;
+
   uint32_t num_acceptors_per_address;
   uint32_t num_service_threads;
   uint16_t default_port;
@@ -94,6 +98,14 @@ class RpcServer {
   Status GetAdvertisedAddresses(std::vector<Sockaddr>* addresses) const WARN_UNUSED_RESULT;
   Status GetAdvertisedHostPorts(std::vector<HostPort>* hostports) const WARN_UNUSED_RESULT;
 
+  // Return addresses advertised at a TCP proxy for clients connecting from
+  // an external network.
+  const std::vector<HostPort>& GetProxyAdvertisedHostPorts() const;
+
+  // Return addresses this RPC server is configured with for processing RPCs
+  // proxied from an external network (might be wildcard addresses).
+  const std::vector<Sockaddr>& GetRpcProxiedAddresses() const;
+
   const rpc::ServicePool* service_pool(const std::string& service_name) const;
 
   // Return all of the currently-registered service pools.
@@ -124,6 +136,16 @@ class RpcServer {
   // should be advertised.
   std::vector<Sockaddr> rpc_advertised_addresses_;
 
+  // Endpoints for forwarded/proxied RPCs from external network(s).
+  std::vector<Sockaddr> rpc_proxied_addresses_;
+
+  // RPC endpoints in external network that are proxied/forwarded to RPC
+  // endpoints this server is bound to in the local/internal network.
+  // External endpoints are not resolved into Sockaddr since the DNS resolver
+  // in the internal network might not be aware of the external network's
+  // DNS records.
+  std::vector<HostPort> rpc_proxy_advertised_hostports_;
+
   std::vector<std::shared_ptr<rpc::AcceptorPool> > acceptor_pools_;
 
   // Function called when one of this server's pools rejects an RPC due to queue overflow.
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index 494391e8b..ccc116fac 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -28,6 +28,7 @@
 #include <string>
 #include <type_traits>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -339,12 +340,32 @@ void Heartbeater::Thread::SetupCommonField(master::TSToMasterCommonPB* common) {
 Status Heartbeater::Thread::SetupRegistration(ServerRegistrationPB* reg) {
   reg->Clear();
 
-  vector<HostPort> hps;
-  RETURN_NOT_OK(server_->rpc_server()->GetAdvertisedHostPorts(&hps));
-  for (const auto& hp : hps) {
-    auto* pb = reg->add_rpc_addresses();
-    pb->set_host(hp.host());
-    pb->set_port(hp.port());
+  // Populate the RPC addresses for communicating within the internal network.
+  {
+    vector<HostPort> hps;
+    RETURN_NOT_OK(server_->rpc_server()->GetAdvertisedHostPorts(&hps));
+    for (const auto& hp : hps) {
+      auto* pb = reg->add_rpc_addresses();
+      pb->set_host(hp.host());
+      pb->set_port(hp.port());
+    }
+  }
+
+  // Populate the RPC addresses for communicating with clients from
+  // external networks.
+  {
+    const auto* srv = server_->rpc_server();
+    const auto& hps = srv->GetProxyAdvertisedHostPorts();
+    for (const auto& hp : hps) {
+      auto* pb = reg->add_rpc_proxy_addresses();
+      pb->set_host(hp.host());
+      pb->set_port(hp.port());
+    }
+    // The RPC proxy-advertised addresses and the proxied RPC addresses should
+    // be either both empty or both non-empty. However, the number of addresses
+    // a server uses to process proxied RPCs might differ from the number of
+    // RPC addresses advertised at the proxy.
+    DCHECK_EQ(hps.empty(), srv->GetRpcProxiedAddresses().empty());
   }
   // Now fetch any UNIX domain sockets.
   vector<Sockaddr> addrs;
diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index 496cf3658..becd7cef2 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -527,14 +527,17 @@ Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr) {
 }
 
 bool IsAddrOneOf(const Sockaddr& addr, const vector<Sockaddr>& ref_addresses) {
-  DCHECK(addr.is_ip());
-  DCHECK(!addr.IsWildcard());
+  if (!addr.is_ip()) {
+    return false;
+  }
   DCHECK_NE(0, addr.port());
   const bool have_match = std::any_of(
       ref_addresses.begin(),
       ref_addresses.end(),
       [&addr](const Sockaddr& s) {
-        DCHECK(s.is_ip());
+        if (!s.is_ip()) {
+          return false;
+        }
         const bool is_same_or_wildcard_port = s.port() == addr.port() ||
             s.port() == 0;
         if (s.IsWildcard()) {
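
A minimal sketch (not part of the patch) of the wildcard matching that
IsAddrOneOf() implements; it assumes Sockaddr::ParseString(<address string>,
<default port>) is available for constructing the addresses:

    #include <vector>

    #include "kudu/util/net/net_util.h"
    #include "kudu/util/net/sockaddr.h"
    #include "kudu/util/status.h"

    namespace kudu {

    Status MatchExample(bool* matches) {
      // A reference list with a single entry combining the address wildcard
      // (0.0.0.0) and the port wildcard (port 0).
      Sockaddr wildcard;
      RETURN_NOT_OK(wildcard.ParseString("0.0.0.0", 0));

      // A concrete endpoint, e.g. one of the addresses an acceptor is bound to.
      Sockaddr addr;
      RETURN_NOT_OK(addr.ParseString("127.0.0.1", 30051));

      // Sets '*matches' to true: the wildcard entry matches any IPv4
      // address and any port.
      *matches = IsAddrOneOf(addr, { wildcard });
      return Status::OK();
    }

    } // namespace kudu
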
diff --git a/src/kudu/util/net/net_util.h b/src/kudu/util/net/net_util.h
index 5115afc80..cb10b7d58 100644
--- a/src/kudu/util/net/net_util.h
+++ b/src/kudu/util/net/net_util.h
@@ -191,11 +191,12 @@ Status GetFQDN(std::string* hostname);
 Status SockaddrFromHostPort(const HostPort& host_port, Sockaddr* addr);
 
 // Returns true if the specified address 'addr' matches at least one of the
-// addresses in 'ref_addresses'. The 'addr' itself must not be a wildcard, but
-// any of the addresses in 'ref_addresses' may: both the address (i.e. 0.0.0.0)
-// and port wildcard (i.e. port 0) are supported. All the addresses must be in
-// the IP notation, not UNIX socket or anything else. Only IPv4 addresses are
-// supported since Sockaddr doesn't support IPv6 yet.
+// addresses in 'ref_addresses'. The 'addr' itself must not have a wildcard
+// port, but any of the addresses in 'ref_addresses' may be a wildcard: both
+// the address wildcard (i.e. 0.0.0.0) and the port wildcard (i.e. port 0)
+// are supported. All the addresses must be in IP notation, not UNIX socket
+// paths or anything else; otherwise the function returns false.
+// Only IPv4 addresses are supported since Sockaddr doesn't support IPv6 yet.
 bool IsAddrOneOf(const Sockaddr& addr, const std::vector<Sockaddr>& ref_addresses);
 
 // Converts the given list of Sockaddrs into a list of HostPorts that can be