You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by in...@apache.org on 2018/07/02 09:24:11 UTC

incubator-hawq git commit: HAWQ-1627. Support setting the max protocol message size when talking with HDFS

Repository: incubator-hawq
Updated Branches:
  refs/heads/master c6146c02e -> 21991077c


HAWQ-1627. Support setting the max protocol message size when talking
with HDFS


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/21991077
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/21991077
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/21991077

Branch: refs/heads/master
Commit: 21991077c4812f1ee8f854ee55e56b97d04bc0f0
Parents: c6146c0
Author: interma <in...@outlook.com>
Authored: Wed Jun 20 13:02:44 2018 +0800
Committer: interma <in...@outlook.com>
Committed: Mon Jul 2 17:23:44 2018 +0800

----------------------------------------------------------------------
 depends/libhdfs3/src/common/SessionConfig.cpp |  2 ++
 depends/libhdfs3/src/common/SessionConfig.h   |  9 +++++++++
 depends/libhdfs3/src/rpc/RpcChannel.cpp       | 13 ++++++++++++-
 depends/libhdfs3/src/rpc/RpcConfig.h          | 13 ++++++++++++-
 src/backend/utils/misc/etc/hdfs-client.xml    |  9 +++++++++
 5 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/common/SessionConfig.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/common/SessionConfig.cpp b/depends/libhdfs3/src/common/SessionConfig.cpp
index 3d9d9ad..569b7e9 100644
--- a/depends/libhdfs3/src/common/SessionConfig.cpp
+++ b/depends/libhdfs3/src/common/SessionConfig.cpp
@@ -83,6 +83,8 @@ SessionConfig::SessionConfig(const Config & conf) {
         }, {
             &rpcTimeout, "rpc.client.timeout", 3600 * 1000
         }, {
+            &rpcMaxDataLength, "ipc.maximum.data.length", 64 * 1024 * 1024
+        }, {
             &defaultReplica, "dfs.default.replica", 3, bind(CheckRangeGE<int32_t>, _1, _2, 1)
         }, {
             &inputConnTimeout, "input.connect.timeout", 600 * 1000

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/common/SessionConfig.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/common/SessionConfig.h b/depends/libhdfs3/src/common/SessionConfig.h
index 7722401..3352a49 100644
--- a/depends/libhdfs3/src/common/SessionConfig.h
+++ b/depends/libhdfs3/src/common/SessionConfig.h
@@ -245,6 +245,14 @@ public:
         this->rpcTimeout = rpcTimeout;
     }
 
+    int32_t getRpcMaxDataLength() const { // ipc.maximum.data.length
+        return this->rpcMaxDataLength;
+    }
+
+    void setRpcMaxDataLength(int32_t maxDataLength) {
+        rpcMaxDataLength = maxDataLength;
+    }
+
     bool doesNotRetryAnotherNode() const {
         return notRetryAnotherNode;
     }
@@ -334,6 +342,7 @@ public:
     int32_t rpcMaxHARetry;
     int32_t rpcSocketLingerTimeout;
     int32_t rpcTimeout;
+    int32_t rpcMaxDataLength; //ipc.maximum.data.length
     bool rpcTcpNoDelay;
     std::string rpcAuthMethod;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/rpc/RpcChannel.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcChannel.cpp b/depends/libhdfs3/src/rpc/RpcChannel.cpp
index 7f9ef5d..2ba82ed 100644
--- a/depends/libhdfs3/src/rpc/RpcChannel.cpp
+++ b/depends/libhdfs3/src/rpc/RpcChannel.cpp
@@ -33,6 +33,7 @@
 #include "WriteBuffer.h"
 
 #include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
 
 #define RPC_HEADER_MAGIC "hrpc"
 #define RPC_HEADER_VERSION 9
@@ -756,6 +757,8 @@ static exception_ptr HandlerRpcResponseException(exception_ptr e) {
 
 void RpcChannelImpl::readOneResponse(bool writeLock) {
     int readTimeout = key.getConf().getReadTimeout();
+    int maxLength = key.getConf().getRpcMaxLength();
+
     std::vector<char> buffer(128);
     RpcResponseHeaderProto curRespHeader;
     RpcResponseHeaderProto::RpcStatusProto status;
@@ -768,7 +771,15 @@ void RpcChannelImpl::readOneResponse(bool writeLock) {
     buffer.resize(headerSize);
     in->readFully(&buffer[0], headerSize, readTimeout);
 
-    if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) {
+    // use CodedInputStream around the buffer, so we can set TotalBytesLimit on it
+    ArrayInputStream ais(&buffer[0], headerSize);
+    CodedInputStream cis(&ais);
+    cis.SetTotalBytesLimit(maxLength, maxLength/2);
+
+    // use ParseFromCodedStream instead of ParseFromArray, so it can consume the above CodedInputStream
+    //
+    // if just use ParseFromArray, we have no chance to set TotalBytesLimit (64MB default)
+    if (!curRespHeader.ParseFromCodedStream(&cis)) {
         THROW(HdfsRpcException,
               "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot parse response header.",
               key.getServer().getHost().c_str(), key.getServer().getPort().c_str())

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/rpc/RpcConfig.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcConfig.h b/depends/libhdfs3/src/rpc/RpcConfig.h
index efbb991..fd6b7b0 100644
--- a/depends/libhdfs3/src/rpc/RpcConfig.h
+++ b/depends/libhdfs3/src/rpc/RpcConfig.h
@@ -41,6 +41,7 @@ public:
         tcpNoDelay = conf.isRpcTcpNoDelay();
         lingerTimeout = conf.getRpcSocketLingerTimeout();
         rpcTimeout = conf.getRpcTimeout();
+        rpcMaxLength = conf.getRpcMaxDataLength();
     }
 
     size_t hash_value() const;
@@ -117,6 +118,14 @@ public:
         this->rpcTimeout = rpcTimeout;
     }
 
+    int getRpcMaxLength() const { // max RPC message size in bytes (ipc.maximum.data.length)
+        return rpcMaxLength;
+    }
+
+    void setRpcMaxLength(int rpcMaxLength) { // value in bytes
+        this->rpcMaxLength = rpcMaxLength;
+    }
+
     bool operator ==(const RpcConfig & other) const {
         return this->maxIdleTime == other.maxIdleTime
                && this->pingTimeout == other.pingTimeout
@@ -126,7 +135,8 @@ public:
                && this->maxRetryOnConnect == other.maxRetryOnConnect
                && this->tcpNoDelay == other.tcpNoDelay
                && this->lingerTimeout == other.lingerTimeout
-               && this->rpcTimeout == other.rpcTimeout;
+               && this->rpcTimeout == other.rpcTimeout
+               && this->rpcMaxLength == other.rpcMaxLength;
     }
 
 private:
@@ -138,6 +148,7 @@ private:
     int maxRetryOnConnect;
     int lingerTimeout;
     int rpcTimeout;
+    int rpcMaxLength;
     bool tcpNoDelay;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/src/backend/utils/misc/etc/hdfs-client.xml
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/etc/hdfs-client.xml b/src/backend/utils/misc/etc/hdfs-client.xml
index 0ed8d65..fca9882 100644
--- a/src/backend/utils/misc/etc/hdfs-client.xml
+++ b/src/backend/utils/misc/etc/hdfs-client.xml
@@ -337,4 +337,13 @@ HA -->
 		</description>
 	</property>
 
+	<property>
+		<name>ipc.maximum.data.length</name>
+		<value>67108864</value>
+		<description>
+		The maximum protobuf message size, in bytes, when talking with HDFS.
+		Increase the value if you encounter a "Requested data length XXX is longer
+		than maximum configured RPC length XXX" error.
+		</description>
+	</property>
 </configuration>