You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/01/11 16:21:51 UTC

[kudu] branch master updated: KUDU-3357 proxied RPCs for multi-master Kudu clusters

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new cdc97be9a KUDU-3357 proxied RPCs for multi-master Kudu clusters
cdc97be9a is described below

commit cdc97be9aaecb3232f174786ad60d9e30d6a78dd
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Jan 6 16:03:37 2023 -0800

    KUDU-3357 proxied RPCs for multi-master Kudu clusters
    
    With this patch, it's now possible to run a multi-master Kudu cluster in
    a local area network while processing RPC requests sent from external
    networks.  The RPC requests are assumed to be forwarded by a TCP proxy
    which advertises RPC endpoints for the masters and the tablet servers
    of the multi-master Kudu cluster.
    
    Refactored and unified test scenarios in client-proxied-rpc-test provide
    the coverage for the newly added functionality.
    
    This is a follow-up to 3f29b5da5f59ea96cfec0608226d5c35740884a6.
    
    Change-Id: Id834d5fae8dcc2230de9f6b6429334c24052011a
    Reviewed-on: http://gerrit.cloudera.org:8080/19404
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Reviewed-by: Zoltan Chovan <zc...@cloudera.com>
    Tested-by: Alexey Serbin <al...@apache.org>
---
 .../integration-tests/client-proxied-rpc-test.cc   | 517 +++++++++++++--------
 src/kudu/master/master.cc                          | 109 ++++-
 src/kudu/master/master.h                           |  15 +-
 src/kudu/master/master_service.cc                  |   8 +-
 src/kudu/mini-cluster/external_mini_cluster.cc     |  27 +-
 src/kudu/mini-cluster/external_mini_cluster.h      |  32 +-
 6 files changed, 490 insertions(+), 218 deletions(-)

diff --git a/src/kudu/integration-tests/client-proxied-rpc-test.cc b/src/kudu/integration-tests/client-proxied-rpc-test.cc
index 932a8b853..05d0ad32f 100644
--- a/src/kudu/integration-tests/client-proxied-rpc-test.cc
+++ b/src/kudu/integration-tests/client-proxied-rpc-test.cc
@@ -15,13 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <array>
 #include <csignal>
+#include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <ostream>
+#include <set>
 #include <string>
 #include <type_traits>
-#include <utility>
 #include <vector>
 
 #include <glog/logging.h>
@@ -34,6 +38,8 @@
 #include "kudu/client/write_op.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/data_gen_util.h"
 #include "kudu/integration-tests/test_workload.h"
@@ -61,44 +67,87 @@ using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTabletServer;
 using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
+using std::array;
+using std::function;
+using std::set;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::Split;
 using strings::Substitute;
 
 namespace kudu {
 
+// Class template for test scenarios running against external mini-cluster
+// with M masters and T tablet servers.
+template<size_t M, size_t T>
 class ClientProxiedRpcTest : public KuduTest {
  public:
   void SetUp() override {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+
+    auto s = FindExecutable("nc", {"/bin", "/usr/bin", "/usr/local/bin"}, &nc_);
+    if (s.IsNotFound()) {
+      LOG(WARNING) << "test is skipped: could not find netcat utility (nc)";
+      GTEST_SKIP();
+    }
+    ASSERT_OK(s);
 
     KuduTest::SetUp();
 
-    ASSERT_OK(GetRandomPort(kIpAddr, &m_proxy_advertised_port_));
-    m_proxy_advertised_addr_ = HostPort(kIpAddr, m_proxy_advertised_port_);
-    ASSERT_OK(GetRandomPort(kIpAddr, &m_proxied_port_));
-    m_proxied_addr_ = HostPort(kIpAddr, m_proxied_port_);
+    ExternalMiniClusterOptions opts;
+    opts.num_masters = M;
+    opts.num_tablet_servers = T;
+
+    vector<string> master_addrs;
+    for (size_t i = 0; i < M; ++i) {
+      auto& a_port = m_proxy_advertised_ports_[i];
+      ASSERT_OK(GetRandomPort(kIpAddr, &a_port));
+      auto& a_addr = m_proxy_advertised_addrs_[i];
+      a_addr = HostPort(kIpAddr, a_port);
+
+      auto& p_port = m_proxied_ports_[i];
+      ASSERT_OK(GetRandomPort(kIpAddr, &p_port));
+      auto& p_addr = m_proxied_addrs_[i];
+      p_addr = HostPort(kIpAddr, p_port);
+
+      vector<string> flags = {
+        Substitute("--rpc_proxy_advertised_addresses=$0", a_addr.ToString()),
+        Substitute("--rpc_proxied_addresses=$0", p_addr.ToString()),
+      };
+      opts.m_custom_flags.emplace_back(std::move(flags));
+
+      master_addrs.emplace_back(a_addr.ToString());
+    }
 
-    ASSERT_OK(GetRandomPort(kIpAddr, &t_proxy_advertised_port_));
-    t_proxy_advertised_addr_ = HostPort(kIpAddr, t_proxy_advertised_port_);
-    ASSERT_OK(GetRandomPort(kIpAddr, &t_proxied_port_));
-    t_proxied_addr_ = HostPort(kIpAddr, t_proxied_port_);
+    if (M > 1) {
+      const auto flag = Substitute("--master_rpc_proxy_advertised_addresses=$0",
+                                   JoinStrings(master_addrs, ","));
+      for (size_t i = 0; i < M; ++i) {
+        opts.m_custom_flags[i].push_back(flag);
+      }
+    }
 
-    ExternalMiniClusterOptions opts;
-    opts.extra_master_flags = {
-      Substitute("--rpc_proxy_advertised_addresses=$0",
-          m_proxy_advertised_addr_.ToString()),
-      Substitute("--rpc_proxied_addresses=$0",
-          m_proxied_addr_.ToString()),
-    };
-    opts.extra_tserver_flags = {
-      Substitute("--rpc_proxy_advertised_addresses=$0",
-          t_proxy_advertised_addr_.ToString()),
-      Substitute("--rpc_proxied_addresses=$0",
-          t_proxied_addr_.ToString()),
-    };
+    for (size_t i = 0; i < T; ++i) {
+      auto& a_port = t_proxy_advertised_ports_[i];
+      ASSERT_OK(GetRandomPort(kIpAddr, &a_port));
+      auto& a_addr = t_proxy_advertised_addrs_[i];
+      a_addr = HostPort(kIpAddr, a_port);
+
+      auto& p_port = t_proxied_ports_[i];
+      ASSERT_OK(GetRandomPort(kIpAddr, &p_port));
+      auto& p_addr = t_proxied_addrs_[i];
+      p_addr = HostPort(kIpAddr, p_port);
+
+      vector<string> flags = {
+        Substitute("--rpc_proxy_advertised_addresses=$0", a_addr.ToString()),
+        Substitute("--rpc_proxied_addresses=$0", p_addr.ToString()),
+      };
+      opts.t_custom_flags.emplace_back(std::move(flags));
+    }
 
     cluster_.reset(new ExternalMiniCluster(std::move(opts)));
+
     ASSERT_OK(cluster_->Start());
   }
 
@@ -109,191 +158,275 @@ class ClientProxiedRpcTest : public KuduTest {
     KuduTest::TearDown();
   }
 
- protected:
-  static constexpr const char* const kIpAddr = "127.0.0.1";
+  // Verify basic functionality when RPC connections to Kudu masters and tablet
+  // servers are forwarded via a TCP proxy.
+  void Run() {
+    ASSERT_FALSE(nc_.empty());
+
+    const auto kTimeout = MonoDelta::FromSeconds(5);
+    const char* const kTableName = CURRENT_TEST_NAME();
+    const auto schema = KuduSchema::FromSchema(GetSimpleTestSchema());
+    TestWorkload w(cluster_.get());
+    w.set_schema(schema);
+    w.set_table_name(kTableName);
+    w.set_num_replicas(1);
+    w.Setup();
+
+    vector<unique_ptr<Fifo>> m_fifos(M);
+    for (auto i = 0; i < M; ++i) {
+      const auto fname = Substitute("m.fifo.$0", i);
+      ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, fname), &m_fifos[i]));
+    }
 
-  uint16_t m_proxied_port_;
-  HostPort m_proxied_addr_;
-  uint16_t m_proxy_advertised_port_;
-  HostPort m_proxy_advertised_addr_;
+    vector<unique_ptr<Fifo>> t_fifos(T);
+    for (auto i = 0; i < T; ++i) {
+      const auto fname = Substitute("t.fifo.$0", i);
+      ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, fname), &t_fifos[i]));
+    }
 
-  uint16_t t_proxied_port_;
-  HostPort t_proxied_addr_;
-  uint16_t t_proxy_advertised_port_;
-  HostPort t_proxy_advertised_addr_;
+    // Run TCP proxies for Kudu masters' connections.
+    vector<unique_ptr<Subprocess>> m_proxies;
+    m_proxies.reserve(M);
+    vector<ScopedCleanup<function<void(void)>>> m_proxy_cleanups;
+    m_proxy_cleanups.reserve(M);
+    for (auto i = 0; i < M; ++i) {
+      const auto proxy_cmd_str = Substitute(
+          kProxyCmdPattern,
+          nc_,
+          kIpAddr,
+          m_proxy_advertised_ports_[i],
+          m_proxied_ports_[i],
+          m_fifos[i]->filename());
+      m_proxies.emplace_back(new Subprocess({"/bin/bash", "-c", proxy_cmd_str}));
+
+      auto* proxy = m_proxies.back().get();
+      function<void(void)> cleanup = [proxy] {
+        if (proxy->IsStarted()) {
+          WARN_NOT_OK(proxy->KillAndWait(SIGTERM),
+                      Substitute("PID $0: could not stop process", proxy->pid()));
+        }
+      };
+
+      m_proxy_cleanups.emplace_back(std::move(cleanup));
+    }
+    for (auto& p : m_proxies) {
+      ASSERT_OK(p->Start());
+    }
 
-  unique_ptr<ExternalMiniCluster> cluster_;
-};
+    // Run TCP proxies for Kudu tablet servers' connections.
+    vector<unique_ptr<Subprocess>> t_proxies;
+    t_proxies.reserve(T);
+    vector<ScopedCleanup<function<void(void)>>> t_proxy_cleanups;
+    t_proxy_cleanups.reserve(T);
+    for (auto i = 0; i < T; ++i) {
+      const auto proxy_cmd_str = Substitute(
+          kProxyCmdPattern,
+          nc_,
+          kIpAddr,
+          t_proxy_advertised_ports_[i],
+          t_proxied_ports_[i],
+          t_fifos[i]->filename());
+      t_proxies.emplace_back(new Subprocess({"/bin/bash", "-c", proxy_cmd_str}));
+
+      auto* proxy = t_proxies.back().get();
+      function<void(void)> cleanup = [proxy] {
+        if (proxy->IsStarted()) {
+          WARN_NOT_OK(proxy->KillAndWait(SIGTERM),
+                      Substitute("PID $0: could not stop process", proxy->pid()));
+        }
+      };
+      t_proxy_cleanups.emplace_back(std::move(cleanup));
+    }
+    for (auto& p : t_proxies) {
+      ASSERT_OK(p->Start());
+    }
 
-// Verify basic functionality when RPC connections to Kudu master and tablet
-// server are forwarded via a simple TCP proxy.
-TEST_F(ClientProxiedRpcTest, Basic) {
-  SKIP_IF_SLOW_NOT_ALLOWED();
+    // Wait for the TCP proxies to start up.
+    for (auto port : m_proxy_advertised_ports_) {
+      ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, port, kTimeout));
+    }
+    for (auto port : t_proxy_advertised_ports_) {
+      ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, port, kTimeout));
+    }
 
-  string nc;
-  {
-    auto s = FindExecutable("nc", {"/bin", "/usr/bin", "/usr/local/bin"}, &nc);
-    if (s.IsNotFound()) {
-      LOG(WARNING) << "test is skipped: could not find netcat utility (nc)";
-      GTEST_SKIP();
+    // Build a client to send requests via RPC endpoints advertised by proxy.
+    client::sp::shared_ptr<client::KuduClient> client;
+    {
+      client::KuduClientBuilder b;
+      for (auto i = 0; i < M; ++i) {
+        b.add_master_server_addr(m_proxy_advertised_addrs_[i].ToString());
+      }
+      b.default_admin_operation_timeout(kTimeout);
+      b.default_rpc_timeout(kTimeout);
+      ASSERT_OK(b.Build(&client));
     }
-    ASSERT_OK(s);
-  }
-  ASSERT_FALSE(nc.empty());
 
-  const auto kTimeout = MonoDelta::FromSeconds(5);
-  constexpr const char* const kTableName = "proxied_rpc_test";
-  constexpr const char* const kProxyCmdPattern =
-      "trap \"kill %1\" EXIT; $0 -knv -l $1 $2 <$4 | $0 -nv $1 $3 >$4";
-  const auto schema = KuduSchema::FromSchema(GetSimpleTestSchema());
-  TestWorkload w(cluster_.get());
-  w.set_schema(schema);
-  w.set_table_name(kTableName);
-  w.set_num_replicas(1);
-  w.Setup();
-
-  unique_ptr<Fifo> m_fifo;
-  ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, "m.fifo"), &m_fifo));
-
-  unique_ptr<Fifo> t_fifo;
-  ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, "t.fifo"), &t_fifo));
-
-  // Run TCP proxy for Kudu master connections.
-  const auto m_proxy_cmd_str = Substitute(
-      kProxyCmdPattern, nc,
-      kIpAddr, m_proxy_advertised_port_, m_proxied_port_, m_fifo->filename());
-  Subprocess m_proxy({"/bin/bash", "-c", m_proxy_cmd_str});
-  ASSERT_OK(m_proxy.Start());
-  auto m_proxy_cleanup = MakeScopedCleanup([&] {
-    m_proxy.KillAndWait(SIGTERM);
-  });
-
-  // Run TCP proxy for Kudu tablet server connections.
-  const auto t_proxy_cmd_str = Substitute(
-      kProxyCmdPattern, nc,
-      kIpAddr, t_proxy_advertised_port_, t_proxied_port_, t_fifo->filename());
-  Subprocess t_proxy({"/bin/bash", "-c", t_proxy_cmd_str});
-  ASSERT_OK(t_proxy.Start());
-  auto t_proxy_cleanup = MakeScopedCleanup([&] {
-    t_proxy.KillAndWait(SIGTERM);
-  });
-
-  // Wait for the TCP proxies to start up.
-  ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, m_proxy_advertised_port_, kTimeout));
-  ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, t_proxy_advertised_port_, kTimeout));
-
-  // Build a client to send requests via the proxied RPC endpoint.
-  client::sp::shared_ptr<client::KuduClient> client;
-  {
-    client::KuduClientBuilder b;
-    b.add_master_server_addr(m_proxy_advertised_addr_.ToString());
-    b.default_admin_operation_timeout(kTimeout);
-    b.default_rpc_timeout(kTimeout);
-    ASSERT_OK(b.Build(&client));
-  }
+    // Make sure the client receives the addresses advertised by proxy since
+    // the request came through the proxied RPC address.
+    const vector<string> master_addresses(Split(client->GetMasterAddresses(), ","));
+    for (const auto& hp : m_proxy_advertised_addrs_) {
+      ASSERT_TRUE(std::any_of(master_addresses.begin(), master_addresses.end(),
+                              [&hp](const string& e) { return e == hp.ToString(); }));
+    }
+    ASSERT_EQ(m_proxy_advertised_addrs_.size(), master_addresses.size());
+    if (M > 1) {
+      ASSERT_TRUE(client->IsMultiMaster());
+    } else {
+      ASSERT_FALSE(client->IsMultiMaster());
+    }
 
-  // Make sure the client receives proxy advertised addresses since the request
-  // came to the proxied RPC address.
-  const auto& master_addresses = client->GetMasterAddresses();
-  ASSERT_EQ(m_proxy_advertised_addr_.ToString(), master_addresses);
-  // Just a sanity check: multiple RPC endpoints shouldn't be treated as
-  // a presence of multiple masters in the cluster.
-  ASSERT_FALSE(client->IsMultiMaster());
-
-  // Check that client sees RPC addresses advertised by TCP proxy for the
-  // tablet server.
-  vector<KuduTabletServer*> tss;
-  ElementDeleter deleter(&tss);
-  ASSERT_OK(client->ListTabletServers(&tss));
-  ASSERT_EQ(1, tss.size());
-  ASSERT_EQ(kIpAddr, tss[0]->hostname());
-  ASSERT_EQ(t_proxy_advertised_port_, tss[0]->port());
-
-  client::sp::shared_ptr<KuduTable> table;
-  ASSERT_OK(client->OpenTable(kTableName, &table));
-
-  // Create a session and explicitly set the flush mode to AUTO_FLUSH_SYNC
-  // to send every operation when calling Apply().
-  client::sp::shared_ptr<KuduSession> session(client->NewSession());
-  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
-  ThreadSafeRandom rng(SeedRandom());
-  for (auto i = 0; i < 10; ++i) {
-    unique_ptr<KuduInsert> insert(table->NewInsert());
-    auto* row = insert->mutable_row();
-    GenerateDataForRow(schema, i, &rng, row);
-    ASSERT_OK(session->Apply(insert.release()));
-  }
-  // Call Flush() just in case, but it's a no-op effectively since the chosen
-  // session flush mode.
-  ASSERT_OK(session->Flush());
-
-  // Read the data back.
-  {
-    KuduScanner scanner(table.get());
-    ASSERT_OK(scanner.SetFaultTolerant());
-    ASSERT_OK(scanner.Open());
-    ASSERT_TRUE(scanner.HasMoreRows());
-    KuduScanBatch batch;
-
-    int32_t idx = 0;
-    while (scanner.HasMoreRows()) {
-      ASSERT_OK(scanner.NextBatch(&batch));
-      for (const auto& row : batch) {
-        int32_t value;
-        ASSERT_OK(row.GetInt32(0, &value));
-        ASSERT_EQ(idx++, value);
+    // Check that client sees RPC addresses advertised by TCP proxy for the
+    // tablet server.
+    vector<KuduTabletServer*> tss;
+    ElementDeleter deleter(&tss);
+    ASSERT_OK(client->ListTabletServers(&tss));
+    ASSERT_EQ(T, tss.size());
+    for (auto i = 0; i < T; ++i) {
+      ASSERT_EQ(kIpAddr, tss[i]->hostname());
+    }
+    {
+      set<uint16_t> ports;
+      for (auto i = 0; i < T; ++i) {
+        const auto port = tss[i]->port();
+        ports.emplace(port);
+        ASSERT_TRUE(std::any_of(t_proxy_advertised_ports_.begin(),
+                                t_proxy_advertised_ports_.end(),
+                                [&port](uint16_t e) { return e == port; }));
       }
+      // Make sure all ports are different.
+      ASSERT_EQ(T, ports.size());
     }
-    ASSERT_EQ(10, idx);
-  }
 
-  // Make sure the client indeed works through the RPC address advertised by
-  // proxy: stop the proxy and check if client could write any data to the table.
-  t_proxy_cleanup.cancel();
-  ASSERT_OK(t_proxy.KillAndWait(SIGTERM));
-  {
-    unique_ptr<KuduInsert> insert(table->NewInsert());
-    auto* row = insert->mutable_row();
-    GenerateDataForRow(schema, 100, &rng, row);
-    const auto s = session->Apply(insert.release());
-    ASSERT_TRUE(s.IsIOError()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
-  }
+    client::sp::shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(kTableName, &table));
+
+    // Create a session and explicitly set the flush mode to AUTO_FLUSH_SYNC
+    // to send every operation when calling Apply().
+    client::sp::shared_ptr<KuduSession> session(client->NewSession());
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    ThreadSafeRandom rng(SeedRandom());
+    for (auto i = 0; i < 10; ++i) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      auto* row = insert->mutable_row();
+      GenerateDataForRow(schema, i, &rng, row);
+      ASSERT_OK(session->Apply(insert.release()));
+    }
+    // Call Flush() just in case, but it's effectively a no-op given the chosen
+    // session flush mode.
+    ASSERT_OK(session->Flush());
+
+    // Read the data back.
+    {
+      KuduScanner scanner(table.get());
+      ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
+      ASSERT_OK(scanner.Open());
+      ASSERT_TRUE(scanner.HasMoreRows());
+      KuduScanBatch batch;
+
+      int32_t idx = 0;
+      while (scanner.HasMoreRows()) {
+        ASSERT_OK(scanner.NextBatch(&batch));
+        for (const auto& row : batch) {
+          int32_t value;
+          ASSERT_OK(row.GetInt32(0, &value));
+          ASSERT_EQ(idx++, value);
+        }
+      }
+      ASSERT_EQ(10, idx);
+    }
 
-  // Try reading the data now: this expected to fail since the client works only
-  // through the advertised addressed.
-  {
-    KuduScanner scanner(table.get());
-    ASSERT_OK(scanner.SetFaultTolerant());
-    ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
-    const auto s = scanner.Open();
-    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-    ASSERT_STR_MATCHES(s.ToString(),
-        "(timed out after deadline expired|exceeded configured scan timeout)");
-  }
+    // Make sure the client indeed works through the RPC addresses advertised by
+    // proxy: stop the tablet servers' proxies and check that the client fails to
+    // write any data to the table.
+    for (auto i = 0; i < T; ++i) {
+      t_proxy_cleanups[i].cancel();
+      ASSERT_OK(t_proxies[i]->KillAndWait(SIGTERM));
+    }
+    {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      auto* row = insert->mutable_row();
+      GenerateDataForRow(schema, 100, &rng, row);
+      const auto s = session->Apply(insert.release());
+      ASSERT_TRUE(s.IsIOError()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
+    }
 
-  // Meanwhile, DDL operations should be still possible: connections to master
-  // are still being proxied as needed, and master and tablet servers
-  // communicate via standard, non-proxied RPC endpoints.
-  {
-    unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
-    alt->AlterColumn("string_val")->RenameTo("str_val");
-    ASSERT_OK(alt->Alter());
-  }
+    // Try reading the data now: this is expected to fail since the client works
+    // only through the advertised addresses.
+    {
+      KuduScanner scanner(table.get());
+      ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
+      const auto s = scanner.Open();
+      ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+      ASSERT_STR_MATCHES(s.ToString(),
+          "(timed out after deadline expired|exceeded configured scan timeout)");
+    }
 
-  // Make sure the client communicates with master via the advertised addresses:
-  // once the TCP proxy is shut down, client should not be able to reach master
-  // to perform a DDL operation.
-  m_proxy_cleanup.cancel();
-  ASSERT_OK(m_proxy.KillAndWait(SIGTERM));
-  {
-    unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
-    alt->AlterColumn("str_val")->RenameTo("string_val");
-    const auto s = alt->Alter();
-    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-    ASSERT_STR_CONTAINS(s.ToString(), "AlterTable passed its deadline");
-    ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+    // Meanwhile, DDL operations should be still possible: connections to
+    // masters are still being proxied as needed, and masters and tablet servers
+    // communicate via standard, non-proxied RPC endpoints.
+    {
+      unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
+      alt->AlterColumn("string_val")->RenameTo("str_val");
+      ASSERT_OK(alt->Alter());
+    }
+
+    // Make sure the client communicates with masters via the advertised
+    // addresses: once the corresponding TCP proxy is shut down, the client
+    // should not be able to reach the master to perform a DDL operation.
+    for (auto i = 0; i < M; ++i) {
+      m_proxy_cleanups[i].cancel();
+      ASSERT_OK(m_proxies[i]->KillAndWait(SIGTERM));
+    }
+    {
+      unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
+      alt->AlterColumn("str_val")->RenameTo("string_val");
+      const auto s = alt->Alter();
+      ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "AlterTable passed its deadline");
+      ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
+    }
   }
+
+ protected:
+  static constexpr const char* const kIpAddr = "127.0.0.1";
+  static constexpr const char* const kProxyCmdPattern =
+      "trap \"kill %1\" EXIT; $0 -knv -l $1 $2 <$4 | $0 -nv $1 $3 >$4";
+
+  // Full path to the nc/netcat utility (if present).
+  string nc_;
+
+  array<uint16_t, M> m_proxied_ports_;
+  array<HostPort, M> m_proxied_addrs_;
+  array<uint16_t, M> m_proxy_advertised_ports_;
+  array<HostPort, M> m_proxy_advertised_addrs_;
+
+  array<uint16_t, T> t_proxied_ports_;
+  array<HostPort, T> t_proxied_addrs_;
+  array<uint16_t, T> t_proxy_advertised_ports_;
+  array<HostPort, T> t_proxy_advertised_addrs_;
+
+  unique_ptr<ExternalMiniCluster> cluster_;
+};
+
+typedef ClientProxiedRpcTest<1, 1> ClientProxiedRpc1M1Test;
+TEST_F(ClientProxiedRpc1M1Test, Basic) {
+  NO_FATALS(Run());
+}
+
+typedef ClientProxiedRpcTest<1, 3> ClientProxiedRpc1M3Test;
+TEST_F(ClientProxiedRpc1M3Test, Basic) {
+  NO_FATALS(Run());
+}
+
+typedef ClientProxiedRpcTest<3, 1> ClientProxiedRpc3M1Test;
+TEST_F(ClientProxiedRpc3M1Test, Basic) {
+  NO_FATALS(Run());
+}
+
+typedef ClientProxiedRpcTest<3, 3> ClientProxiedRpc3M3Test;
+TEST_F(ClientProxiedRpc3M3Test, Basic) {
+  NO_FATALS(Run());
 }
 
 } // namespace kudu
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 296765b21..79c83ee7c 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -38,6 +38,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/hms/hms_catalog.h"
 #include "kudu/master/catalog_manager.h"
@@ -60,6 +61,7 @@
 #include "kudu/tserver/tablet_service.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/monotime.h"
@@ -112,8 +114,18 @@ DEFINE_string(location_mapping_cmd, "",
               "characters from the set [a-zA-Z0-9_-.]. If the cluster is not "
               "using location awareness features this flag should not be set.");
 
+DEFINE_string(master_rpc_proxy_advertised_addresses, "",
+              "RPC endpoints of all masters in this cluster exposed to the "
+              "external network via a TCP proxy. This is a comma-separated "
+              "list of the masters' proxy-advertised RPC addresses, each "
+              "specified via the --rpc_proxy_advertised_addresses "
+              "flag for a single master.");
+TAG_FLAG(master_rpc_proxy_advertised_addresses, experimental);
+
 DECLARE_bool(txn_manager_lazily_initialized);
 DECLARE_bool(txn_manager_enabled);
+DECLARE_string(master_addresses);
+DECLARE_string(rpc_proxy_advertised_addresses);
 
 using kudu::consensus::RaftPeerPB;
 using kudu::fs::ErrorHandlerType;
@@ -129,6 +141,7 @@ using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::Split;
 using strings::Substitute;
 
 namespace kudu {
@@ -140,7 +153,40 @@ class RpcContext;
 namespace kudu {
 namespace master {
 
+
 namespace {
+
+// This validator issues a warning (not an error) to allow for temporary
+// configurations when adding a new master.
+bool ValidateMultiMasterProxiedRpcFlags() {
+  if (FLAGS_rpc_proxy_advertised_addresses.empty()) {
+    return true;
+  }
+  const vector<string> master_addrs = Split(FLAGS_master_addresses, ",");
+  if (master_addrs.size() <= 1) {
+    return true;
+  }
+  // Make sure --master_rpc_proxy_advertised_addresses is set if multi-master
+  // flags are detected and --rpc_proxy_advertised_addresses is set as well.
+  // Also, check whether the number of masters in the internal network
+  // corresponds to the number of masters advertised for clients in external
+  // networks.
+  if (FLAGS_master_rpc_proxy_advertised_addresses.empty()) {
+    LOG(WARNING) << Substitute(
+        "--master_rpc_proxy_advertised_addresses should be set as well "
+        "when configuring RPC proxying in a multi-master cluster");
+  }
+  const vector<string> master_proxy_addrs =
+      Split(FLAGS_master_rpc_proxy_advertised_addresses, ",");
+  if (master_proxy_addrs.size() != master_addrs.size()) {
+    LOG(WARNING) << Substitute(
+        "--master_rpc_proxy_advertised_addresses and --master_addresses "
+        "should have same number of elements");
+  }
+  return true;
+}
+GROUP_FLAG_VALIDATOR(multi_master_pp, &ValidateMultiMasterProxiedRpcFlags);
+
 constexpr const char* kReplaceMasterMessage =
     "this master may return incorrect results and should be replaced";
 void CrashMasterOnDiskError(const string& uuid) {
@@ -236,6 +282,17 @@ Status Master::Init() {
       FLAGS_tsk_rotation_seconds,
       messenger_->shared_token_verifier()));
 
+  if (!FLAGS_master_rpc_proxy_advertised_addresses.empty()) {
+    vector<HostPort> host_ports;
+    RETURN_NOT_OK(HostPort::ParseStrings(
+        FLAGS_master_rpc_proxy_advertised_addresses, kDefaultPort, &host_ports));
+    if (host_ports.empty()) {
+      return Status::InvalidArgument(
+          "the set of RPC advertised master addresses is empty");
+    }
+    master_rpc_proxy_advertised_hostports_ = std::move(host_ports);
+  }
+
   state_ = kInitialized;
   return Status::OK();
 }
@@ -544,26 +601,37 @@ Status Master::ListMasters(vector<ServerEntryPB>* masters,
   }
 
   // For distributed master configuration.
-  // TODO(aserbin): update this to work with proxied RPCs
   for (const auto& peer : config.peers()) {
-    HostPort hp = HostPortFromPB(peer.last_known_addr());
+    const auto hp = HostPortFromPB(peer.last_known_addr());
     ServerEntryPB peer_entry;
-    Status s = GetMasterEntryForHost(messenger_, hp, &peer_entry);
-    if (!s.ok()) {
-      s = s.CloneAndPrepend(
-          Substitute("Unable to get registration information for peer $0 ($1)",
-                     peer.permanent_uuid(),
-                     hp.ToString()));
+    if (auto s = GetMasterEntryForHost(messenger_, hp, &peer_entry); !s.ok()) {
+      const auto errmsg = Substitute("unable to get registration information for peer $0",
+                                     peer.permanent_uuid());
+      s = s.CloneAndPrepend(errmsg);
+      if (use_external_addr) {
+        StatusToPB(Status::IllegalState(errmsg), peer_entry.mutable_error());
+      } else {
+        StatusToPB(s, peer_entry.mutable_error());
+      }
       LOG(WARNING) << s.ToString();
-      StatusToPB(s, peer_entry.mutable_error());
     } else if (peer_entry.instance_id().permanent_uuid() != peer.permanent_uuid()) {
-      StatusToPB(Status::IllegalState(
-          Substitute("mismatched UUIDs: expected UUID $0 from master at $1, but got UUID $2",
-                     peer.permanent_uuid(),
-                     hp.ToString(),
-                     peer_entry.instance_id().permanent_uuid())),
+      StatusToPB(Status::IllegalState(Substitute("UUID mismatch: $0 vs $1 expected",
+                                                 peer_entry.instance_id().permanent_uuid(),
+                                                 peer.permanent_uuid())),
                  peer_entry.mutable_error());
+      LOG(WARNING) << Substitute(
+          "UUID mismatch: $0 vs $1 expected for master at $2",
+          peer_entry.instance_id().permanent_uuid(), peer.permanent_uuid(), hp.ToString());
+    }
+    if (use_external_addr && peer_entry.has_registration()) {
+      auto* reg = peer_entry.mutable_registration();
+      reg->mutable_rpc_addresses()->Swap(reg->mutable_rpc_proxy_addresses());
+      reg->clear_rpc_proxy_addresses();
+
+      reg->mutable_http_addresses()->Swap(reg->mutable_http_proxy_addresses());
+      reg->clear_http_proxy_addresses();
     }
+
     masters->emplace_back(std::move(peer_entry));
   }
   return Status::OK();
@@ -724,5 +792,18 @@ Status Master::RemoveMaster(const HostPort& hp, const string& uuid, rpc::RpcCont
                                                        matching_uuid, rpc);
 }
 
+const vector<HostPort>& Master::GetProxyAdvertisedHostPorts() const {
+  // In case of multi-master configuration or a single-master configuration
+  // where --master_rpc_proxy_advertised_addresses is specified, get the list
+  // of proxy advertised <host, port> pairs from that flag.
+  DCHECK_NE(kStopped, state_);
+  if (!master_rpc_proxy_advertised_hostports_.empty()) {
+    return master_rpc_proxy_advertised_hostports_;
+  }
+
+  // Otherwise, return whatever is set for --rpc_proxy_advertised_addresses
+  return rpc_server()->GetProxyAdvertisedHostPorts();
+}
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 49b6541fc..38752e032 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -29,12 +29,12 @@
 #include "kudu/kserver/kserver.h"
 #include "kudu/master/master_options.h"
 #include "kudu/server/rpc_server.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/promise.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
-class HostPort;
 class MaintenanceManager;
 class MonoDelta;
 class MonoTime;
@@ -150,11 +150,15 @@ class Master : public kserver::KuduServer {
   }
 
   // A shortcut to get addresses this master server is configured with
-  // for processing RPCs proxied from an external network.
+  // for processing RPCs proxied from external networks.
   const std::vector<Sockaddr>& rpc_proxied_addresses() const {
     return rpc_server()->GetRpcProxiedAddresses();
   }
 
+  // Return addresses advertised at a TCP proxy for clients connecting from
+  // external networks.
+  const std::vector<HostPort>& GetProxyAdvertisedHostPorts() const;
+
  private:
   friend class MasterTest;
   friend class CatalogManager;
@@ -225,6 +229,13 @@ class Master : public kserver::KuduServer {
 
   scoped_refptr<Thread> expired_reserved_tables_deleter_thread_;
 
+  // RPC endpoints of all masters in this cluster in the external network.
+  // These are sourced from the --master_rpc_proxy_advertised_addresses flag.
+  // These are assumed to be proxied/forwarded to the corresponding RPC
+  // endpoints of this cluster's masters, where the latter are specified by
+  // --rpc_proxied_addresses for each master respectively.
+  std::vector<HostPort> master_rpc_proxy_advertised_hostports_;
+
   DISALLOW_COPY_AND_ASSIGN(Master);
 };
 
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 7af0bd754..234008826 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -51,7 +51,6 @@
 #include "kudu/security/token.pb.h"
 #include "kudu/security/token_signer.h"
 #include "kudu/security/token_verifier.h"
-#include "kudu/server/rpc_server.h"
 #include "kudu/server/server_base.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
@@ -814,11 +813,10 @@ void MasterServiceImpl::ConnectToMaster(const ConnectToMasterRequestPB* /*req*/,
   // Set the info about the other masters, so that the client can verify
   // it has the full set of info.
   {
-    // Check if the request came to the dedicated RPC endpoint meaning
-    // it's been proxied from the outside network.
+    // Check if the request came through the dedicated RPC endpoint, meaning
+    // it's been proxied from an external network.
     if (IsAddrOneOf(rpc->local_address(), server_->rpc_proxied_addresses())) {
-      // TODO(aserbin): adapt this to multi-master configuration
-      for (const auto& hp : server_->rpc_server()->GetProxyAdvertisedHostPorts()) {
+      for (const auto& hp : server_->GetProxyAdvertisedHostPorts()) {
         *resp->add_master_addrs() = HostPortToPB(hp);
       }
     } else {
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index 480581adb..2cb5187dd 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -26,6 +26,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <unordered_set>
 #include <utility>
 
@@ -97,6 +98,7 @@ using kudu::tserver::ListTabletsRequestPB;
 using kudu::tserver::ListTabletsResponsePB;
 using kudu::tserver::TabletServerAdminServiceProxy;
 using kudu::tserver::TabletServerServiceProxy;
+using std::back_inserter;
 using std::copy;
 using std::map;
 using std::pair;
@@ -633,7 +635,16 @@ Status ExternalMiniCluster::AddTabletServer() {
   vector<string> extra_flags;
   RETURN_NOT_OK(AddTimeSourceFlags(idx, &extra_flags));
   auto flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
-  copy(flags.begin(), flags.end(), std::back_inserter(extra_flags));
+  copy(flags.begin(), flags.end(), back_inserter(extra_flags));
+
+  // Add custom tablet server flags, specific to this particular tablet server
+  // instance.
+  if (!opts_.t_custom_flags.empty()) {
+    CHECK_EQ(opts_.num_tablet_servers, opts_.t_custom_flags.size());
+    const auto& custom_flags = opts_.t_custom_flags[idx];
+    copy(custom_flags.begin(), custom_flags.end(), back_inserter(extra_flags));
+  }
+
   opts.extra_flags = extra_flags;
   if (!opts_.tserver_alias_prefix.empty()) {
     opts.extra_flags.emplace_back(
@@ -657,7 +668,8 @@ Status ExternalMiniCluster::AddTabletServer() {
   return Status::OK();
 }
 
-Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& master_rpc_addrs, int idx,
+Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& master_rpc_addrs,
+                                         size_t idx,
                                          scoped_refptr<ExternalMaster>* master) {
   DCHECK_LT(idx, master_rpc_addrs.size());
   vector<string> flags;
@@ -720,9 +732,16 @@ Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& master_rpc_addr
       flags.emplace_back(Substitute("--ranger_kms_url=$0", ranger_kms_->url()));
     }
   }
-  // Add custom master flags.
+  // Add extra master flags, common for all masters in the cluster.
   copy(opts_.extra_master_flags.begin(), opts_.extra_master_flags.end(),
-       std::back_inserter(flags));
+       back_inserter(flags));
+
+  // Add custom master flags, specific to this particular master instance.
+  if (!opts_.m_custom_flags.empty()) {
+    CHECK_EQ(opts_.num_masters, opts_.m_custom_flags.size());
+    const auto& custom_flags = opts_.m_custom_flags[idx];
+    copy(custom_flags.begin(), custom_flags.end(), back_inserter(flags));
+  }
 
   string daemon_id = Substitute("master-$0", idx);
 
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index b06e66ccb..e1ab188b0 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -19,6 +19,7 @@
 
 #include <sys/types.h>
 
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <map>
@@ -48,6 +49,7 @@ class Env;
 class NodeInstancePB;
 class Sockaddr;
 class Subprocess;
+
 namespace ranger {
 class MiniRanger;
 }  // namespace ranger
@@ -140,6 +142,9 @@ enum class BuiltinNtpConfigMode {
 };
 #endif
 
+// TODO(aserbin): maybe, turn this struct into a class template based on the
+//                number of masters and tablet servers, changing std::vector
+//                to std::array for {m,t}_custom_flags
 struct ExternalMiniClusterOptions {
   ExternalMiniClusterOptions();
 
@@ -192,6 +197,30 @@ struct ExternalMiniClusterOptions {
   // If unset, addresses are assigned automatically.
   std::vector<HostPort> master_rpc_addresses;
 
+  // Custom flags for masters in the cluster. These are per-instance,
+  // i.e. not common across all masters in the cluster:
+  // 'm_custom_flags[i]' are the flags for the master at index 'i'.
+  //
+  // If not empty, must contain the same number of elements as the number
+  // of masters in the cluster. In other words, the invariant is
+  //
+  //   m_custom_flags.empty() || m_custom_flags.size() == num_masters
+  //
+  // Default: empty
+  std::vector<std::vector<std::string>> m_custom_flags;
+
+  // Custom flags for tablet servers in the cluster. These are per-instance,
+  // i.e. not common across all tablet servers in the cluster:
+  // 't_custom_flags[i]' are the flags for the tablet server at index 'i'.
+  //
+  // If not empty, must contain the same number of elements as the number
+  // of tablet servers in the cluster. In other words, the invariant is
+  //
+  //   t_custom_flags.empty() || t_custom_flags.size() == num_tablet_servers
+  //
+  // Default: empty
+  std::vector<std::vector<std::string>> t_custom_flags;
+
   // Options to configure the MiniKdc before starting it up.
   // Only used when 'enable_kerberos' is 'true'.
   MiniKdcOptions mini_kdc_options;
@@ -559,7 +588,8 @@ class ExternalMiniCluster : public MiniCluster {
   //
   // It's expected that the port for the master at 'idx' is reserved, and that
   // the master can be run with the --rpc_reuseport flag.
-  Status CreateMaster(const std::vector<HostPort>& master_rpc_addrs, int idx,
+  Status CreateMaster(const std::vector<HostPort>& master_rpc_addrs,
+                      size_t idx,
                       scoped_refptr<ExternalMaster>* master);
 
  private: