You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by in...@apache.org on 2018/07/02 09:24:11 UTC
incubator-hawq git commit: HAWQ-1627. Support setting the max
protocol message size when talking with HDFS
Repository: incubator-hawq
Updated Branches:
refs/heads/master c6146c02e -> 21991077c
HAWQ-1627. Support setting the max protocol message size when talking
with HDFS
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/21991077
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/21991077
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/21991077
Branch: refs/heads/master
Commit: 21991077c4812f1ee8f854ee55e56b97d04bc0f0
Parents: c6146c0
Author: interma <in...@outlook.com>
Authored: Wed Jun 20 13:02:44 2018 +0800
Committer: interma <in...@outlook.com>
Committed: Mon Jul 2 17:23:44 2018 +0800
----------------------------------------------------------------------
depends/libhdfs3/src/common/SessionConfig.cpp | 2 ++
depends/libhdfs3/src/common/SessionConfig.h | 9 +++++++++
depends/libhdfs3/src/rpc/RpcChannel.cpp | 13 ++++++++++++-
depends/libhdfs3/src/rpc/RpcConfig.h | 13 ++++++++++++-
src/backend/utils/misc/etc/hdfs-client.xml | 9 +++++++++
5 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/common/SessionConfig.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/common/SessionConfig.cpp b/depends/libhdfs3/src/common/SessionConfig.cpp
index 3d9d9ad..569b7e9 100644
--- a/depends/libhdfs3/src/common/SessionConfig.cpp
+++ b/depends/libhdfs3/src/common/SessionConfig.cpp
@@ -83,6 +83,8 @@ SessionConfig::SessionConfig(const Config & conf) {
}, {
&rpcTimeout, "rpc.client.timeout", 3600 * 1000
}, {
+ &rpcMaxDataLength, "ipc.maximum.data.length", 64 * 1024 * 1024
+ }, {
&defaultReplica, "dfs.default.replica", 3, bind(CheckRangeGE<int32_t>, _1, _2, 1)
}, {
&inputConnTimeout, "input.connect.timeout", 600 * 1000
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/common/SessionConfig.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/common/SessionConfig.h b/depends/libhdfs3/src/common/SessionConfig.h
index 7722401..3352a49 100644
--- a/depends/libhdfs3/src/common/SessionConfig.h
+++ b/depends/libhdfs3/src/common/SessionConfig.h
@@ -245,6 +245,14 @@ public:
this->rpcTimeout = rpcTimeout;
}
+ int32_t getRpcMaxDataLength() const {
+ return rpcMaxDataLength;
+ }
+
+ void setRpcMaxDataLength(int32_t rpcMaxLength) {
+ this->rpcMaxDataLength = rpcMaxLength;
+ }
+
bool doesNotRetryAnotherNode() const {
return notRetryAnotherNode;
}
@@ -334,6 +342,7 @@ public:
int32_t rpcMaxHARetry;
int32_t rpcSocketLingerTimeout;
int32_t rpcTimeout;
+ int32_t rpcMaxDataLength; //ipc.maximum.data.length
bool rpcTcpNoDelay;
std::string rpcAuthMethod;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/rpc/RpcChannel.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcChannel.cpp b/depends/libhdfs3/src/rpc/RpcChannel.cpp
index 7f9ef5d..2ba82ed 100644
--- a/depends/libhdfs3/src/rpc/RpcChannel.cpp
+++ b/depends/libhdfs3/src/rpc/RpcChannel.cpp
@@ -33,6 +33,7 @@
#include "WriteBuffer.h"
#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#define RPC_HEADER_MAGIC "hrpc"
#define RPC_HEADER_VERSION 9
@@ -756,6 +757,8 @@ static exception_ptr HandlerRpcResponseException(exception_ptr e) {
void RpcChannelImpl::readOneResponse(bool writeLock) {
int readTimeout = key.getConf().getReadTimeout();
+ int maxLength = key.getConf().getRpcMaxLength();
+
std::vector<char> buffer(128);
RpcResponseHeaderProto curRespHeader;
RpcResponseHeaderProto::RpcStatusProto status;
@@ -768,7 +771,15 @@ void RpcChannelImpl::readOneResponse(bool writeLock) {
buffer.resize(headerSize);
in->readFully(&buffer[0], headerSize, readTimeout);
- if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) {
+ // use CodedInputStream around the buffer, so we can set TotalBytesLimit on it
+ ArrayInputStream ais(&buffer[0], headerSize);
+ CodedInputStream cis(&ais);
+ cis.SetTotalBytesLimit(maxLength, maxLength/2);
+
+ // use ParseFromCodedStream instead of ParseFromArray, so it can consume the above CodedInputStream
+ //
+ // if just use ParseFromArray, we have no chance to set TotalBytesLimit (64MB default)
+ if (!curRespHeader.ParseFromCodedStream(&cis)) {
THROW(HdfsRpcException,
"RPC channel to \"%s:%s\" got protocol mismatch: RPC channel cannot parse response header.",
key.getServer().getHost().c_str(), key.getServer().getPort().c_str())
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/depends/libhdfs3/src/rpc/RpcConfig.h
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/rpc/RpcConfig.h b/depends/libhdfs3/src/rpc/RpcConfig.h
index efbb991..fd6b7b0 100644
--- a/depends/libhdfs3/src/rpc/RpcConfig.h
+++ b/depends/libhdfs3/src/rpc/RpcConfig.h
@@ -41,6 +41,7 @@ public:
tcpNoDelay = conf.isRpcTcpNoDelay();
lingerTimeout = conf.getRpcSocketLingerTimeout();
rpcTimeout = conf.getRpcTimeout();
+ rpcMaxLength = conf.getRpcMaxDataLength();
}
size_t hash_value() const;
@@ -117,6 +118,14 @@ public:
this->rpcTimeout = rpcTimeout;
}
+ int getRpcMaxLength() const {
+ return rpcMaxLength;
+ }
+
+ void setRpcMaxLength(int rpcTimeout) {
+ this->rpcMaxLength = rpcTimeout;
+ }
+
bool operator ==(const RpcConfig & other) const {
return this->maxIdleTime == other.maxIdleTime
&& this->pingTimeout == other.pingTimeout
@@ -126,7 +135,8 @@ public:
&& this->maxRetryOnConnect == other.maxRetryOnConnect
&& this->tcpNoDelay == other.tcpNoDelay
&& this->lingerTimeout == other.lingerTimeout
- && this->rpcTimeout == other.rpcTimeout;
+ && this->rpcTimeout == other.rpcTimeout
+ && this->rpcMaxLength == other.rpcMaxLength;
}
private:
@@ -138,6 +148,7 @@ private:
int maxRetryOnConnect;
int lingerTimeout;
int rpcTimeout;
+ int rpcMaxLength;
bool tcpNoDelay;
};
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/21991077/src/backend/utils/misc/etc/hdfs-client.xml
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/etc/hdfs-client.xml b/src/backend/utils/misc/etc/hdfs-client.xml
index 0ed8d65..fca9882 100644
--- a/src/backend/utils/misc/etc/hdfs-client.xml
+++ b/src/backend/utils/misc/etc/hdfs-client.xml
@@ -337,4 +337,13 @@ HA -->
</description>
</property>
+ <property>
+ <name>ipc.maximum.data.length</name>
+ <value>67108864</value>
+ <description>
+ The max protobuf message size when talking with HDFS.
+ Increase the value if encounter "Requested data length XXX is longer
+ than maximum configured RPC length XXX" error.
+ </description>
+ </property>
</configuration>