Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/16 17:45:18 UTC

[kudu] branch master updated (fbde237 -> be68ce8)

This is an automated email from the ASF dual-hosted git repository.

todd pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from fbde237  Add basic support for UNIX domain sockets
     new c7c4d47  Avoid calling Schema::find_column() once per RowBlock in columnar serialization
     new adf5d9f  columnar_serialization: avoid preallocating 8MB per column
     new cd95fef  mini-cluster: exclude libpcre for license check
     new be68ce8  client/tserver: add support for connecting over unix domain sockets

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../relocate_binaries_for_mini_cluster.py          |  5 +-
 src/kudu/client/client-internal.cc                 |  9 ++-
 src/kudu/client/client-test.cc                     | 30 +++++++-
 src/kudu/client/meta_cache.cc                      | 40 +++++++++-
 src/kudu/client/meta_cache.h                       |  7 ++
 src/kudu/common/columnar_serialization.cc          | 61 ++++++++-------
 src/kudu/common/columnar_serialization.h           | 47 +++++++++---
 src/kudu/common/wire_protocol-test.cc              | 14 ++--
 src/kudu/common/wire_protocol.cc                   |  2 +
 src/kudu/common/wire_protocol.proto                |  5 ++
 src/kudu/master/catalog_manager.cc                 |  3 +
 src/kudu/master/master.proto                       |  5 ++
 src/kudu/rpc/acceptor_pool.cc                      | 15 +++-
 src/kudu/server/rpc_server.cc                      | 12 ++-
 src/kudu/server/rpc_server.h                       |  4 +
 src/kudu/server/server_base.cc                     | 17 +++++
 src/kudu/tserver/heartbeater.cc                    |  8 ++
 src/kudu/tserver/tablet_service.cc                 | 87 ++++++++++++++--------
 src/kudu/util/net/sockaddr.cc                      |  3 +
 src/kudu/util/net/sockaddr.h                       | 12 +++
 20 files changed, 295 insertions(+), 91 deletions(-)


[kudu] 04/04: client/tserver: add support for connecting over unix domain sockets

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit be68ce81beeb708edfc0545695e72282506f3845
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 9 16:42:44 2020 -0700

    client/tserver: add support for connecting over unix domain sockets
    
    This adds new experimental flags -rpc_listen_on_unix_domain_socket and
    -client_use_unix_domain_sockets. The former makes the RPC server bind to
    a unix socket and advertise this to the kudu master as part of the TS
    registration. The latter makes the client attempt to connect via a
    domain socket when it sees such a socket path advertised.
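
    A minimal sketch of what binding in the UNIX domain "abstract namespace"
    (the '@'-prefixed form) looks like with the raw socket API, for readers
    unfamiliar with it; this is illustrative only, not the Kudu Socket/Sockaddr
    code, and the socket name is made up:

        #include <cstddef>
        #include <cstring>
        #include <sys/socket.h>
        #include <sys/un.h>
        #include <unistd.h>

        // Bind and listen on an abstract-namespace UNIX socket (Linux-only).
        // Returns the listening fd, or -1 on error.
        int BindAbstractUnixSocket(const char* name) {
          int fd = socket(AF_UNIX, SOCK_STREAM, 0);
          if (fd < 0) return -1;
          struct sockaddr_un addr;
          memset(&addr, 0, sizeof(addr));
          addr.sun_family = AF_UNIX;
          // Leaving sun_path[0] as '\0' selects the abstract namespace, so no
          // file is created on disk (see unix(7)); this is what '@' denotes.
          strncpy(addr.sun_path + 1, name, sizeof(addr.sun_path) - 2);
          socklen_t len = offsetof(struct sockaddr_un, sun_path) + 1 + strlen(name);
          if (bind(fd, reinterpret_cast<struct sockaddr*>(&addr), len) < 0 ||
              listen(fd, 128) < 0) {
            close(fd);
            return -1;
          }
          return fd;
        }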
    
    Note that this makes one behavioral change even when those flags are not
    enabled: we now consider any tablet server with a loopback IP to be
    "local" (and thus a candidate for unix domain socket connection). This
    mostly affects the MiniCluster where tablet servers register using
    various IPs in the loopback range 127.0.0.0/8, and was necessary in
    order to test unix socket connections from the client.
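
    The "loopback is local" determination boils down to treating any numeric
    address in 127.0.0.0/8 as a candidate. A standalone sketch of that check
    (illustrative only, not the Sockaddr::IsAnyLocalAddress() implementation):

        #include <arpa/inet.h>
        #include <netinet/in.h>
        #include <string>

        // Returns true if 'host' is a numeric IPv4 address in 127.0.0.0/8.
        bool IsLoopbackIPv4(const std::string& host) {
          struct in_addr addr;
          if (inet_pton(AF_INET, host.c_str(), &addr) != 1) {
            return false;  // Not numeric; a hostname would need resolution first.
          }
          // Loopback range check: the first octet is 127.
          return (ntohl(addr.s_addr) >> 24) == 127;
        }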
    
    I perf tested by scanning an int32 column from a table with 800M rows
    and using 'perf stat -a -r10' to look at total CPU consumption across
    the tserver and system. There's a fair amount of variability here due to
    inconsistent scheduling to cores/numa nodes, but it seems the unix
    socket is, on average, about 10% faster in terms of total cycles.
    
    TCP sockets:
     Performance counter stats for 'system wide' (10 runs):
    
            148,367.78 msec cpu-clock                 #   87.755 CPUs utilized            ( +-  4.82% )
               101,755      context-switches          #    0.686 K/sec                    ( +-  9.03% )
                   866      cpu-migrations            #    0.006 K/sec                    ( +-  6.42% )
                21,440      page-faults               #    0.145 K/sec                    ( +- 19.32% )
        43,847,792,445      cycles                    #    0.296 GHz                      ( +-  3.77% )  (1.01%)
        50,668,281,554      instructions              #    1.16  insn per cycle           ( +-  1.80% )  (1.11%)
         7,676,337,185      branches                  #   51.739 M/sec                    ( +-  4.61% )  (0.85%)
            69,634,718      branch-misses             #    0.91% of all branches          ( +-  4.72% )  (0.84%)
    
                1.6907 +- 0.0811 seconds time elapsed  ( +-  4.80% )
    
    Unix sockets:
    
     Performance counter stats for 'system wide' (10 runs):
    
            136,877.86 msec cpu-clock                 #   87.638 CPUs utilized            ( +-  2.67% )
                77,376      context-switches          #    0.565 K/sec                    ( +- 14.16% )
                   846      cpu-migrations            #    0.006 K/sec                    ( +-  6.58% )
                23,430      page-faults               #    0.171 K/sec                    ( +- 39.77% )
        39,106,012,185      cycles                    #    0.286 GHz                      ( +-  4.26% )  (0.99%)
        48,957,283,894      instructions              #    1.25  insn per cycle           ( +-  2.24% )  (1.08%)
         7,635,756,771      branches                  #   55.785 M/sec                    ( +-  3.54% )  (0.83%)
            69,900,882      branch-misses             #    0.92% of all branches          ( +-  5.14% )  (0.82%)
    
                1.5619 +- 0.0415 seconds time elapsed  ( +-  2.66% )
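
    Comparing the two runs: 43.85e9 vs 39.11e9 total cycles is a reduction of
    roughly (43.85 - 39.11) / 43.85 ≈ 10.8%, while elapsed time improves from
    about 1.69s to 1.56s (≈ 7.6%).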
    
    Change-Id: I0c390b4209ac7e08cd45239c49499fb0b96405d0
    Reviewed-on: http://gerrit.cloudera.org:8080/15701
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client-internal.cc  |  9 ++++++++-
 src/kudu/client/client-test.cc      | 30 ++++++++++++++++++++++++++--
 src/kudu/client/meta_cache.cc       | 40 +++++++++++++++++++++++++++++++++++--
 src/kudu/client/meta_cache.h        |  7 +++++++
 src/kudu/common/wire_protocol.cc    |  2 ++
 src/kudu/common/wire_protocol.proto |  5 +++++
 src/kudu/master/catalog_manager.cc  |  3 +++
 src/kudu/master/master.proto        |  5 +++++
 src/kudu/rpc/acceptor_pool.cc       | 15 +++++++++++---
 src/kudu/server/rpc_server.cc       | 12 ++++++++++-
 src/kudu/server/rpc_server.h        |  4 ++++
 src/kudu/server/server_base.cc      | 17 ++++++++++++++++
 src/kudu/tserver/heartbeater.cc     |  8 ++++++++
 src/kudu/util/net/sockaddr.cc       |  3 +++
 src/kudu/util/net/sockaddr.h        | 12 +++++++++++
 15 files changed, 163 insertions(+), 9 deletions(-)

diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 9243836..f7bf98c 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -443,7 +443,14 @@ Status KuduClient::Data::InitLocalHostNames() {
 }
 
 bool KuduClient::Data::IsLocalHostPort(const HostPort& hp) const {
-  return ContainsKey(local_host_names_, hp.host());
+  if (ContainsKey(local_host_names_, hp.host())) {
+    return true;
+  }
+
+  // It may be that HostPort is a numeric form (non-reversible) address like
+  // 127.0.1.1, etc. In that case we can still consider it local.
+  Sockaddr addr;
+  return addr.ParseFromNumericHostPort(hp).ok() && addr.IsAnyLocalAddress();
 }
 
 bool KuduClient::Data::IsTabletServerLocal(const RemoteTabletServer& rts) const {
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 6961ff3..4c57900 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -125,11 +125,13 @@
 DECLARE_bool(allow_unsafe_replication_factor);
 DECLARE_bool(catalog_manager_support_live_row_count);
 DECLARE_bool(catalog_manager_support_on_disk_size);
+DECLARE_bool(client_use_unix_domain_sockets);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(location_mapping_by_uuid);
 DECLARE_bool(log_inject_latency);
 DECLARE_bool(master_support_connect_to_master_rpc);
 DECLARE_bool(mock_table_metrics_for_testing);
+DECLARE_bool(rpc_listen_on_unix_domain_socket);
 DECLARE_bool(rpc_trace_negotiation);
 DECLARE_bool(scanner_inject_service_unavailable_on_continue_scan);
 DECLARE_int32(flush_threshold_mb);
@@ -156,13 +158,14 @@ DECLARE_string(user_acl);
 DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan");
 
 METRIC_DECLARE_counter(block_manager_total_bytes_read);
-METRIC_DECLARE_counter(rpcs_queue_overflow);
 METRIC_DECLARE_counter(location_mapping_cache_hits);
 METRIC_DECLARE_counter(location_mapping_cache_queries);
+METRIC_DECLARE_counter(rpc_connections_accepted_unix_domain_socket);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetMasterRegistration);
-METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTabletLocations);
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema);
+METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTabletLocations);
 METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerService_Scan);
 
 using base::subtle::Atomic32;
@@ -6646,5 +6649,28 @@ TEST_F(ClientTest, TestProjectionPredicatesFuzz) {
             unordered_set<string>(rows.begin(), rows.end())) << rows;
 }
 
+class ClientTestUnixSocket : public ClientTest {
+ public:
+  void SetUp() override {
+    FLAGS_rpc_listen_on_unix_domain_socket = true;
+    FLAGS_client_use_unix_domain_sockets = true;
+    ClientTest::SetUp();
+  }
+};
+
+TEST_F(ClientTestUnixSocket, TestConnectViaUnixSocket) {
+  static constexpr int kNumRows = 100;
+  NO_FATALS(InsertTestRows(client_table_.get(), kNumRows));
+  ASSERT_EQ(kNumRows, CountRowsFromClient(client_table_.get()));
+
+  int total_unix_conns = 0;
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    auto counter = METRIC_rpc_connections_accepted_unix_domain_socket.Instantiate(
+        cluster_->mini_tablet_server(0)->server()->metric_entity());
+    total_unix_conns += counter->value();
+  }
+  ASSERT_EQ(1, total_unix_conns);
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index b005256..5fc227d 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -27,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/repeated_field.h> // IWYU pragma: keep
 
@@ -48,11 +49,13 @@
 #include "kudu/rpc/rpc.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
 
 using kudu::consensus::RaftPeerPB;
 using kudu::master::ANY_REPLICA;
@@ -71,6 +74,16 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+// TODO(todd) before enabling by default, need to think about how this works with
+// docker/k8s -- I think the abstract namespace is scoped to a given k8s pod. We
+// probably need to have the client blacklist the socket if it attempts to use it
+// and can't connect.
+DEFINE_bool(client_use_unix_domain_sockets, false,
+            "Whether to try to connect to tablet servers using unix domain sockets. "
+            "This will only be attempted if the server has indicated that it is listening "
+            "on such a socket and the client is running on the same host.");
+TAG_FLAG(client_use_unix_domain_sockets, experimental);
+
 namespace kudu {
 namespace client {
 namespace internal {
@@ -85,8 +98,7 @@ void RemoteTabletServer::DnsResolutionFinished(const HostPort& hp,
                                                KuduClient* client,
                                                const StatusCallback& user_callback,
                                                const Status &result_status) {
-  unique_ptr<vector<Sockaddr>> scoped_addrs(addrs);
-
+  SCOPED_CLEANUP({ delete addrs; });
   Status s = result_status;
 
   if (s.ok() && addrs->empty()) {
@@ -129,6 +141,25 @@ void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb)
   }
 
   auto addrs = new vector<Sockaddr>();
+
+  if (FLAGS_client_use_unix_domain_sockets && unix_domain_socket_path_ &&
+      client->data_->IsLocalHostPort(hp)) {
+    Sockaddr unix_socket;
+    Status parse_status = unix_socket.ParseUnixDomainPath(*unix_domain_socket_path_);
+    if (!parse_status.ok()) {
+      KLOG_EVERY_N_SECS(WARNING, 60)
+          << Substitute("Tablet server $0 ($1) reported an invalid UNIX domain socket path '$2'",
+                        hp.ToString(), uuid_, *unix_domain_socket_path_);
+      // Fall through to normal TCP path.
+    } else {
+      VLOG(1) << Substitute("Will try to connect to UNIX socket $0 for local tablet server $1 ($2)",
+                            unix_socket.ToString(), hp.ToString(), uuid_);
+      addrs->emplace_back(unix_socket);
+      this->DnsResolutionFinished(hp, addrs, client, cb, Status::OK());
+      return;
+    }
+  }
+
   client->data_->dns_resolver_->ResolveAddressesAsync(
       hp, addrs, [=](const Status& s) {
         this->DnsResolutionFinished(hp, addrs, client, cb, s);
@@ -145,6 +176,11 @@ void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
     rpc_hostports_.emplace_back(hostport_pb.host(), hostport_pb.port());
   }
   location_ = pb.location();
+  if (pb.has_unix_domain_socket_path()) {
+    unix_domain_socket_path_ = pb.unix_domain_socket_path();
+  } else {
+    unix_domain_socket_path_ = boost::none;
+  }
 }
 
 const string& RemoteTabletServer::permanent_uuid() const {
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index 2e9a216..f02305f 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -28,6 +28,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
@@ -124,6 +125,12 @@ class RemoteTabletServer {
   std::string location_;
 
   std::vector<HostPort> rpc_hostports_;
+
+  // The path on which this server is listening for unix domain socket connections.
+  // This should only be used in the case that it can be determined that the tablet
+  // server is local to the client.
+  boost::optional<std::string> unix_domain_socket_path_;
+
   std::shared_ptr<tserver::TabletServerServiceProxy> proxy_;
 
   DISALLOW_COPY_AND_ASSIGN(RemoteTabletServer);
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index c423a02..cdc5dde 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -199,6 +199,8 @@ HostPort HostPortFromPB(const HostPortPB& host_port_pb) {
 Status AddHostPortPBs(const vector<Sockaddr>& addrs,
                       RepeatedPtrField<HostPortPB>* pbs) {
   for (const Sockaddr& addr : addrs) {
+    // Don't add unix domain sockets to the list of HostPorts.
+    if (!addr.is_ip()) continue;
     HostPortPB* pb = pbs->Add();
     if (addr.IsWildcard()) {
       RETURN_NOT_OK(GetFQDN(pb->mutable_host()));
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 3b3bec2..3dce5a0 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -96,6 +96,11 @@ message ServerRegistrationPB {
 
   // Seconds since the epoch.
   optional int64 start_time = 5;
+
+  // The path of a UNIX domain socket where the server is listening.
+  // An '@' prefix indicates the abstract namespace. May be missing
+  // if this feature is not enabled.
+  optional string unix_domain_socket_path = 6;
 }
 
 message ServerEntryPB {
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 0f98119..05645f8 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -4940,6 +4940,9 @@ Status CatalogManager::BuildLocationsForTablet(
         ServerRegistrationPB reg;
         ts_desc->GetRegistration(&reg);
         tsinfo_pb->mutable_rpc_addresses()->Swap(reg.mutable_rpc_addresses());
+        if (reg.has_unix_domain_socket_path()) {
+          tsinfo_pb->set_unix_domain_socket_path(reg.unix_domain_socket_path());
+        }
         if (ts_desc->location()) tsinfo_pb->set_location(*(ts_desc->location()));
       } else {
         // If we've never received a heartbeat from the tserver, we'll fall back
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index bda76e2..9a7dbc6 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -431,6 +431,11 @@ message TSInfoPB {
   repeated HostPortPB rpc_addresses = 2;
 
   optional string location = 3;
+
+  // The path of a UNIX domain socket where the server is listening.
+  // An '@' prefix indicates the abstract namespace. May be missing
+  // if this feature is not enabled.
+  optional string unix_domain_socket_path = 4;
 }
 
 // Selector to specify policy for listing tablet replicas in
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index f466c59..84fbd18 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -54,6 +54,12 @@ METRIC_DEFINE_counter(server, rpc_connections_accepted,
                       "Number of incoming TCP connections made to the RPC server",
                       kudu::MetricLevel::kInfo);
 
+METRIC_DEFINE_counter(server, rpc_connections_accepted_unix_domain_socket,
+                      "RPC Connections Accepted via UNIX Domain Socket",
+                      kudu::MetricUnit::kConnections,
+                      "Number of incoming UNIX Domain Socket connections made to the RPC server",
+                      kudu::MetricLevel::kInfo);
+
 DEFINE_int32(rpc_acceptor_listen_backlog, 128,
              "Socket backlog parameter used when listening for RPC connections. "
              "This defines the maximum length to which the queue of pending "
@@ -71,9 +77,12 @@ AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket,
     : messenger_(messenger),
       socket_(socket->Release()),
       bind_address_(bind_address),
-      rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate(
-          messenger->metric_entity())),
-      closing_(false) {}
+      closing_(false) {
+  auto& accept_metric = bind_address.is_ip() ?
+      METRIC_rpc_connections_accepted :
+      METRIC_rpc_connections_accepted_unix_domain_socket;
+  rpc_connections_accepted_ = accept_metric.Instantiate(messenger->metric_entity());
+}
 
 AcceptorPool::~AcceptorPool() {
   Shutdown();
diff --git a/src/kudu/server/rpc_server.cc b/src/kudu/server/rpc_server.cc
index e1d8872..632a35e 100644
--- a/src/kudu/server/rpc_server.cc
+++ b/src/kudu/server/rpc_server.cc
@@ -121,7 +121,10 @@ Status RpcServer::Init(const shared_ptr<Messenger>& messenger) {
   RETURN_NOT_OK(ParseAddressList(options_.rpc_bind_addresses,
                                  options_.default_port,
                                  &rpc_bind_addresses_));
+
   for (const Sockaddr& addr : rpc_bind_addresses_) {
+    if (!addr.is_ip()) continue;
+
     if (IsPrivilegedPort(addr.port())) {
       LOG(WARNING) << "May be unable to bind to privileged port for address "
                    << addr.ToString();
@@ -171,6 +174,12 @@ Status RpcServer::RegisterService(unique_ptr<rpc::ServiceIf> service) {
   return Status::OK();
 }
 
+Status RpcServer::AddBindAddress(const Sockaddr& addr) {
+  CHECK_EQ(server_state_, INITIALIZED) << "must add bind addresses between Init() and Bind()";
+  rpc_bind_addresses_.emplace_back(addr);
+  return Status::OK();
+}
+
 Status RpcServer::Bind() {
   CHECK_EQ(server_state_, INITIALIZED);
 
@@ -182,8 +191,9 @@ Status RpcServer::Bind() {
     RETURN_NOT_OK(messenger_->AddAcceptorPool(
                     bind_addr,
                     &pool));
-    new_acceptor_pools.push_back(pool);
+    new_acceptor_pools.emplace_back(std::move(pool));
   }
+
   acceptor_pools_.swap(new_acceptor_pools);
 
   server_state_ = BOUND;
diff --git a/src/kudu/server/rpc_server.h b/src/kudu/server/rpc_server.h
index 06f462e..314b542 100644
--- a/src/kudu/server/rpc_server.h
+++ b/src/kudu/server/rpc_server.h
@@ -70,6 +70,10 @@ class RpcServer {
   }
 
   Status Init(const std::shared_ptr<rpc::Messenger>& messenger) WARN_UNUSED_RESULT;
+
+  // Add an additional address to bind and accept connections on.
+  Status AddBindAddress(const Sockaddr& addr) WARN_UNUSED_RESULT;
+
   // Services need to be registered after Init'ing, but before Start'ing.
   // The service's ownership will be given to a ServicePool.
   Status RegisterService(std::unique_ptr<rpc::ServiceIf> service) WARN_UNUSED_RESULT;
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index d8a12bf..89bb5b8 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -151,6 +151,12 @@ DEFINE_string(rpc_encryption, "optional",
 TAG_FLAG(rpc_authentication, evolving);
 TAG_FLAG(rpc_encryption, evolving);
 
+DEFINE_bool(rpc_listen_on_unix_domain_socket, false,
+            "Whether the RPC server should listen on a Unix domain socket. If enabled, "
+            "the RPC server will bind to a socket in the \"abstract namespace\" using "
+            "a name which uniquely identifies the server instance.");
+TAG_FLAG(rpc_listen_on_unix_domain_socket, experimental);
+
 DEFINE_string(rpc_tls_ciphers,
               kudu::security::SecurityDefaults::kDefaultTlsCiphers,
               "The cipher suite preferences to use for TLS-secured RPC connections. "
@@ -526,6 +532,17 @@ Status ServerBase::Init() {
   });
 
   RETURN_NOT_OK(rpc_server_->Init(messenger_));
+
+  if (FLAGS_rpc_listen_on_unix_domain_socket) {
+    VLOG(1) << "Enabling listening on unix domain socket.";
+    Sockaddr addr;
+    RETURN_NOT_OK_PREPEND(addr.ParseUnixDomainPath(Substitute("@kudu-$0", fs_manager_->uuid())),
+                          "unable to parse provided UNIX socket path");
+    RETURN_NOT_OK_PREPEND(rpc_server_->AddBindAddress(addr),
+                          "unable to add configured UNIX socket path to list of bind addresses "
+                          "for RPC server");
+  }
+
   RETURN_NOT_OK(rpc_server_->Bind());
 
   RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index f3e6025..e3bf926 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/tserver/heartbeater.h"
 
+#include <algorithm>
 #include <atomic>
 #include <cstdint>
 #include <functional>
@@ -337,6 +338,13 @@ Status Heartbeater::Thread::SetupRegistration(ServerRegistrationPB* reg) {
   RETURN_NOT_OK(CHECK_NOTNULL(server_->rpc_server())->GetAdvertisedAddresses(&addrs));
   RETURN_NOT_OK_PREPEND(AddHostPortPBs(addrs, reg->mutable_rpc_addresses()),
                         "Failed to add RPC addresses to registration");
+  auto unix_socket_it = std::find_if(addrs.begin(), addrs.end(),
+                                     [](const Sockaddr& addr) {
+                                       return addr.is_unix();
+                                     });
+  if (unix_socket_it != addrs.end()) {
+    reg->set_unix_domain_socket_path(unix_socket_it->UnixDomainPath());
+  }
 
   addrs.clear();
   if (server_->web_server()) {
diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc
index b680294..f5d616c 100644
--- a/src/kudu/util/net/sockaddr.cc
+++ b/src/kudu/util/net/sockaddr.cc
@@ -89,7 +89,10 @@ Sockaddr::Sockaddr(const struct sockaddr_in& addr) :
 Status Sockaddr::ParseString(const string& s, uint16_t default_port) {
   HostPort hp;
   RETURN_NOT_OK(hp.ParseString(s, default_port));
+  return ParseFromNumericHostPort(hp);
+}
 
+Status Sockaddr::ParseFromNumericHostPort(const HostPort& hp) {
   struct in_addr addr;
   if (inet_pton(AF_INET, hp.host().c_str(), &addr) != 1) {
     return Status::InvalidArgument("Invalid IP address", hp.host());
diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h
index cd50e50..e473c8b 100644
--- a/src/kudu/util/net/sockaddr.h
+++ b/src/kudu/util/net/sockaddr.h
@@ -32,6 +32,8 @@
 
 namespace kudu {
 
+class HostPort;
+
 /// Represents a sockaddr.
 ///
 /// Typically this wraps a sockaddr_in, but in the future will be extended to support
@@ -86,6 +88,12 @@ class Sockaddr {
   // Returns a bad Status if the input is malformed.
   Status ParseString(const std::string& s, uint16_t default_port);
 
+  // Parse a HostPort instance which must contain a hostname in numeric notation
+  // as described above.
+  //
+  // Note that this function will not handle resolving hostnames.
+  Status ParseFromNumericHostPort(const HostPort& hp);
+
   // Parse a UNIX domain path, storing the result in this Sockaddr object.
   // A leading '@' indicates the address should be in the UNIX domain "abstract
   // namespace" (see man unix(7)).
@@ -143,6 +151,10 @@ class Sockaddr {
     return family() == AF_INET;
   }
 
+  bool is_unix() const {
+    return family() == AF_UNIX;
+  }
+
   // Returns the stringified address in '1.2.3.4:<port>' format.
   std::string ToString() const;
 


[kudu] 02/04: columnar_serialization: avoid preallocating 8MB per column

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit adf5d9f5e847b4aadb82c540c98ac9ac707bfa1c
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Apr 7 15:53:49 2020 -0700

    columnar_serialization: avoid preallocating 8MB per column
    
    Previously we would reserve an 8MB buffer for every column of data to be
    scanned. This wouldn't scale well for a high number of concurrent queries
    with lots of columns.
    
    The new approach is to use the configured batch size and apportion that
    memory budget across the columns based on the size of those columns.
    It's not 100% accurate but at least shouldn't overshoot by hundreds of
    MB like the prior approach.
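
    For a concrete sense of the apportioning: with an 8MB batch budget and a
    schema of an int64, an int32, and a 16-byte binary slice (28 fixed bytes
    per row), the columns would reserve roughly 2.4MB, 1.2MB, and 4.8MB. A
    small standalone sketch of that arithmetic (values illustrative only):

        #include <cstdint>
        #include <cstdio>

        int main() {
          const int64_t batch_bytes = 8 * 1024 * 1024;  // expected batch size
          const int col_sizes[] = {8, 4, 16};           // int64, int32, slice
          int64_t row_bytes = 0;
          for (int s : col_sizes) row_bytes += s;       // 28 bytes per row
          for (int s : col_sizes) {
            long long reserve = static_cast<long long>(s) * batch_bytes / row_bytes;
            printf("column of width %2d reserves %lld bytes\n", s, reserve);
          }
          return 0;
        }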
    
    Change-Id: I9b7ff78547792acbd975a606a02ec388dba3a8e8
    Reviewed-on: http://gerrit.cloudera.org:8080/15679
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/columnar_serialization.cc | 11 +++++++----
 src/kudu/common/columnar_serialization.h  |  8 +++++++-
 src/kudu/common/wire_protocol-test.cc     |  6 ++++--
 src/kudu/tserver/tablet_service.cc        | 11 +++++++----
 4 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index 52a3425..6c481d9 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -588,10 +588,10 @@ void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock,
 } // namespace internal
 
 ColumnarSerializedBatch::ColumnarSerializedBatch(const Schema& rowblock_schema,
-                                                 const Schema& client_schema) {
+                                                 const Schema& client_schema,
+                                                 int expected_batch_size_bytes) {
   // Initialize buffers for the columns.
-  // TODO(todd) don't pre-size these to 1MB per column -- quite
-  // expensive if there are a lot of columns!
+  int64_t row_bytes = client_schema.byte_size();
   columns_.reserve(client_schema.num_columns());
   for (const auto& schema_col : client_schema.columns()) {
     columns_.emplace_back();
@@ -600,7 +600,10 @@ ColumnarSerializedBatch::ColumnarSerializedBatch(const Schema& rowblock_schema,
     col.rowblock_schema_col_idx = rowblock_schema.find_column(schema_col.name());
     CHECK_NE(col.rowblock_schema_col_idx, -1);
 
-    col.data.reserve(1024 * 1024);
+    // Size the initial buffer based on the percentage of the total row that this column
+    // takes up. This isn't fully accurate because of costs like the null bitmap or varlen
+    // data, but tries to reasonably apportion the memory budget across the columns.
+    col.data.reserve(schema_col.type_info()->size() * expected_batch_size_bytes / row_bytes);
     if (schema_col.type_info()->physical_type() == BINARY) {
       col.varlen_data.emplace();
     }
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index b4862c7..f5c3d56 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -35,10 +35,16 @@ class ColumnarSerializedBatch {
  public:
   // 'rowblock_schema': the schema of the RowBlocks that will be passed to
   //                    AddRowBlock().
+  //
   // 'client_schema': the schema to be returned to the client, which may
   //                  contain a subset of columns
+  //
+  // 'expected_batch_size_bytes':
+  //      the batch size at which the caller expects to stop adding new rows to
+  //      this batch. This is only a hint and does not affect correctness.
   ColumnarSerializedBatch(const Schema& rowblock_schema,
-                          const Schema& client_schema);
+                          const Schema& client_schema,
+                          int expected_batch_size_bytes);
 
   // Append the data in 'block' into this columnar batch.
   //
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 9ab534d..fb08521 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -298,6 +298,7 @@ TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) {
 TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
   // Generate several blocks of random data.
   static constexpr int kNumBlocks = 3;
+  static constexpr int kBatchSizeBytes = 8192 * 1024;
   Arena arena(1024);
   std::list<RowBlock> blocks;
   for (int i = 0; i < kNumBlocks; i++) {
@@ -306,7 +307,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
   }
 
   // Convert all of the RowBlocks to a single serialized (concatenated) columnar format.
-  ColumnarSerializedBatch batch(schema_, schema_);
+  ColumnarSerializedBatch batch(schema_, schema_, kBatchSizeBytes);
   for (const auto& block : blocks) {
     batch.AddRowBlock(block);
   }
@@ -464,7 +465,8 @@ struct RowwiseConverter {
 
 struct ColumnarConverter {
   static void Run(const RowBlock& block) {
-    ColumnarSerializedBatch batch(*block.schema(), *block.schema());
+    constexpr int kBatchSizeBytes = 8192 * 1024;
+    ColumnarSerializedBatch batch(*block.schema(), *block.schema(), kBatchSizeBytes);
     batch.AddRowBlock(block);
   }
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index edfc1fd..927182c 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -801,13 +801,15 @@ class RowwiseResultSerializer : public ResultSerializer {
 class ColumnarResultSerializer : public ResultSerializer {
  public:
   static Status Create(uint64_t flags,
+                       int batch_size_bytes,
                        const Schema& scanner_schema,
                        const Schema& client_schema,
                        unique_ptr<ResultSerializer>* serializer) {
     if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
       return Status::InvalidArgument("Row format flags not supported with columnar layout");
     }
-    serializer->reset(new ColumnarResultSerializer(scanner_schema, client_schema));
+    serializer->reset(new ColumnarResultSerializer(
+        scanner_schema, client_schema, batch_size_bytes));
     return Status::OK();
   }
 
@@ -864,8 +866,9 @@ class ColumnarResultSerializer : public ResultSerializer {
 
  private:
   ColumnarResultSerializer(const Schema& scanner_schema,
-                           const Schema& client_schema)
-      : results_(scanner_schema, client_schema) {
+                           const Schema& client_schema,
+                           int batch_size_bytes)
+      : results_(scanner_schema, client_schema, batch_size_bytes) {
   }
 
   int64_t num_rows_ = 0;
@@ -920,7 +923,7 @@ class ScanResultCopier : public ScanResultCollector {
     }
     if (row_format_flags & COLUMNAR_LAYOUT) {
       return ColumnarResultSerializer::Create(
-          row_format_flags, scanner_schema, client_schema, &serializer_);
+          row_format_flags, batch_size_bytes_, scanner_schema, client_schema, &serializer_);
     }
     serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
     return Status::OK();


[kudu] 03/04: mini-cluster: exclude libpcre for license check

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit cd95fef01d19674a8c43784bc4f3cd0ef1f4b5ac
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Wed Apr 15 14:38:26 2020 -0700

    mini-cluster: exclude libpcre for license check
    
    While building mini cluster binaries on centos7, I saw the error message
    'unknown license: lib/libpcre.so.1' during the license check on the artifact.
    By running lddtree on the binary, I found that libpcre is introduced
    as a transitive dependency:
    
    libcurl.so.4 => /root/kudu/thirdparty/installed/uninstrumented/lib/libcurl.so.4
            libk5crypto.so.3 => /lib64/libk5crypto.so.3
                libkrb5support.so.0 => /lib64/libkrb5support.so.0
                    libselinux.so.1 => /lib64/libselinux.so.1
                        libpcre.so.1 => /lib64/libpcre.so.1
    
    This patch excludes 'libpcre' from the license check so that the check
    only accounts for licensing information relevant to the binary artifacts.
    
    Change-Id: I0fc69eaf57025b1ce2f7857808e60218dc4f2771
    Reviewed-on: http://gerrit.cloudera.org:8080/15738
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 build-support/mini-cluster/relocate_binaries_for_mini_cluster.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/build-support/mini-cluster/relocate_binaries_for_mini_cluster.py b/build-support/mini-cluster/relocate_binaries_for_mini_cluster.py
index 21bed54..f3c0aad 100755
--- a/build-support/mini-cluster/relocate_binaries_for_mini_cluster.py
+++ b/build-support/mini-cluster/relocate_binaries_for_mini_cluster.py
@@ -52,7 +52,9 @@ KEY_PATH = 'path'
 PAT_SASL_LIBPLAIN = re.compile(r'libplain')
 
 # Exclude libraries that are (L)GPL-licensed and libraries that are not
-# portable across Linux kernel versions.
+# portable across Linux kernel versions. One exception is 'libpcre', which
+# is BSD-licensed. It is excluded because it is a transitive dependency
+# introduced by 'libselinux'.
 PAT_LINUX_LIB_EXCLUDE = re.compile(r"""(libpthread|
                                         libc|
                                         libstdc\+\+|
@@ -66,6 +68,7 @@ PAT_LINUX_LIB_EXCLUDE = re.compile(r"""(libpthread|
                                         libcom_err|
                                         libdb-[\d.]+|
                                         libselinux|
+                                        libpcre|
                                         libtinfo
                                        )\.so""", re.VERBOSE)
 


[kudu] 01/04: Avoid calling Schema::find_column() once per RowBlock in columnar serialization

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c7c4d47ecca0ea0d90435dac736d22f4063d5507
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Mar 31 12:03:09 2020 -0700

    Avoid calling Schema::find_column() once per RowBlock in columnar serialization
    
    Prior to this patch, each row block being serialized in the columnar
    format would result in a call to Schema::find_column(name) for each
    projected column. That was relatively expensive, involving a hash
    computation and string equality check, etc.
    
    This changes the projection calculation to happen "up front" once per
    Scan RPC rather than in the per-RowBlock calls.
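
    The shape of the change: resolve column indexes by name once when the
    serialized batch is constructed, then touch only integer indexes on the
    per-RowBlock path. A simplified sketch of that pattern (the names here
    are illustrative, not the actual Kudu classes):

        #include <string>
        #include <unordered_map>
        #include <vector>

        struct ProjectionCache {
          std::vector<int> col_indexes;

          // Done once per scan: name hashing and string comparisons live here.
          ProjectionCache(const std::vector<std::string>& projected_cols,
                          const std::unordered_map<std::string, int>& cols_by_name) {
            col_indexes.reserve(projected_cols.size());
            for (const auto& name : projected_cols) {
              col_indexes.push_back(cols_by_name.at(name));
            }
          }

          // Done once per RowBlock: only integer indexing, no name lookups.
          template <typename RowBlockT, typename Fn>
          void ForEachColumn(const RowBlockT& block, Fn fn) const {
            for (int idx : col_indexes) {
              fn(block.column_block(idx));
            }
          }
        };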
    
    This optimization could also apply to the rowwise serialization, but I
    found that the other overheads inherent in that code path are so high
    that the find_column calls aren't particularly noticeable. Nonetheless
    I left a TODO.
    
    Change-Id: I1b683c7d6d6fe1026ee06c8b5ebfe2a5f1ee6cb1
    Reviewed-on: http://gerrit.cloudera.org:8080/15678
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Todd Lipcon <to...@apache.org>
---
 src/kudu/common/columnar_serialization.cc | 54 +++++++++-----------
 src/kudu/common/columnar_serialization.h  | 41 +++++++++++----
 src/kudu/common/wire_protocol-test.cc     | 12 ++---
 src/kudu/tserver/tablet_service.cc        | 84 +++++++++++++++++++------------
 4 files changed, 112 insertions(+), 79 deletions(-)

diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index f6b289e..52a3425 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -587,34 +587,31 @@ void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock,
 } // anonymous namespace
 } // namespace internal
 
-int SerializeRowBlockColumnar(
-    const RowBlock& block,
-    const Schema* projection_schema,
-    ColumnarSerializedBatch* out) {
-  DCHECK_GT(block.nrows(), 0);
-  const Schema* tablet_schema = block.schema();
-
-  if (projection_schema == nullptr) {
-    projection_schema = tablet_schema;
-  }
-
+ColumnarSerializedBatch::ColumnarSerializedBatch(const Schema& rowblock_schema,
+                                                 const Schema& client_schema) {
   // Initialize buffers for the columns.
   // TODO(todd) don't pre-size these to 1MB per column -- quite
   // expensive if there are a lot of columns!
-  if (out->columns.size() != projection_schema->num_columns()) {
-    CHECK_EQ(out->columns.size(), 0);
-    out->columns.reserve(projection_schema->num_columns());
-    for (const auto& col : projection_schema->columns()) {
-      out->columns.emplace_back();
-      out->columns.back().data.reserve(1024 * 1024);
-      if (col.type_info()->physical_type() == BINARY) {
-        out->columns.back().varlen_data.emplace();
-      }
-      if (col.is_nullable()) {
-        out->columns.back().non_null_bitmap.emplace();
-      }
+  columns_.reserve(client_schema.num_columns());
+  for (const auto& schema_col : client_schema.columns()) {
+    columns_.emplace_back();
+    auto& col = columns_.back();
+
+    col.rowblock_schema_col_idx = rowblock_schema.find_column(schema_col.name());
+    CHECK_NE(col.rowblock_schema_col_idx, -1);
+
+    col.data.reserve(1024 * 1024);
+    if (schema_col.type_info()->physical_type() == BINARY) {
+      col.varlen_data.emplace();
+    }
+    if (schema_col.is_nullable()) {
+      col.non_null_bitmap.emplace();
     }
   }
+}
+
+int ColumnarSerializedBatch::AddRowBlock(const RowBlock& block) {
+  DCHECK_GT(block.nrows(), 0);
 
   SelectedRows sel = block.selection_vector()->GetSelectedRows();
   if (sel.num_selected() == 0) {
@@ -622,21 +619,18 @@ int SerializeRowBlockColumnar(
   }
 
   int col_idx = 0;
-  for (const auto& col : projection_schema->columns()) {
-    int t_schema_idx = tablet_schema->find_column(col.name());
-    CHECK_NE(t_schema_idx, -1);
-    const ColumnBlock& column_block = block.column_block(t_schema_idx);
-
+  for (const auto& col : columns_) {
+    const ColumnBlock& column_block = block.column_block(col.rowblock_schema_col_idx);
     if (column_block.type_info()->physical_type() == BINARY) {
       internal::CopySelectedVarlenCellsFromColumn(
           column_block,
           sel,
-          &out->columns[col_idx]);
+          &columns_[col_idx]);
     } else {
       internal::CopySelectedCellsFromColumn(
           column_block,
           sel,
-          &out->columns[col_idx]);
+          &columns_[col_idx]);
     }
     col_idx++;
   }
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index bc74f08..b4862c7 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <cstdint>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -30,8 +31,24 @@ class Schema;
 
 // A pending batch of serialized rows, suitable for easy conversion
 // into the protobuf representation and a set of sidecars.
-struct ColumnarSerializedBatch {
+class ColumnarSerializedBatch {
+ public:
+  // 'rowblock_schema': the schema of the RowBlocks that will be passed to
+  //                    AddRowBlock().
+  // 'client_schema': the schema to be returned to the client, which may
+  //                  contain a subset of columns
+  ColumnarSerializedBatch(const Schema& rowblock_schema,
+                          const Schema& client_schema);
+
+  // Append the data in 'block' into this columnar batch.
+  //
+  // Returns the number of selected rows serialized.
+  int AddRowBlock(const RowBlock& block);
+
   struct Column {
+    // The index of the column in the schema of the RowBlocks to be appended.
+    int rowblock_schema_col_idx;
+
     // Underlying column data.
     faststring data;
 
@@ -41,17 +58,19 @@ struct ColumnarSerializedBatch {
     // Each bit is set when a value is non-null
     boost::optional<faststring> non_null_bitmap;
   };
-  std::vector<Column> columns;
-};
 
-// Serialize the data in 'block' into the columnar batch 'out', appending to
-// any data already serialized to the same batch.
-//
-// Returns the number of selected rows serialized.
-int SerializeRowBlockColumnar(
-    const RowBlock& block,
-    const Schema* projection_schema,
-    ColumnarSerializedBatch* out);
+  const std::vector<Column>& columns() const {
+    return columns_;
+  }
+
+  std::vector<Column> TakeColumns() && {
+    return std::move(columns_);
+  }
+
+ private:
+  friend class WireProtocolTest;
+  std::vector<Column> columns_;
+};
 
 
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 4a1c791..9ab534d 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -306,13 +306,13 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
   }
 
   // Convert all of the RowBlocks to a single serialized (concatenated) columnar format.
-  ColumnarSerializedBatch batch;
+  ColumnarSerializedBatch batch(schema_, schema_);
   for (const auto& block : blocks) {
-    SerializeRowBlockColumnar(block, nullptr, &batch);
+    batch.AddRowBlock(block);
   }
 
   // Verify that the resulting serialized data matches the concatenated original data blocks.
-  ASSERT_EQ(5, batch.columns.size());
+  ASSERT_EQ(5, batch.columns().size());
   int dst_row_idx = 0;
   for (const auto& block : blocks) {
     for (int src_row_idx = 0; src_row_idx < block.nrows(); src_row_idx++) {
@@ -325,7 +325,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
       for (int c = 0; c < schema_.num_columns(); c++) {
         SCOPED_TRACE(c);
         const auto& col = schema_.column(c);
-        const auto& serialized_col = batch.columns[c];
+        const auto& serialized_col = batch.columns()[c];
         if (col.is_nullable()) {
           bool expect_null = row.is_null(c);;
           EXPECT_EQ(!BitmapTest(serialized_col.non_null_bitmap->data(), dst_row_idx),
@@ -464,8 +464,8 @@ struct RowwiseConverter {
 
 struct ColumnarConverter {
   static void Run(const RowBlock& block) {
-    ColumnarSerializedBatch batch;
-    SerializeRowBlockColumnar(block, nullptr, &batch);
+    ColumnarSerializedBatch batch(*block.schema(), *block.schema());
+    batch.AddRowBlock(block);
   }
 
   static constexpr const char* kName = "columnar";
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 1bb93ff..edfc1fd 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -26,6 +26,7 @@
 #include <numeric>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <unordered_set>
 #include <vector>
 
@@ -691,7 +692,9 @@ class ScanResultCollector {
   // request is decoded and checked for 'row_format_flags'.
   //
   // Does nothing by default.
-  virtual Status InitSerializer(uint64_t /* row_format_flags */) {
+  virtual Status InitSerializer(uint64_t /* row_format_flags */,
+                                const Schema& /* scanner_schema */,
+                                const Schema& /* client_schema */) {
     return Status::OK();
   }
 
@@ -754,6 +757,8 @@ class RowwiseResultSerializer : public ResultSerializer {
 
   int SerializeRowBlock(const RowBlock& row_block,
                         const Schema* client_projection_schema) override {
+    // TODO(todd) create some kind of serializer object that caches the projection
+    // information to avoid recalculating it on every SerializeRowBlock call.
     int num_selected = kudu::SerializeRowBlock(
         row_block, client_projection_schema,
         &rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_);
@@ -795,18 +800,21 @@ class RowwiseResultSerializer : public ResultSerializer {
 
 class ColumnarResultSerializer : public ResultSerializer {
  public:
-  static Status Create(uint64_t flags, unique_ptr<ResultSerializer>* serializer) {
+  static Status Create(uint64_t flags,
+                       const Schema& scanner_schema,
+                       const Schema& client_schema,
+                       unique_ptr<ResultSerializer>* serializer) {
     if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
       return Status::InvalidArgument("Row format flags not supported with columnar layout");
     }
-    serializer->reset(new ColumnarResultSerializer());
+    serializer->reset(new ColumnarResultSerializer(scanner_schema, client_schema));
     return Status::OK();
   }
 
   int SerializeRowBlock(const RowBlock& row_block,
-                        const Schema* client_projection_schema) override {
+                        const Schema* /* unused */) override {
     CHECK(!done_);
-    int n_sel = SerializeRowBlockColumnar(row_block, client_projection_schema, &results_);
+    int n_sel = results_.AddRowBlock(row_block);
     num_rows_ += n_sel;
     return n_sel;
   }
@@ -815,7 +823,7 @@ class ColumnarResultSerializer : public ResultSerializer {
     CHECK(!done_);
 
     int total = 0;
-    for (const auto& col : results_.columns) {
+    for (const auto& col : results_.columns()) {
       total += col.data.size();
       if (col.varlen_data) {
         total += col.varlen_data->size();
@@ -831,7 +839,8 @@ class ColumnarResultSerializer : public ResultSerializer {
     CHECK(!done_);
     done_ = true;
     ColumnarRowBlockPB* data = resp->mutable_columnar_data();
-    for (auto& col : results_.columns) {
+    auto cols = std::move(results_).TakeColumns();
+    for (auto& col : cols) {
       auto* col_pb = data->add_columns();
       int sidecar_idx;
       CHECK_OK(context->AddOutboundSidecar(
@@ -854,7 +863,10 @@ class ColumnarResultSerializer : public ResultSerializer {
   }
 
  private:
-  ColumnarResultSerializer() {}
+  ColumnarResultSerializer(const Schema& scanner_schema,
+                           const Schema& client_schema)
+      : results_(scanner_schema, client_schema) {
+  }
 
   int64_t num_rows_ = 0;
   ColumnarSerializedBatch results_;
@@ -898,14 +910,17 @@ class ScanResultCopier : public ScanResultCollector {
     return num_rows_returned_;
   }
 
-  Status InitSerializer(uint64_t row_format_flags) override {
+  Status InitSerializer(uint64_t row_format_flags,
+                        const Schema& scanner_schema,
+                        const Schema& client_schema) override {
     if (serializer_) {
       // TODO(todd) for the NewScanner case, this gets called twice
       // which is a bit ugly. Refactor to avoid!
       return Status::OK();
     }
     if (row_format_flags & COLUMNAR_LAYOUT) {
-      return ColumnarResultSerializer::Create(row_format_flags, &serializer_);
+      return ColumnarResultSerializer::Create(
+          row_format_flags, scanner_schema, client_schema, &serializer_);
     }
     serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
     return Status::OK();
@@ -2432,14 +2447,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
                "tablet_id", scan_pb.tablet_id());
 
-  Status s = result_collector->InitSerializer(scan_pb.row_format_flags());
-  if (!s.ok()) {
-    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
-    return s;
-  }
-
-  const Schema& tablet_schema = replica->tablet_metadata()->schema();
-
   SharedScanner scanner;
   server_->scanner_manager()->NewScanner(replica,
                                          rpc_context->remote_user(),
@@ -2455,7 +2462,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // Create the user's requested projection.
   // TODO(todd): Add test cases for bad projections including 0 columns.
   Schema projection;
-  s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
+  Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
   if (PREDICT_FALSE(!s.ok())) {
     *error_code = TabletServerErrorPB::INVALID_SCHEMA;
     return s;
@@ -2480,6 +2487,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
     }
   }
 
+  const Schema& tablet_schema = replica->tablet_metadata()->schema();
+
   ScanSpec spec;
   s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
   if (PREDICT_FALSE(!s.ok())) {
@@ -2498,11 +2507,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // NOTE: We should build the missing column after optimizing scan which will
   // remove unnecessary predicates.
   vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);
-  if (spec.CanShortCircuit()) {
-    VLOG(1) << "short-circuiting without creating a server-side scanner.";
-    *has_more_results = false;
-    return Status::OK();
-  }
 
   // Store the original projection.
   {
@@ -2551,6 +2555,20 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   projection = projection_builder.BuildWithoutIds();
   VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);
 
+  s = result_collector->InitSerializer(scan_pb.row_format_flags(),
+                                       projection,
+                                       *scanner->client_projection_schema());
+  if (!s.ok()) {
+    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+    return s;
+  }
+
+  if (spec.CanShortCircuit()) {
+    VLOG(1) << "short-circuiting without creating a server-side scanner.";
+    *has_more_results = false;
+    return Status::OK();
+  }
+
   // It's important to keep the reference to the tablet for the case when the
   // tablet replica's shutdown is run concurrently with the code below.
   shared_ptr<Tablet> tablet;
@@ -2741,13 +2759,6 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
           << SecureShortDebugString(*req);
   TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
 
-  // Set the row format flags on the ScanResultCollector.
-  s = result_collector->InitSerializer(scanner->row_format_flags());
-  if (!s.ok()) {
-    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
-    return s;
-  }
-
   if (batch_size_bytes == 0 && req->close_scanner()) {
     *has_more_results = false;
     return Status::OK();
@@ -2761,11 +2772,20 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   RowwiseIterator* iter = scanner->iter();
 
+  // Set the row format flags on the ScanResultCollector.
+  s = result_collector->InitSerializer(scanner->row_format_flags(),
+                                       iter->schema(),
+                                       *scanner->client_projection_schema());
+  if (!s.ok()) {
+    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
+    return s;
+  }
+
   // TODO(todd): could size the RowBlock based on the user's requested batch size?
   // If people had really large indirect objects, we would currently overshoot
   // their requested batch size by a lot.
   Arena arena(32 * 1024);
-  RowBlock block(&scanner->iter()->schema(),
+  RowBlock block(&iter->schema(),
                  FLAGS_scanner_batch_size_rows, &arena);
 
   // TODO(todd): in the future, use the client timeout to set a budget. For now,