You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/02/22 21:28:16 UTC

kudu git commit: KUDU-2305: Fix local variable usage to handle 2GB messages

Repository: kudu
Updated Branches:
  refs/heads/master 1ae56048b -> 2b0c1c019


KUDU-2305: Fix local variable usage to handle 2GB messages

The maximum value for rpc_max_message_size (an int32_t) is
INT_MAX. However, when using this maximum value, certain
local variables in SerializeMessage() and InboundTransfer
can overflow if the message size approaches INT_MAX.

This changes certain variables to handle messages that
approach INT_MAX. Specifically:
 - Local variables in SerializeMessage() become int64_t
   rather than int.
 - The total message size prefixed to the message is now
   treated as a uint32_t everywhere. This impacts
   InboundTransfer::total_length_/cur_offset_ and
   a local variable in ParseMessage().

These changes mean that a sender can serialize a message
that is slightly larger than INT_MAX due to the added
header size, but the receiver will reject all messages
exceeding rpc_max_message_size (maximum INT_MAX).

For testing, this modifies rpc-test's TestRpcSidecarLimits
to send a message of size INT_MAX rather than
rpc_max_message_size+1. This increases the test runtime
from about 50ms to 2 seconds.

Change-Id: I8e468099b16f7eb55de71b753acae8f57d18287c
Reviewed-on: http://gerrit.cloudera.org:8080/9355
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2b0c1c01
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2b0c1c01
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2b0c1c01

Branch: refs/heads/master
Commit: 2b0c1c019921e485f06c4be280fedba3d5279672
Parents: 1ae5604
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Fri Feb 16 15:00:37 2018 -0800
Committer: Dan Burkert <da...@cloudera.com>
Committed: Thu Feb 22 20:52:27 2018 +0000

----------------------------------------------------------------------
 src/kudu/rpc/rpc-test.cc      |  6 +++++-
 src/kudu/rpc/serialization.cc | 13 ++++++++-----
 src/kudu/rpc/transfer.cc      |  5 ++++-
 src/kudu/rpc/transfer.h       |  4 ++--
 4 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index b6e563e..94a22cf 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <cstdlib>
 #include <cstring>
+#include <limits.h>
 #include <memory>
 #include <ostream>
 #include <set>
@@ -700,7 +701,10 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
             GenericCalculatorService::static_service_name());
 
     RpcController controller;
-    string s(FLAGS_rpc_max_message_size + 1, 'a');
+    // KUDU-2305: Test with a maximal payload to verify that the implementation
+    // can handle the limits.
+    string s;
+    s.resize(INT_MAX, 'a');
     int idx;
     CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/serialization.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/serialization.cc b/src/kudu/rpc/serialization.cc
index b6d6393..438ad18 100644
--- a/src/kudu/rpc/serialization.cc
+++ b/src/kudu/rpc/serialization.cc
@@ -56,9 +56,12 @@ void SerializeMessage(const MessageLite& message, faststring* param_buf,
                         int additional_size, bool use_cached_size) {
   int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize();
   DCHECK_EQ(message.ByteSize(), pb_size);
-  int recorded_size = pb_size + additional_size;
-  int size_with_delim = pb_size + CodedOutputStream::VarintSize32(recorded_size);
-  int total_size = size_with_delim + additional_size;
+  // Use 8-byte integers to avoid overflowing when additional_size approaches INT_MAX.
+  int64_t recorded_size = static_cast<int64_t>(pb_size) +
+      static_cast<int64_t>(additional_size);
+  int64_t size_with_delim = static_cast<int64_t>(pb_size) +
+      static_cast<int64_t>(CodedOutputStream::VarintSize32(recorded_size));
+  int64_t total_size = size_with_delim + static_cast<int64_t>(additional_size);
 
   if (total_size > FLAGS_rpc_max_message_size) {
     LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the maximum configured "
@@ -114,8 +117,8 @@ Status ParseMessage(const Slice& buf,
                               KUDU_REDACT(buf.ToDebugString()));
   }
 
-  int total_len = NetworkByteOrder::Load32(buf.data());
-  DCHECK_EQ(total_len + kMsgLengthPrefixLength, buf.size())
+  uint32_t total_len = NetworkByteOrder::Load32(buf.data());
+  DCHECK_EQ(total_len, buf.size() - kMsgLengthPrefixLength)
     << "Got mis-sized buffer: " << KUDU_REDACT(buf.ToDebugString());
 
   CodedInputStream in(buf.data(), buf.size());

http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index 67d845c..ee83d55 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -77,7 +77,7 @@ InboundTransfer::InboundTransfer()
 
 Status InboundTransfer::ReceiveBuffer(Socket &socket) {
   if (cur_offset_ < kMsgLengthPrefixLength) {
-    // receive int32 length prefix
+    // receive uint32 length prefix
     int32_t rem = kMsgLengthPrefixLength - cur_offset_;
     int32_t nread;
     Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
@@ -116,6 +116,9 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
 
   // receive message body
   int32_t nread;
+
+  // If total_length_ > INT_MAX, then it would exceed the maximum rpc_max_message_size
+  // and exit above. Hence, it is safe to use int32_t here.
   int32_t rem = total_length_ - cur_offset_;
   Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
   RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);

http://git-wip-us.apache.org/repos/asf/kudu/blob/2b0c1c01/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 9561f84..53f4c90 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -89,8 +89,8 @@ class InboundTransfer {
 
   faststring buf_;
 
-  int32_t total_length_;
-  int32_t cur_offset_;
+  uint32_t total_length_;
+  uint32_t cur_offset_;
 
   DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
 };