You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/25 03:28:23 UTC

[kudu] branch master updated (c8daf9a -> a4b2881)

This is an automated email from the ASF dual-hosted git repository.

todd pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from c8daf9a  [rpc] Add RPC feature flag for Bloom filter predicate
     new 9945954  Use popcnt instruction for Bits::Count
     new a4b2881  rpc: relax limit on number of sidecars

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/gutil/bits.cc        | 21 ++++++++++--
 src/kudu/gutil/cpu.cc         |  2 ++
 src/kudu/gutil/cpu.h          |  2 ++
 src/kudu/rpc/inbound_call.cc  |  3 +-
 src/kudu/rpc/inbound_call.h   |  4 +--
 src/kudu/rpc/outbound_call.cc |  3 +-
 src/kudu/rpc/outbound_call.h  |  4 +--
 src/kudu/rpc/rpc-test-base.h  | 75 +++++++++++++++++++++----------------------
 src/kudu/rpc/rpc-test.cc      | 45 ++++++++++++++++++++------
 src/kudu/rpc/rpc_sidecar.cc   | 13 +++++---
 src/kudu/rpc/rpc_sidecar.h    | 10 +++---
 src/kudu/rpc/rtest.proto      | 17 +++++-----
 src/kudu/rpc/transfer.cc      |  2 +-
 src/kudu/rpc/transfer.h       |  2 +-
 src/kudu/util/init.cc         |  7 ++++
 15 files changed, 134 insertions(+), 76 deletions(-)


[kudu] 02/02: rpc: relax limit on number of sidecars

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a4b28812f43e6b8a85e1d61b48a2dbba70d56d1b
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Mar 24 10:42:14 2020 -0700

    rpc: relax limit on number of sidecars
    
    Following the example of 0a7787f6563ae316a4a6207a9244c19ba8a7731f this
    changes the sidecars for outbound calls to use a
    boost::container::small_vector instead of a fixed size array. This means
    we no longer need such a strict limit.
    
    This commit raises the limit to 10000 as a reasonable "can't see the
    need for more than this" number.
    
    This also fixes an issue where the underlying socket syscalls fail if
    the number of slices in the iovec list is more than IOV_MAX. The new
    test verifies the behavior.
    
    Change-Id: I8e53d5b50753e4aa57d885718ad5b24558636f82
    Reviewed-on: http://gerrit.cloudera.org:8080/15547
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/rpc/inbound_call.cc  |  3 +-
 src/kudu/rpc/inbound_call.h   |  4 +--
 src/kudu/rpc/outbound_call.cc |  3 +-
 src/kudu/rpc/outbound_call.h  |  4 +--
 src/kudu/rpc/rpc-test-base.h  | 75 +++++++++++++++++++++----------------------
 src/kudu/rpc/rpc-test.cc      | 45 ++++++++++++++++++++------
 src/kudu/rpc/rpc_sidecar.cc   | 13 +++++---
 src/kudu/rpc/rpc_sidecar.h    | 10 +++---
 src/kudu/rpc/rtest.proto      | 17 +++++-----
 src/kudu/rpc/transfer.cc      |  2 +-
 src/kudu/rpc/transfer.h       |  2 +-
 11 files changed, 104 insertions(+), 74 deletions(-)

diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 60fdbb2..179a1d3 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 
+#include <boost/container/vector.hpp>
 #include <glog/logging.h>
 #include <google/protobuf/message.h>
 #include <google/protobuf/message_lite.h>
@@ -93,7 +94,7 @@ Status InboundCall::ParseFrom(unique_ptr<InboundTransfer> transfer) {
   }
 
   RETURN_NOT_OK(RpcSidecar::ParseSidecars(
-          header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_));
+          header_.sidecar_offsets(), serialized_request_, &inbound_sidecar_slices_));
   if (header_.sidecar_offsets_size() > 0) {
     // Trim the request to just the message
     serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0));
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 1a9d8fb..c651aaa 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -30,6 +30,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/faststring.h"
@@ -55,7 +56,6 @@ class Connection;
 class DumpConnectionsRequestPB;
 class RemoteUser;
 class RpcCallInProgressPB;
-class RpcSidecar;
 
 struct InboundCallTiming {
   MonoTime time_received;   // Time the call was first accepted.
@@ -264,7 +264,7 @@ class InboundCall {
 
   // Inbound sidecars from the request. The slices are views onto transfer_. There are as
   // many slices as header_.sidecar_offsets_size().
-  Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
+  SidecarSliceVector inbound_sidecar_slices_;
 
   // The trace buffer.
   scoped_refptr<Trace> trace_;
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 5a0c1da..66557b9 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -26,6 +26,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/container/vector.hpp>
 #include <boost/function.hpp>
 #include <gflags/gflags.h>
 #include <google/protobuf/message.h>
@@ -512,7 +513,7 @@ Status CallResponse::ParseFrom(unique_ptr<InboundTransfer> transfer) {
 
   // Use information from header to extract the payload slices.
   RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),
-          serialized_response_, sidecar_slices_));
+          serialized_response_, &sidecar_slices_));
 
   if (header_.sidecar_offsets_size() > 0) {
     serialized_response_ =
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index eb36608..74db5ca 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -33,6 +33,7 @@
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/locks.h"
@@ -55,7 +56,6 @@ class CallResponse;
 class DumpConnectionsRequestPB;
 class RpcCallInProgressPB;
 class RpcController;
-class RpcSidecar;
 
 // Tracks the status of a call on the client side.
 //
@@ -329,7 +329,7 @@ class CallResponse {
   Slice serialized_response_;
 
   // Slices of data for rpc sidecars. They point into memory owned by transfer_.
-  Slice sidecar_slices_[TransferLimits::kMaxSidecars];
+  SidecarSliceVector sidecar_slices_;
 
   // The incoming transfer data - retained because serialized_response_
   // and sidecar_slices_ refer into its data.
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 59bfd41..c7c33f6 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -39,6 +39,7 @@
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_pool.h"
 #include "kudu/security/security-test-util.h"
+#include "kudu/util/crc.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
@@ -70,8 +71,8 @@ using kudu::rpc_test::ExactlyOnceResponsePB;
 using kudu::rpc_test::FeatureFlags;
 using kudu::rpc_test::PanicRequestPB;
 using kudu::rpc_test::PanicResponsePB;
-using kudu::rpc_test::PushTwoStringsRequestPB;
-using kudu::rpc_test::PushTwoStringsResponsePB;
+using kudu::rpc_test::PushStringsRequestPB;
+using kudu::rpc_test::PushStringsResponsePB;
 using kudu::rpc_test::SendTwoStringsRequestPB;
 using kudu::rpc_test::SendTwoStringsResponsePB;
 using kudu::rpc_test::SleepRequestPB;
@@ -93,7 +94,7 @@ class GenericCalculatorService : public ServiceIf {
   static const char *kAddMethodName;
   static const char *kSleepMethodName;
   static const char *kSleepWithSidecarMethodName;
-  static const char *kPushTwoStringsMethodName;
+  static const char *kPushStringsMethodName;
   static const char *kSendTwoStringsMethodName;
   static const char *kAddExactlyOnce;
 
@@ -118,8 +119,8 @@ class GenericCalculatorService : public ServiceIf {
       DoSleepWithSidecar(incoming);
     } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) {
       DoSendTwoStrings(incoming);
-    } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) {
-      DoPushTwoStrings(incoming);
+    } else if (incoming->remote_method().method_name() == kPushStringsMethodName) {
+      DoPushStrings(incoming);
     } else {
       incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD,
                                Status::InvalidArgument("bad method"));
@@ -179,28 +180,25 @@ class GenericCalculatorService : public ServiceIf {
     incoming->RespondSuccess(resp);
   }
 
-  void DoPushTwoStrings(InboundCall* incoming) {
+  static void DoPushStrings(InboundCall* incoming) {
     Slice param(incoming->serialized_request());
-    PushTwoStringsRequestPB req;
+    PushStringsRequestPB req;
     if (!req.ParseFromArray(param.data(), param.size())) {
       LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
     }
 
-    Slice sidecar1;
-    CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1));
-
-    Slice sidecar2;
-    CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2));
-
     // Check that reading non-existant sidecars doesn't work.
     Slice tmp;
-    CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok());
+    CHECK(!incoming->GetInboundSidecar(req.sidecar_indexes_size() + 2, &tmp).ok());
+
+    PushStringsResponsePB resp;
+    for (const auto& sidecar_idx : req.sidecar_indexes()) {
+      Slice sidecar;
+      CHECK_OK(incoming->GetInboundSidecar(sidecar_idx, &sidecar));
 
-    PushTwoStringsResponsePB resp;
-    resp.set_size1(sidecar1.size());
-    resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size());
-    resp.set_size2(sidecar2.size());
-    resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size());
+      resp.add_sizes(sidecar.size());
+      resp.add_crcs(crc::Crc32c(sidecar.data(), sidecar.size()));
+    }
 
     // Drop the sidecars etc, just to confirm that it's safe to do so.
     CHECK_GT(incoming->GetTransferSize(), 0);
@@ -405,7 +403,7 @@ const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalcul
 const char *GenericCalculatorService::kAddMethodName = "Add";
 const char *GenericCalculatorService::kSleepMethodName = "Sleep";
 const char *GenericCalculatorService::kSleepWithSidecarMethodName = "SleepWithSidecar";
-const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings";
+const char *GenericCalculatorService::kPushStringsMethodName = "PushStrings";
 const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
 const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
 
@@ -511,28 +509,29 @@ class RpcTestBase : public KuduTest {
     CHECK_EQ(0, second.compare(Slice(expected)));
   }
 
-  Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
-    PushTwoStringsRequestPB request;
-    RpcController controller;
-
-    int idx1;
-    std::string s1(size1, 'a');
-    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1));
+  static Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+    return DoTestOutgoingSidecar(p, {std::string(size1, 'a'), std::string(size2, 'b')});
+  }
 
-    int idx2;
-    std::string s2(size2, 'b');
-    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2));
+  static Status DoTestOutgoingSidecar(const Proxy& p, const std::vector<std::string>& strings) {
+    PushStringsRequestPB request;
+    RpcController controller;
 
-    request.set_sidecar1_idx(idx1);
-    request.set_sidecar2_idx(idx2);
+    for (const auto& s : strings) {
+      int idx;
+      CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
+      request.add_sidecar_indexes(idx);
+    }
 
-    PushTwoStringsResponsePB resp;
-    KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+    PushStringsResponsePB resp;
+    KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushStringsMethodName,
                                      request, &resp, &controller));
-    CHECK_EQ(size1, resp.size1());
-    CHECK_EQ(resp.data1(), s1);
-    CHECK_EQ(size2, resp.size2());
-    CHECK_EQ(resp.data2(), s2);
+    for (int i = 0; i < strings.size(); i++) {
+      CHECK_EQ(strings[i].size(), resp.sizes(i));
+      CHECK_EQ(crc::Crc32c(strings[i].data(), strings[i].size()),
+               resp.crcs(i));
+    }
+
     return Status::OK();
   }
 
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 58f0f16..005b660 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -19,6 +19,7 @@
 #include <cstdint>
 #include <cstdlib>
 #include <cstring>
+#include <functional>
 #include <limits>
 #include <memory>
 #include <ostream>
@@ -47,6 +48,7 @@
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/rpc-test-base.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/rtest.pb.h"
@@ -60,6 +62,7 @@
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -840,6 +843,29 @@ TEST_P(TestRpc, TestRpcSidecar) {
   DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
 }
 
+// Test sending the maximum number of sidecars, each of them being a single
+// character. This makes sure we handle the limit of IOV_MAX iovecs per sendmsg
+// call.
+TEST_P(TestRpc, TestMaxSmallSidecars) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+
+  // Set up client.
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, GetParam()));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
+
+  Random rng(GetRandomSeed32());
+  vector<string> strings(TransferLimits::kMaxSidecars);
+  for (auto& s : strings) {
+    s = RandomString(2, &rng);
+  }
+  ASSERT_OK(DoTestOutgoingSidecar(p, strings));
+}
+
 TEST_P(TestRpc, TestRpcSidecarLimits) {
   {
     // Test that the limits on the number of sidecars is respected.
@@ -907,11 +933,10 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
     int idx;
     ASSERT_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(max_string)), &idx));
 
-    PushTwoStringsRequestPB request;
-    request.set_sidecar1_idx(idx);
-    request.set_sidecar2_idx(idx);
-    PushTwoStringsResponsePB resp;
-    Status status = p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+    PushStringsRequestPB request;
+    request.add_sidecar_indexes(idx);
+    PushStringsResponsePB resp;
+    Status status = p.SyncRequest(GenericCalculatorService::kPushStringsMethodName,
         request, &resp, &controller);
     ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << status.ToString();
     // Remote responds to extra-large payloads by closing the connection.
@@ -1423,16 +1448,16 @@ static void SendAndCancelRpcs(Proxy* p, const Slice& slice) {
   int i = 0;
   while (MonoTime::Now() < end_time) {
     controller.Reset();
-    PushTwoStringsRequestPB request;
-    PushTwoStringsResponsePB resp;
+    PushStringsRequestPB request;
+    PushStringsResponsePB resp;
     int idx;
     CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx));
-    request.set_sidecar1_idx(idx);
+    request.add_sidecar_indexes(idx);
     CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx));
-    request.set_sidecar2_idx(idx);
+    request.add_sidecar_indexes(idx);
 
     CountDownLatch latch(1);
-    p->AsyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+    p->AsyncRequest(GenericCalculatorService::kPushStringsMethodName,
                     request, &resp, &controller,
                     boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
 
diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc
index dbeb845..da7244d 100644
--- a/src/kudu/rpc/rpc_sidecar.cc
+++ b/src/kudu/rpc/rpc_sidecar.cc
@@ -17,11 +17,12 @@
 
 #include "kudu/rpc/rpc_sidecar.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
-#include <utility>
 #include <vector>
 
+#include <boost/container/vector.hpp>
 #include <google/protobuf/repeated_field.h>
 
 #include "kudu/gutil/strings/substitute.h"
@@ -89,8 +90,9 @@ unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
 
 Status RpcSidecar::ParseSidecars(
     const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
-    Slice buffer, Slice* sidecars) {
-  if (offsets.size() == 0) return Status::OK();
+    Slice buffer,
+    SidecarSliceVector* sidecars) {
+  if (offsets.empty()) return Status::OK();
 
   int last = offsets.size() - 1;
   if (last >= TransferLimits::kMaxSidecars) {
@@ -105,6 +107,7 @@ Status RpcSidecar::ParseSidecars(
             buffer.size(), TransferLimits::kMaxTotalSidecarBytes));
   }
 
+  sidecars->resize(offsets.size());
   for (int i = 0; i < last; ++i) {
     int64_t cur_offset = offsets.Get(i);
     int64_t next_offset = offsets.Get(i + 1);
@@ -120,7 +123,7 @@ Status RpcSidecar::ParseSidecars(
               " but ends before that at offset $1.", i, cur_offset, next_offset));
     }
 
-    sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
+    (*sidecars)[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
   }
 
   int64_t cur_offset = offsets.Get(last);
@@ -129,7 +132,7 @@ Status RpcSidecar::ParseSidecars(
             "starts at offset $1after message ends (message length $2).", last,
             cur_offset, buffer.size()));
   }
-  sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);
+  (*sidecars)[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);
 
   return Status::OK();
 }
diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h
index 3fccc5b..6cb3449 100644
--- a/src/kudu/rpc/rpc_sidecar.h
+++ b/src/kudu/rpc/rpc_sidecar.h
@@ -20,6 +20,7 @@
 #include <memory>
 #include <vector>
 
+#include <boost/container/small_vector.hpp>
 #include <google/protobuf/repeated_field.h> // IWYU pragma: keep
 #include <google/protobuf/stubs/port.h>
 
@@ -33,6 +34,8 @@ class faststring;
 
 namespace rpc {
 
+typedef boost::container::small_vector<Slice, 2> SidecarSliceVector;
+
 // An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data
 // without extra copies. In other words, whenever a protobuf would have a large field
 // where additional copies become expensive, one may opt instead to use an RpcSidecar.
@@ -58,12 +61,11 @@ class RpcSidecar {
   static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
 
   // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and
-  // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and
-  // will be filled from index 0.
-  // TODO(henryr): Consider a vector instead here if there's no perf. impact.
+  // a set of offsets.
   static Status ParseSidecars(
       const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
-      Slice buffer, Slice* sidecars);
+      Slice buffer,
+      SidecarSliceVector* sidecars);
 
   // Append Slice representation of the sidecar's data to the given payload.
   //
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index d212cef..852bc6b 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -76,17 +76,16 @@ message SendTwoStringsResponsePB {
   required uint32 sidecar2 = 2;
 }
 
-// Push two strings to the server as part of the request, in sidecars.
-message PushTwoStringsRequestPB {
-  required uint32 sidecar1_idx = 1;
-  required uint32 sidecar2_idx = 2;
+// Push strings to the server as part of the request, in sidecars.
+message PushStringsRequestPB {
+  repeated uint32 sidecar_indexes = 1;
 }
 
-message PushTwoStringsResponsePB {
-  required uint32 size1 = 1;
-  required string data1 = 2;
-  required uint32 size2 = 3;
-  required string data2 = 4;
+// The server responds with the sizes and checksums of the sidecars that
+// it received.
+message PushStringsResponsePB {
+  repeated uint32 sizes = 1;
+  repeated uint32 crcs = 2;
 }
 
 message EchoRequestPB {
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index 7693a91..422e448 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -225,7 +225,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
   CHECK_LT(cur_slice_idx_, payload_slices_.size());
 
   started_ = true;
-  int n_iovecs = payload_slices_.size() - cur_slice_idx_;
+  int n_iovecs = std::min<int>(payload_slices_.size() - cur_slice_idx_, IOV_MAX);
   struct iovec iovec[n_iovecs];
   {
     int offset_in_slice = cur_offset_in_slice_;
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 3cab6b1..91135d6 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -44,7 +44,7 @@ struct TransferCallbacks;
 class TransferLimits {
  public:
   enum {
-    kMaxSidecars = 10,
+    kMaxSidecars = 10000,
     kMaxTotalSidecarBytes = INT_MAX
   };
 


[kudu] 01/02: Use popcnt instruction for Bits::Count

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9945954a3f329a808d39bddad4c816de064f8f75
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Mar 24 13:10:18 2020 -0700

    Use popcnt instruction for Bits::Count
    
    I found Bits::Count to be a noticeable contributor to profiles in my
    TSDB benchmarking (a couple percent, if I recall correctly). This
    switches to using popcnt instead of a lookup table.
    
    This is technically a new CPU requirement, since the popcnt flag is
    separate from the SSE4.2 flag. In practice, however, it doesn't appear
    that there are any SSE4.2-capable machines that aren't also capable of
    POPCNT. So, this shouldn't actually change our hardware requirements.
    Another bit of evidence here is that '-msse4.2' (which we use) in clang
    also enables -mpopcnt by default, so it was already possible for clang
    to emit popcnt instructions for its own optimizations.
    
    In any case, this instruction was introduced about 10 years ago, so even
    if I missed one case of a 2010-era server, it's unlikely to still be in
    use for Kudu.
    
    Change-Id: Iad045e8b77e7baf65c42366eea3e107900eb4a64
    Reviewed-on: http://gerrit.cloudera.org:8080/15549
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/gutil/bits.cc | 21 +++++++++++++++++++--
 src/kudu/gutil/cpu.cc  |  2 ++
 src/kudu/gutil/cpu.h   |  2 ++
 src/kudu/util/init.cc  |  7 +++++++
 4 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/src/kudu/gutil/bits.cc b/src/kudu/gutil/bits.cc
index 333e464..437df3f 100644
--- a/src/kudu/gutil/bits.cc
+++ b/src/kudu/gutil/bits.cc
@@ -5,6 +5,9 @@
 #include "kudu/gutil/bits.h"
 
 #include <assert.h>
+#include <stdint.h>
+
+#include "kudu/gutil/port.h"
 
 // this array gives the number of bits for any number from 0 to 255
 // (We could make these ints.  The tradeoff is size (eg does it overwhelm
@@ -29,9 +32,23 @@ const char Bits::num_bits[] = {
 
 int Bits::Count(const void *m, int num_bytes) {
   int nbits = 0;
-  const uint8 *s = (const uint8 *) m;
-  for (int i = 0; i < num_bytes; i++)
+  const uint8 *s = static_cast<const uint8*>(m);
+#ifdef __x86_64__
+  // Assume POPCNT since Kudu checks for it at startup.
+  while (num_bytes >= 8) {
+    nbits += __builtin_popcountll(UnalignedLoad<uint64_t>(s));
+    s += 8;
+    num_bytes -= 8;
+  }
+  while (num_bytes--) {
+    nbits += __builtin_popcount(*s++);
+  }
+#else
+  // Use lookup table on non-x86.
+  for (int i = 0; i < num_bytes; i++) {
     nbits += num_bits[*s++];
+  }
+#endif
   return nbits;
 }
 
diff --git a/src/kudu/gutil/cpu.cc b/src/kudu/gutil/cpu.cc
index b3ea105..e108304 100644
--- a/src/kudu/gutil/cpu.cc
+++ b/src/kudu/gutil/cpu.cc
@@ -34,6 +34,7 @@ CPU::CPU()
     has_ssse3_(false),
     has_sse41_(false),
     has_sse42_(false),
+    has_popcnt_(false),
     has_avx_(false),
     has_avx2_(false),
     has_aesni_(false),
@@ -226,6 +227,7 @@ void CPU::Initialize() {
     has_ssse3_ = (cpu_info[2] & 0x00000200) != 0;
     has_sse41_ = (cpu_info[2] & 0x00080000) != 0;
     has_sse42_ = (cpu_info[2] & 0x00100000) != 0;
+    has_popcnt_ = (cpu_info[2] & 0x00800000) != 0;
     // AVX instructions will generate an illegal instruction exception unless
     //   a) they are supported by the CPU,
     //   b) XSAVE is supported by the CPU and
diff --git a/src/kudu/gutil/cpu.h b/src/kudu/gutil/cpu.h
index b3cf2e5..6462642 100644
--- a/src/kudu/gutil/cpu.h
+++ b/src/kudu/gutil/cpu.h
@@ -45,6 +45,7 @@ class CPU {
   bool has_ssse3() const { return has_ssse3_; }
   bool has_sse41() const { return has_sse41_; }
   bool has_sse42() const { return has_sse42_; }
+  bool has_popcnt() const { return has_popcnt_; }
   bool has_avx() const { return has_avx_; }
   bool has_avx2() const { return has_avx2_; }
   bool has_aesni() const { return has_aesni_; }
@@ -80,6 +81,7 @@ class CPU {
   bool has_ssse3_;
   bool has_sse41_;
   bool has_sse42_;
+  bool has_popcnt_;
   bool has_avx_;
   bool has_avx2_;
   bool has_aesni_;
diff --git a/src/kudu/util/init.cc b/src/kudu/util/init.cc
index d06ea21..5267730 100644
--- a/src/kudu/util/init.cc
+++ b/src/kudu/util/init.cc
@@ -74,6 +74,13 @@ Status CheckCPUFlags() {
     return BadCPUStatus(cpu, "SSSE3");
   }
 
+  // POPCNT should always be present on machines with SSE4.2 support, but just in case
+  // there's some sort of weird missing support in virtualized environments, we'll check
+  // it explicitly.
+  if (!cpu.has_popcnt()) {
+    return BadCPUStatus(cpu, "POPCNT");
+  }
+
   return Status::OK();
 }