You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2019/05/09 16:04:07 UTC
[geode-native] branch develop updated: GEODE-6718: Refactor
TcrConnection::sendRequestForChunkedResponse, derived class,
and remove pass-through calls in TcrConnection and derived (#479)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new e394467 GEODE-6718: Refactor TcrConnection::sendRequestForChunkedResponse, derived class, and remove pass-through calls in TcrConnection and derived (#479)
e394467 is described below
commit e394467eb07f09e3f6c6a02b429b00a010eb7bf2
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Thu May 9 09:04:02 2019 -0700
GEODE-6718: Refactor TcrConnection::sendRequestForChunkedResponse, derived class, and remove pass-through calls in TcrConnection and derived (#479)
* Extract switch block in sendRequestForChunkedResponse into separate method
* Refactor check for empty result set into predicate method
* Remove pass-through method sendRequestForChunkedResponse, which was serving no purpose.
* Formatting changes.
* Move new methods below caller
---
cppcache/src/TcrConnection.cpp | 65 ++++++++++++++++++++++++++--------------
cppcache/src/TcrConnection.hpp | 5 ++++
cppcache/src/TcrEndpoint.cpp | 14 ++++-----
cppcache/src/TcrEndpoint.hpp | 3 --
cppcache/src/TcrPoolEndPoint.cpp | 6 ----
cppcache/src/TcrPoolEndPoint.hpp | 3 --
6 files changed, 53 insertions(+), 43 deletions(-)
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 2089580..c046096 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -695,36 +695,54 @@ void TcrConnection::sendRequestForChunkedResponse(
const TcrMessage& request, size_t len, TcrMessageReply& reply,
std::chrono::microseconds sendTimeoutSec,
std::chrono::microseconds receiveTimeoutSec) {
- auto msgType = request.getMessageType();
- switch (msgType) {
- case TcrMessage::QUERY:
- case TcrMessage::QUERY_WITH_PARAMETERS:
- case TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE:
- case TcrMessage::GETDURABLECQS_MSG_TYPE:
- case TcrMessage::EXECUTE_FUNCTION:
- case TcrMessage::EXECUTE_REGION_FUNCTION:
- case TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP: {
- receiveTimeoutSec = reply.getTimeout();
- sendTimeoutSec = reply.getTimeout();
- break;
- }
- default:
- break;
+ if (useReplyTimeout(request)) {
+ receiveTimeoutSec = reply.getTimeout();
+ sendTimeoutSec = reply.getTimeout();
}
+ receiveTimeoutSec -= sendWithTimeouts(request.getMsgData(), len,
+ sendTimeoutSec, receiveTimeoutSec);
+
+ // to help in decoding the reply based on what was the request type
+ reply.setMessageTypeRequest(request.getMessageType());
+
+ if (replyHasResult(request, reply)) {
+ readMessageChunked(reply, receiveTimeoutSec, true);
+ }
+}
+
+bool TcrConnection::useReplyTimeout(const TcrMessage& request) const {
+ auto messageType = request.getMessageType();
+ return ((messageType == TcrMessage::QUERY) ||
+ (messageType == TcrMessage::QUERY_WITH_PARAMETERS) ||
+ (messageType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) ||
+ (messageType == TcrMessage::GETDURABLECQS_MSG_TYPE) ||
+ (messageType == TcrMessage::EXECUTE_FUNCTION) ||
+ (messageType == TcrMessage::EXECUTE_REGION_FUNCTION) ||
+ (messageType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP));
+}
+
+std::chrono::microseconds TcrConnection::sendWithTimeouts(
+ const char* data, size_t len, std::chrono::microseconds sendTimeout,
+ std::chrono::microseconds receiveTimeout) {
std::chrono::microseconds timeSpent{0};
- send(timeSpent, request.getMsgData(), len, sendTimeoutSec, true);
+ send(timeSpent, data, len, sendTimeout, true);
- if (timeSpent >= receiveTimeoutSec)
+ if (timeSpent >= receiveTimeout) {
throwException(
TimeoutException("TcrConnection::send: connection timed out"));
+ }
- receiveTimeoutSec -= timeSpent;
+ return timeSpent;
+}
+
+bool TcrConnection::replyHasResult(const TcrMessage& request,
+ TcrMessageReply& reply) {
+ auto hasResult = true;
- // to help in decoding the reply based on what was the request type
- reply.setMessageTypeRequest(msgType);
// no need of it now, this will not come here
- if (msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
+ if (request.getMessageType() ==
+ TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
ChunkedFunctionExecutionResponse* resultCollector =
static_cast<ChunkedFunctionExecutionResponse*>(
reply.getChunkedResultHandler());
@@ -732,10 +750,11 @@ void TcrConnection::sendRequestForChunkedResponse(
LOGDEBUG(
"TcrConnection::sendRequestForChunkedResponse: function execution, "
"no response desired");
- return;
+ hasResult = false;
}
}
- readMessageChunked(reply, receiveTimeoutSec, true);
+
+ return hasResult;
}
void TcrConnection::send(const char* buffer, size_t len,
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index cd5bb21..b026062 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -417,6 +417,11 @@ class APACHE_GEODE_EXPORT TcrConnection {
volatile bool m_isBeingUsed;
std::atomic<uint32_t> m_isUsed;
ThinClientPoolDM* m_poolDM;
+ bool useReplyTimeout(const TcrMessage& request) const;
+ std::chrono::microseconds sendWithTimeouts(
+ const char* data, size_t len, std::chrono::microseconds sendTimeout,
+ std::chrono::microseconds receiveTimeout);
+ bool replyHasResult(const TcrMessage& request, TcrMessageReply& reply);
};
} // namespace client
} // namespace geode
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 7b57e5d..812de37 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -793,7 +793,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
if (((type == TcrMessage::EXECUTE_FUNCTION ||
type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
(request.hasResult() & 2))) {
- sendRequestForChunkedResponse(request, reply, conn);
+ conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
+ request.getTimeout(),
+ reply.getTimeout());
} else if (type == TcrMessage::REGISTER_INTEREST_LIST ||
type == TcrMessage::REGISTER_INTEREST ||
type == TcrMessage::QUERY ||
@@ -824,7 +826,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
type == TcrMessage::MONITORCQ_MSG_TYPE ||
type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
type == TcrMessage::GETDURABLECQS_MSG_TYPE) {
- sendRequestForChunkedResponse(request, reply, conn);
+ conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
+ request.getTimeout(),
+ reply.getTimeout());
LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE");
} else {
// Chk request type to request if so request.getCallBackArg flag & setCall
@@ -1313,12 +1317,6 @@ std::shared_ptr<QueryService> TcrEndpoint::getQueryService() {
return m_cacheImpl->getQueryService(true);
}
-void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn) {
- conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply);
-}
-
void TcrEndpoint::closeFailedConnection(TcrConnection*& conn) {
closeConnection(conn);
}
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index 4413529..f1cea69 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -210,9 +210,6 @@ class TcrEndpoint {
virtual void processMarker();
virtual void triggerRedundancyThread();
virtual std::shared_ptr<QueryService> getQueryService();
- virtual void sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn);
virtual void closeFailedConnection(TcrConnection*& conn);
void closeConnection(TcrConnection*& conn);
virtual void handleNotificationStats(int64_t byteLength);
diff --git a/cppcache/src/TcrPoolEndPoint.cpp b/cppcache/src/TcrPoolEndPoint.cpp
index c3ff4c0..7ac81f4 100644
--- a/cppcache/src/TcrPoolEndPoint.cpp
+++ b/cppcache/src/TcrPoolEndPoint.cpp
@@ -40,12 +40,6 @@ void TcrPoolEndPoint::processMarker() { m_dm->processMarker(); }
std::shared_ptr<QueryService> TcrPoolEndPoint::getQueryService() {
return m_dm->getQueryServiceWithoutCheck();
}
-void TcrPoolEndPoint::sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn) {
- conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
- request.getTimeout(), reply.getTimeout());
-}
ThinClientPoolDM* TcrPoolEndPoint::getPoolHADM() { return m_dm; }
void TcrPoolEndPoint::triggerRedundancyThread() {
m_dm->triggerRedundancyThread();
diff --git a/cppcache/src/TcrPoolEndPoint.hpp b/cppcache/src/TcrPoolEndPoint.hpp
index ff99479..fc5fbc1 100644
--- a/cppcache/src/TcrPoolEndPoint.hpp
+++ b/cppcache/src/TcrPoolEndPoint.hpp
@@ -39,9 +39,6 @@ class TcrPoolEndPoint : public TcrEndpoint {
bool checkDupAndAdd(std::shared_ptr<EventId> eventid) override;
void processMarker() override;
std::shared_ptr<QueryService> getQueryService() override;
- void sendRequestForChunkedResponse(const TcrMessage& request,
- TcrMessageReply& reply,
- TcrConnection* conn) override;
void closeFailedConnection(TcrConnection*& conn) override;
GfErrType registerDM(bool clientNotification, bool isSecondary = false,
bool isActiveEndpoint = false,