You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2019/05/09 16:04:07 UTC

[geode-native] branch develop updated: GEODE-6718: Refactor TcrConnection::sendRequestForChunkedResponse, derived class, and remove pass-through calls in TrcConnection and derived (#479)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new e394467  GEODE-6718: Refactor TcrConnection::sendRequestForChunkedResponse, derived class, and remove pass-through calls in TrcConnection and derived (#479)
e394467 is described below

commit e394467eb07f09e3f6c6a02b429b00a010eb7bf2
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Thu May 9 09:04:02 2019 -0700

    GEODE-6718: Refactor TcrConnection::sendRequestForChunkedResponse, derived class, and remove pass-through calls in TrcConnection and derived (#479)
    
    * Extract switch block in sendRequestForChunkedResponse into separate method
    * Refactor check for empty result set into predicate method
    * Remove pass-through method sendRequestForChunkedResponse, which was serving no purpose.
    * Formatting changes.
    * Move new methods below caller
---
 cppcache/src/TcrConnection.cpp   | 65 ++++++++++++++++++++++++++--------------
 cppcache/src/TcrConnection.hpp   |  5 ++++
 cppcache/src/TcrEndpoint.cpp     | 14 ++++-----
 cppcache/src/TcrEndpoint.hpp     |  3 --
 cppcache/src/TcrPoolEndPoint.cpp |  6 ----
 cppcache/src/TcrPoolEndPoint.hpp |  3 --
 6 files changed, 53 insertions(+), 43 deletions(-)

diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 2089580..c046096 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -695,36 +695,54 @@ void TcrConnection::sendRequestForChunkedResponse(
     const TcrMessage& request, size_t len, TcrMessageReply& reply,
     std::chrono::microseconds sendTimeoutSec,
     std::chrono::microseconds receiveTimeoutSec) {
-  auto msgType = request.getMessageType();
-  switch (msgType) {
-    case TcrMessage::QUERY:
-    case TcrMessage::QUERY_WITH_PARAMETERS:
-    case TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE:
-    case TcrMessage::GETDURABLECQS_MSG_TYPE:
-    case TcrMessage::EXECUTE_FUNCTION:
-    case TcrMessage::EXECUTE_REGION_FUNCTION:
-    case TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP: {
-      receiveTimeoutSec = reply.getTimeout();
-      sendTimeoutSec = reply.getTimeout();
-      break;
-    }
-    default:
-      break;
+  if (useReplyTimeout(request)) {
+    receiveTimeoutSec = reply.getTimeout();
+    sendTimeoutSec = reply.getTimeout();
   }
 
+  receiveTimeoutSec -= sendWithTimeouts(request.getMsgData(), len,
+                                        sendTimeoutSec, receiveTimeoutSec);
+
+  // to help in decoding the reply based on what was the request type
+  reply.setMessageTypeRequest(request.getMessageType());
+
+  if (replyHasResult(request, reply)) {
+    readMessageChunked(reply, receiveTimeoutSec, true);
+  }
+}
+
+bool TcrConnection::useReplyTimeout(const TcrMessage& request) const {
+  auto messageType = request.getMessageType();
+  return ((messageType == TcrMessage::QUERY) ||
+          (messageType == TcrMessage::QUERY_WITH_PARAMETERS) ||
+          (messageType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) ||
+          (messageType == TcrMessage::GETDURABLECQS_MSG_TYPE) ||
+          (messageType == TcrMessage::EXECUTE_FUNCTION) ||
+          (messageType == TcrMessage::EXECUTE_REGION_FUNCTION) ||
+          (messageType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP));
+}
+
+std::chrono::microseconds TcrConnection::sendWithTimeouts(
+    const char* data, size_t len, std::chrono::microseconds sendTimeout,
+    std::chrono::microseconds receiveTimeout) {
   std::chrono::microseconds timeSpent{0};
-  send(timeSpent, request.getMsgData(), len, sendTimeoutSec, true);
+  send(timeSpent, data, len, sendTimeout, true);
 
-  if (timeSpent >= receiveTimeoutSec)
+  if (timeSpent >= receiveTimeout) {
     throwException(
         TimeoutException("TcrConnection::send: connection timed out"));
+  }
 
-  receiveTimeoutSec -= timeSpent;
+  return timeSpent;
+}
+
+bool TcrConnection::replyHasResult(const TcrMessage& request,
+                                   TcrMessageReply& reply) {
+  auto hasResult = true;
 
-  // to help in decoding the reply based on what was the request type
-  reply.setMessageTypeRequest(msgType);
   // no need of it now, this will not come here
-  if (msgType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
+  if (request.getMessageType() ==
+      TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
     ChunkedFunctionExecutionResponse* resultCollector =
         static_cast<ChunkedFunctionExecutionResponse*>(
             reply.getChunkedResultHandler());
@@ -732,10 +750,11 @@ void TcrConnection::sendRequestForChunkedResponse(
       LOGDEBUG(
           "TcrConnection::sendRequestForChunkedResponse: function execution, "
           "no response desired");
-      return;
+      hasResult = false;
     }
   }
-  readMessageChunked(reply, receiveTimeoutSec, true);
+
+  return hasResult;
 }
 
 void TcrConnection::send(const char* buffer, size_t len,
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index cd5bb21..b026062 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -417,6 +417,11 @@ class APACHE_GEODE_EXPORT TcrConnection {
   volatile bool m_isBeingUsed;
   std::atomic<uint32_t> m_isUsed;
   ThinClientPoolDM* m_poolDM;
+  bool useReplyTimeout(const TcrMessage& request) const;
+  std::chrono::microseconds sendWithTimeouts(
+      const char* data, size_t len, std::chrono::microseconds sendTimeout,
+      std::chrono::microseconds receiveTimeout);
+  bool replyHasResult(const TcrMessage& request, TcrMessageReply& reply);
 };
 }  // namespace client
 }  // namespace geode
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 7b57e5d..812de37 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -793,7 +793,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
   if (((type == TcrMessage::EXECUTE_FUNCTION ||
         type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
        (request.hasResult() & 2))) {
-    sendRequestForChunkedResponse(request, reply, conn);
+    conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
+                                        request.getTimeout(),
+                                        reply.getTimeout());
   } else if (type == TcrMessage::REGISTER_INTEREST_LIST ||
              type == TcrMessage::REGISTER_INTEREST ||
              type == TcrMessage::QUERY ||
@@ -824,7 +826,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
              type == TcrMessage::MONITORCQ_MSG_TYPE ||
              type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
              type == TcrMessage::GETDURABLECQS_MSG_TYPE) {
-    sendRequestForChunkedResponse(request, reply, conn);
+    conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
+                                        request.getTimeout(),
+                                        reply.getTimeout());
     LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE");
   } else {
     // Chk request type to request if so request.getCallBackArg flag & setCall
@@ -1313,12 +1317,6 @@ std::shared_ptr<QueryService> TcrEndpoint::getQueryService() {
   return m_cacheImpl->getQueryService(true);
 }
 
-void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request,
-                                                TcrMessageReply& reply,
-                                                TcrConnection* conn) {
-  conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply);
-}
-
 void TcrEndpoint::closeFailedConnection(TcrConnection*& conn) {
   closeConnection(conn);
 }
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index 4413529..f1cea69 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -210,9 +210,6 @@ class TcrEndpoint {
   virtual void processMarker();
   virtual void triggerRedundancyThread();
   virtual std::shared_ptr<QueryService> getQueryService();
-  virtual void sendRequestForChunkedResponse(const TcrMessage& request,
-                                             TcrMessageReply& reply,
-                                             TcrConnection* conn);
   virtual void closeFailedConnection(TcrConnection*& conn);
   void closeConnection(TcrConnection*& conn);
   virtual void handleNotificationStats(int64_t byteLength);
diff --git a/cppcache/src/TcrPoolEndPoint.cpp b/cppcache/src/TcrPoolEndPoint.cpp
index c3ff4c0..7ac81f4 100644
--- a/cppcache/src/TcrPoolEndPoint.cpp
+++ b/cppcache/src/TcrPoolEndPoint.cpp
@@ -40,12 +40,6 @@ void TcrPoolEndPoint::processMarker() { m_dm->processMarker(); }
 std::shared_ptr<QueryService> TcrPoolEndPoint::getQueryService() {
   return m_dm->getQueryServiceWithoutCheck();
 }
-void TcrPoolEndPoint::sendRequestForChunkedResponse(const TcrMessage& request,
-                                                    TcrMessageReply& reply,
-                                                    TcrConnection* conn) {
-  conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply,
-                                      request.getTimeout(), reply.getTimeout());
-}
 ThinClientPoolDM* TcrPoolEndPoint::getPoolHADM() { return m_dm; }
 void TcrPoolEndPoint::triggerRedundancyThread() {
   m_dm->triggerRedundancyThread();
diff --git a/cppcache/src/TcrPoolEndPoint.hpp b/cppcache/src/TcrPoolEndPoint.hpp
index ff99479..fc5fbc1 100644
--- a/cppcache/src/TcrPoolEndPoint.hpp
+++ b/cppcache/src/TcrPoolEndPoint.hpp
@@ -39,9 +39,6 @@ class TcrPoolEndPoint : public TcrEndpoint {
   bool checkDupAndAdd(std::shared_ptr<EventId> eventid) override;
   void processMarker() override;
   std::shared_ptr<QueryService> getQueryService() override;
-  void sendRequestForChunkedResponse(const TcrMessage& request,
-                                     TcrMessageReply& reply,
-                                     TcrConnection* conn) override;
   void closeFailedConnection(TcrConnection*& conn) override;
   GfErrType registerDM(bool clientNotification, bool isSecondary = false,
                        bool isActiveEndpoint = false,