You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/24 01:59:10 UTC

[2/5] impala git commit: KUDU-2305: Fix local variable usage to handle 2GB messages

KUDU-2305: Fix local variable usage to handle 2GB messages

The maximum value for rpc_max_message_size (an int32_t) is
INT_MAX. However, when using this maximum value, certain
local variables in SerializeMessage() and the int32_t member
variables of InboundTransfer can overflow if the message
size approaches INT_MAX.

This changes certain variables to handle messages that
approach INT_MAX. Specifically:
 - Local variables in SerializeMessage() become int64_t
   rather than int.
 - The total message size prefixed to the message is now
   treated as a uint32_t everywhere. This impacts
   InboundTransfer::total_length_/cur_offset_ and
   a local variable in ParseMessage().

These changes mean that a sender can serialize a message
that is slightly larger than INT_MAX due to the added
header size, but the receiver will reject all messages
exceeding rpc_max_message_size (maximum INT_MAX).

For testing, this modifies rpc-test's TestRpcSidecarLimits
to send a sidecar of size INT_MAX rather than one of size
rpc_max_message_size + 1. This increases the test runtime
from about 50ms to 2 seconds.

Change-Id: I8e468099b16f7eb55de71b753acae8f57d18287c
Reviewed-on: http://gerrit.cloudera.org:8080/9355
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Dan Burkert <da...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9407
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/9427
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8b772a08
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8b772a08
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8b772a08

Branch: refs/heads/2.x
Commit: 8b772a08598a839acfd9180e3f6174d081f9dbde
Parents: 876f289
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Feb 22 14:53:35 2018 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Sat Feb 24 01:58:46 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/rpc-test.cc      |  6 +++++-
 be/src/kudu/rpc/serialization.cc | 13 ++++++++-----
 be/src/kudu/rpc/transfer.cc      |  5 ++++-
 be/src/kudu/rpc/transfer.h       |  4 ++--
 4 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8b772a08/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 6d9d156..f76cc8f 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/rpc/rpc-test-base.h"
 
+#include <limits.h>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -613,7 +614,10 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
             GenericCalculatorService::static_service_name());
 
     RpcController controller;
-    string s(FLAGS_rpc_max_message_size + 1, 'a');
+    // KUDU-2305: Test with a maximal payload to verify that the implementation
+    // can handle the limits.
+    string s;
+    s.resize(INT_MAX, 'a');
     int idx;
     CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8b772a08/be/src/kudu/rpc/serialization.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/serialization.cc b/be/src/kudu/rpc/serialization.cc
index fbc05bc..c31e79b 100644
--- a/be/src/kudu/rpc/serialization.cc
+++ b/be/src/kudu/rpc/serialization.cc
@@ -51,9 +51,12 @@ void SerializeMessage(const MessageLite& message, faststring* param_buf,
                         int additional_size, bool use_cached_size) {
   int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize();
   DCHECK_EQ(message.ByteSize(), pb_size);
-  int recorded_size = pb_size + additional_size;
-  int size_with_delim = pb_size + CodedOutputStream::VarintSize32(recorded_size);
-  int total_size = size_with_delim + additional_size;
+  // Use 8-byte integers to avoid overflowing when additional_size approaches INT_MAX.
+  int64_t recorded_size = static_cast<int64_t>(pb_size) +
+      static_cast<int64_t>(additional_size);
+  int64_t size_with_delim = static_cast<int64_t>(pb_size) +
+      static_cast<int64_t>(CodedOutputStream::VarintSize32(recorded_size));
+  int64_t total_size = size_with_delim + static_cast<int64_t>(additional_size);
 
   if (total_size > FLAGS_rpc_max_message_size) {
     LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the maximum configured "
@@ -109,8 +112,8 @@ Status ParseMessage(const Slice& buf,
                               KUDU_REDACT(buf.ToDebugString()));
   }
 
-  int total_len = NetworkByteOrder::Load32(buf.data());
-  DCHECK_EQ(total_len + kMsgLengthPrefixLength, buf.size())
+  uint32_t total_len = NetworkByteOrder::Load32(buf.data());
+  DCHECK_EQ(total_len, buf.size() - kMsgLengthPrefixLength)
     << "Got mis-sized buffer: " << KUDU_REDACT(buf.ToDebugString());
 
   CodedInputStream in(buf.data(), buf.size());

http://git-wip-us.apache.org/repos/asf/impala/blob/8b772a08/be/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc
index da44355..f83c432 100644
--- a/be/src/kudu/rpc/transfer.cc
+++ b/be/src/kudu/rpc/transfer.cc
@@ -76,7 +76,7 @@ InboundTransfer::InboundTransfer()
 
 Status InboundTransfer::ReceiveBuffer(Socket &socket) {
   if (cur_offset_ < kMsgLengthPrefixLength) {
-    // receive int32 length prefix
+    // receive uint32 length prefix
     int32_t rem = kMsgLengthPrefixLength - cur_offset_;
     int32_t nread;
     Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
@@ -115,6 +115,9 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
 
   // receive message body
   int32_t nread;
+
+  // If total_length_ > INT_MAX, then it would exceed the maximum rpc_max_message_size
+  // and exit above. Hence, it is safe to use int32_t here.
   int32_t rem = total_length_ - cur_offset_;
   Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
   RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);

http://git-wip-us.apache.org/repos/asf/impala/blob/8b772a08/be/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h
index 2a2b726..533fe83 100644
--- a/be/src/kudu/rpc/transfer.h
+++ b/be/src/kudu/rpc/transfer.h
@@ -93,8 +93,8 @@ class InboundTransfer {
 
   faststring buf_;
 
-  int32_t total_length_;
-  int32_t cur_offset_;
+  uint32_t total_length_;
+  uint32_t cur_offset_;
 
   DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
 };