You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/04/30 17:55:44 UTC

[geode-native] branch develop updated: GEODE-9200: Clean up DisMess (#793)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7a03e4c  GEODE-9200: Clean up DisMess (#793)
7a03e4c is described below

commit 7a03e4c044996434c07e2178ccd13a5f5a84c621
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Fri Apr 30 10:55:28 2021 -0700

    GEODE-9200: Clean up DisMess (#793)
    
    Remove the static endpoint-disconnected message.
    - Use a dynamically allocated message for this, à la other messages.
    - No longer leak an instance at shutdown; no more special-case code for
      this message when deleting messages.
---
 cppcache/src/CacheImpl.cpp                   | 12 ++--
 cppcache/src/CqService.cpp                   |  7 +--
 cppcache/src/CqService.hpp                   |  2 +-
 cppcache/src/RegionInternal.hpp              |  2 +-
 cppcache/src/RemoteQueryService.cpp          |  2 +-
 cppcache/src/RemoteQueryService.hpp          |  2 +-
 cppcache/src/TcrEndpoint.cpp                 | 42 ++++++-------
 cppcache/src/TcrMessage.cpp                  | 13 ++--
 cppcache/src/TcrMessage.hpp                  | 12 ++--
 cppcache/src/ThinClientHARegion.cpp          | 11 ++--
 cppcache/src/ThinClientHARegion.hpp          |  2 +-
 cppcache/src/ThinClientPoolHADM.cpp          | 90 ++++++++++++++--------------
 cppcache/src/ThinClientPoolHADM.hpp          | 14 ++---
 cppcache/src/ThinClientRedundancyManager.cpp | 12 ++--
 cppcache/src/ThinClientRedundancyManager.hpp |  4 +-
 cppcache/src/ThinClientRegion.cpp            | 68 +++++++++++----------
 cppcache/src/ThinClientRegion.hpp            |  4 +-
 17 files changed, 148 insertions(+), 151 deletions(-)

diff --git a/cppcache/src/CacheImpl.cpp b/cppcache/src/CacheImpl.cpp
index 988e79f..b9e5793 100644
--- a/cppcache/src/CacheImpl.cpp
+++ b/cppcache/src/CacheImpl.cpp
@@ -424,7 +424,7 @@ void CacheImpl::createRegion(std::string name,
     }
 
     regionPtr = rpImpl;
-    rpImpl->addDisMessToQueue();
+    rpImpl->addDisconnectedMessageToQueue();
     // Instantiate a PersistenceManager object if DiskPolicy is overflow
     if (regionAttributes.getDiskPolicy() == DiskPolicyType::OVERFLOWS) {
       auto pmPtr = regionAttributes.getPersistenceManager();
@@ -708,16 +708,14 @@ void CacheImpl::processMarker() {
     if (!kv.second->isDestroyed()) {
       if (const auto tcrHARegion =
               std::dynamic_pointer_cast<ThinClientHARegion>(kv.second)) {
-        auto regionMsg = new TcrMessageClientMarker(
-            new DataOutput(createDataOutput()), true);
-        tcrHARegion->receiveNotification(regionMsg);
+        tcrHARegion->receiveNotification(
+            TcrMessageClientMarker(new DataOutput(createDataOutput()), true));
         for (const auto& iter : tcrHARegion->subregions(true)) {
           if (!iter->isDestroyed()) {
             if (const auto subregion =
                     std::dynamic_pointer_cast<ThinClientHARegion>(iter)) {
-              regionMsg = new TcrMessageClientMarker(
-                  new DataOutput(createDataOutput()), true);
-              subregion->receiveNotification(regionMsg);
+              subregion->receiveNotification(TcrMessageClientMarker(
+                  new DataOutput(createDataOutput()), true));
             }
           }
         }
diff --git a/cppcache/src/CqService.cpp b/cppcache/src/CqService.cpp
index 3ced543..af81045 100644
--- a/cppcache/src/CqService.cpp
+++ b/cppcache/src/CqService.cpp
@@ -383,10 +383,9 @@ bool CqService::isCqExists(const std::string& cqName) {
 
   return m_cqQueryMap.find(cqName) != m_cqQueryMap.end();
 }
-void CqService::receiveNotification(TcrMessage* msg) {
-  invokeCqListeners(msg->getCqs(), msg->getMessageTypeForCq(), msg->getKey(),
-                    msg->getValue(), msg->getDeltaBytes(), msg->getEventId());
-  _GEODE_SAFE_DELETE(msg);
+void CqService::receiveNotification(TcrMessage& msg) {
+  invokeCqListeners(msg.getCqs(), msg.getMessageTypeForCq(), msg.getKey(),
+                    msg.getValue(), msg.getDeltaBytes(), msg.getEventId());
   notification_semaphore_.release();
 }
 
diff --git a/cppcache/src/CqService.hpp b/cppcache/src/CqService.hpp
index 7a6df07..a29a726 100644
--- a/cppcache/src/CqService.hpp
+++ b/cppcache/src/CqService.hpp
@@ -75,7 +75,7 @@ class CqService : public std::enable_shared_from_this<CqService> {
 
   ThinClientBaseDM* getDM() { return m_tccdm; }
 
-  void receiveNotification(TcrMessage* msg);
+  void receiveNotification(TcrMessage& msg);
 
   /**
    * Returns the state of the cqService.
diff --git a/cppcache/src/RegionInternal.hpp b/cppcache/src/RegionInternal.hpp
index 4e4358d..65b0798 100644
--- a/cppcache/src/RegionInternal.hpp
+++ b/cppcache/src/RegionInternal.hpp
@@ -271,7 +271,7 @@ class RegionInternal : public Region {
   std::shared_ptr<RegionEntry> createRegionEntry(
       const std::shared_ptr<CacheableKey>& key,
       const std::shared_ptr<Cacheable>& value);
-  virtual void addDisMessToQueue() {}
+  virtual void addDisconnectedMessageToQueue() {}
 
   virtual void txDestroy(const std::shared_ptr<CacheableKey>& key,
                          const std::shared_ptr<Serializable>& callBack,
diff --git a/cppcache/src/RemoteQueryService.cpp b/cppcache/src/RemoteQueryService.cpp
index 9598a5f..cf03367 100644
--- a/cppcache/src/RemoteQueryService.cpp
+++ b/cppcache/src/RemoteQueryService.cpp
@@ -272,7 +272,7 @@ RemoteQueryService::getCqServiceStatistics() const {
   return nullptr;
 }
 
-void RemoteQueryService::receiveNotification(TcrMessage* msg) {
+void RemoteQueryService::receiveNotification(TcrMessage& msg) {
   {
     TryReadGuard guard(m_rwLock, m_invalid);
 
diff --git a/cppcache/src/RemoteQueryService.hpp b/cppcache/src/RemoteQueryService.hpp
index 8c64ba2..74e1bb3 100644
--- a/cppcache/src/RemoteQueryService.hpp
+++ b/cppcache/src/RemoteQueryService.hpp
@@ -87,7 +87,7 @@ class RemoteQueryService
    * execute all cqs on the endpoint after failover
    */
   GfErrType executeAllCqs(TcrEndpoint* endpoint);
-  void receiveNotification(TcrMessage* msg);
+  void receiveNotification(TcrMessage& msg);
   void invokeCqConnectedListeners(ThinClientPoolDM* pool, bool connected);
   // For Lazy Cq Start-no use, no start
   inline void initCqService() {
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 7735cf7..dd79756 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -550,7 +550,6 @@ bool TcrEndpoint::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
 void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
   LOGFINE("Started subscription channel for endpoint %s", m_name.c_str());
   while (isRunning) {
-    TcrMessageReply* msg = nullptr;
     try {
       size_t dataLen;
       ConnErrType opErr = CONN_NOERR;
@@ -576,34 +575,32 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
       }
 
       if (data) {
-        msg = new TcrMessageReply(true, m_baseDM);
-        msg->initCqMap();
-        msg->setData(data, static_cast<int32_t>(dataLen),
-                     getDistributedMemberID(),
-                     *(m_cacheImpl->getSerializationRegistry()),
-                     *(m_cacheImpl->getMemberListForVersionStamp()));
+        TcrMessageReply msg(true, m_baseDM);
+        msg.initCqMap();
+        msg.setData(data, static_cast<int32_t>(dataLen),
+                    getDistributedMemberID(),
+                    *(m_cacheImpl->getSerializationRegistry()),
+                    *(m_cacheImpl->getMemberListForVersionStamp()));
         handleNotificationStats(static_cast<int64_t>(dataLen));
-        LOGDEBUG("receive notification %d", msg->getMessageType());
+        LOGDEBUG("receive notification %d", msg.getMessageType());
 
         if (!isRunning) {
-          _GEODE_SAFE_DELETE(msg);
           break;
         }
 
-        if (msg->getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) {
+        if (msg.getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) {
           LOGFINE("Received ping from server subscription channel.");
         }
 
         // ignore some message types like REGISTER_INSTANTIATORS
-        if (msg->shouldIgnore()) {
-          _GEODE_SAFE_DELETE(msg);
+        if (msg.shouldIgnore()) {
           continue;
         }
 
-        bool isMarker = (msg->getMessageType() == TcrMessage::CLIENT_MARKER);
-        if (!msg->hasCqPart()) {
-          if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) {
-            const std::string& regionFullPath1 = msg->getRegionName();
+        bool isMarker = (msg.getMessageType() == TcrMessage::CLIENT_MARKER);
+        if (!msg.hasCqPart()) {
+          if (msg.getMessageType() != TcrMessage::CLIENT_MARKER) {
+            const std::string& regionFullPath1 = msg.getRegionName();
             auto region1 = m_cacheImpl->getRegion(regionFullPath1);
 
             if (region1 != nullptr &&
@@ -614,18 +611,16 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
               // checking
               LOGFINER("Endpoint %s dropping event for region %s",
                        m_name.c_str(), regionFullPath1.c_str());
-              _GEODE_SAFE_DELETE(msg);
               continue;
             }
           }
         }
 
-        if (!checkDupAndAdd(msg->getEventId())) {
+        if (!checkDupAndAdd(msg.getEventId())) {
           m_dupCount++;
           if (m_dupCount % 100 == 1) {
             LOGFINE("Dropped %dst duplicate notification message", m_dupCount);
           }
-          _GEODE_SAFE_DELETE(msg);
           continue;
         }
 
@@ -633,11 +628,10 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
           LOGFINE("Got a marker message on endpont %s", m_name.c_str());
           m_cacheImpl->processMarker();
           processMarker();
-          _GEODE_SAFE_DELETE(msg);
         } else {
-          if (!msg->hasCqPart())  // || msg->isInterestListPassed())
+          if (!msg.hasCqPart())  // || msg.isInterestListPassed())
           {
-            const std::string& regionFullPath = msg->getRegionName();
+            const std::string& regionFullPath = msg.getRegionName();
             auto region = m_cacheImpl->getRegion(regionFullPath);
 
             if (region != nullptr) {
@@ -650,7 +644,7 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
                   regionFullPath.c_str());
             }
           } else {
-            LOGDEBUG("receive cq notification %d", msg->getMessageType());
+            LOGDEBUG("receive cq notification %d", msg.getMessageType());
             auto queryService = getQueryService();
             if (queryService != nullptr) {
               static_cast<RemoteQueryService*>(queryService.get())
@@ -685,13 +679,11 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
       }
       break;
     } catch (const Exception& ex) {
-      _GEODE_SAFE_DELETE(msg);
       LOGERROR(
           "Exception while receiving subscription event for endpoint %s:: %s: "
           "%s",
           m_name.c_str(), ex.getName().c_str(), ex.what());
     } catch (...) {
-      _GEODE_SAFE_DELETE(msg);
       LOGERROR(
           "Unexpected exception while "
           "receiving subscription event from endpoint %s",
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index bdaf340..6b30c4a 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -255,7 +255,7 @@ TcrChunkedResult* TcrMessage::getChunkedResultHandler() {
   return m_chunkedResult;
 }
 
-DataInput* TcrMessage::getDelta() { return m_delta.get(); }
+DataInput* TcrMessage::getDelta() const { return m_delta.get(); }
 
 //  getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes
 //  that
@@ -270,7 +270,7 @@ std::shared_ptr<CacheableBytes> TcrMessage::getDeltaBytes() {
   return retVal;
 }
 
-bool TcrMessage::hasDelta() { return (m_delta != nullptr); }
+bool TcrMessage::hasDelta() const { return (m_delta != nullptr); }
 
 void TcrMessage::setMetaRegion(bool isMetaRegion) {
   m_isMetaRegion = isMetaRegion;
@@ -307,7 +307,9 @@ void TcrMessage::setCallBackArguement(bool aCallBackArguement) {
 void TcrMessage::setVersionTag(std::shared_ptr<VersionTag> versionTag) {
   m_versionTag = versionTag;
 }
-std::shared_ptr<VersionTag> TcrMessage::getVersionTag() { return m_versionTag; }
+std::shared_ptr<VersionTag> TcrMessage::getVersionTag() const {
+  return m_versionTag;
+}
 
 uint8_t TcrMessage::hasResult() const { return m_hasResult; }
 
@@ -319,11 +321,6 @@ std::shared_ptr<CacheableHashSet> TcrMessage::getTombstoneKeys() const {
   return m_tombstoneKeys;
 }
 
-TcrMessage* TcrMessage::getAllEPDisMess() {
-  static auto allEPDisconnected = new TcrMessageReply(true, nullptr);
-  return allEPDisconnected;
-}
-
 void TcrMessage::writeInterestResultPolicyPart(InterestResultPolicy policy) {
   m_request->writeInt(static_cast<int32_t>(3));  // size
   m_request->write(static_cast<int8_t>(1));      // isObject
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index d151048..1107b24 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -259,7 +259,6 @@ class TcrMessage {
   std::chrono::milliseconds getTimeout() const;
   void setTimeout(std::chrono::milliseconds timeout);
 
-  static TcrMessage* getAllEPDisMess();
   bool isDurable() const;
   bool receiveValues() const;
   bool hasCqPart() const;
@@ -277,13 +276,13 @@ class TcrMessage {
   void setChunkedResultHandler(TcrChunkedResult* chunkedResult);
   TcrChunkedResult* getChunkedResultHandler();
 
-  DataInput* getDelta();
+  DataInput* getDelta() const;
   //  getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes
   //  that
   // takes ownership of delta bytes.
   std::shared_ptr<CacheableBytes> getDeltaBytes();
 
-  bool hasDelta();
+  bool hasDelta() const;
 
   void addSecurityPart(int64_t connectionId, int64_t unique_id,
                        TcrConnection* conn);
@@ -321,7 +320,7 @@ class TcrMessage {
   void setCallBackArguement(bool aCallBackArguement);
 
   void setVersionTag(std::shared_ptr<VersionTag> versionTag);
-  std::shared_ptr<VersionTag> getVersionTag();
+  std::shared_ptr<VersionTag> getVersionTag() const;
   uint8_t hasResult() const;
   std::shared_ptr<CacheableHashMap> getTombstoneVersions() const;
   std::shared_ptr<CacheableHashSet> getTombstoneKeys() const;
@@ -910,6 +909,11 @@ class TcrMessageReply : public TcrMessage {
   ~TcrMessageReply() override = default;
 };
 
+class TcrMessageAllEndpointsDisconnectedMarker : public TcrMessage {
+ public:
+  TcrMessageAllEndpointsDisconnectedMarker() = default;
+};
+
 /**
  * Helper class to invoke some internal methods of TcrMessage. Add any
  * methods that response processor methods require to access here.
diff --git a/cppcache/src/ThinClientHARegion.cpp b/cppcache/src/ThinClientHARegion.cpp
index 1910919..9c66b19 100644
--- a/cppcache/src/ThinClientHARegion.cpp
+++ b/cppcache/src/ThinClientHARegion.cpp
@@ -117,15 +117,14 @@ void ThinClientHARegion::destroyDM(bool) {
   poolDM->decRegionCount();
 }
 
-void ThinClientHARegion::addDisMessToQueue() {
+void ThinClientHARegion::addDisconnectedMessageToQueue() {
   auto poolDM = std::dynamic_pointer_cast<ThinClientPoolHADM>(m_tcrdm);
-  poolDM->addDisMessToQueue(this);
+  poolDM->addDisconnectedMessageToQueue(this);
 
-  if (poolDM->m_redundancyManager->m_globalProcessedMarker &&
+  if (poolDM->redundancyManager_->m_globalProcessedMarker &&
       !m_processedMarker) {
-    TcrMessage* regionMsg = new TcrMessageClientMarker(
-        new DataOutput(m_cacheImpl->createDataOutput()), true);
-    receiveNotification(regionMsg);
+    receiveNotification(TcrMessageClientMarker(
+        new DataOutput(m_cacheImpl->createDataOutput()), true));
   }
 }
 
diff --git a/cppcache/src/ThinClientHARegion.hpp b/cppcache/src/ThinClientHARegion.hpp
index b66ed75..c7c7f8d 100644
--- a/cppcache/src/ThinClientHARegion.hpp
+++ b/cppcache/src/ThinClientHARegion.hpp
@@ -57,7 +57,7 @@ class ThinClientHARegion : public ThinClientRegion {
   void setProcessedMarker(bool mark = true) override {
     m_processedMarker = mark;
   }
-  void addDisMessToQueue() override;
+  void addDisconnectedMessageToQueue() override;
 
  protected:
   GfErrType getNoThrow_FullObject(
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index 3d43c85..1113b2a 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -33,11 +33,11 @@ ThinClientPoolHADM::ThinClientPoolHADM(const char* name,
                                        std::shared_ptr<PoolAttributes> poolAttr,
                                        TcrConnectionManager& connManager)
     : ThinClientPoolDM(name, poolAttr, connManager),
-      m_theTcrConnManager(connManager),
+      theTcrConnManager_(connManager),
       redundancy_semaphore_(0),
-      m_redundancyTask(nullptr),
+      redundancyTask_(nullptr),
       server_monitor_task_id_(ExpiryTask::invalid()) {
-  m_redundancyManager = std::unique_ptr<ThinClientRedundancyManager>(
+  redundancyManager_ = std::unique_ptr<ThinClientRedundancyManager>(
       new ThinClientRedundancyManager(
           &connManager, poolAttr->getSubscriptionRedundancy(), this));
 }
@@ -54,10 +54,10 @@ void ThinClientPoolHADM::startBackgroundThreads() {
                     ->getDistributedSystem()
                     .getSystemProperties();
 
-  m_redundancyManager->initialize(m_attrs->getSubscriptionRedundancy());
+  redundancyManager_->initialize(m_attrs->getSubscriptionRedundancy());
   //  Call maintain redundancy level, so primary is available for notification
   //  operations.
-  GfErrType err = m_redundancyManager->maintainRedundancyLevel(true);
+  GfErrType err = redundancyManager_->maintainRedundancyLevel(true);
 
   const auto interval = props.redundancyMonitorInterval();
   auto& manager = m_connManager.getCacheImpl()->getExpiryTaskManager();
@@ -82,11 +82,11 @@ void ThinClientPoolHADM::startBackgroundThreads() {
     }
   }
 
-  m_redundancyManager->startPeriodicAck();
-  m_redundancyTask =
+  redundancyManager_->startPeriodicAck();
+  redundancyTask_ =
       std::unique_ptr<Task<ThinClientPoolHADM>>(new Task<ThinClientPoolHADM>(
           this, &ThinClientPoolHADM::redundancy, NC_Redundancy));
-  m_redundancyTask->start();
+  redundancyTask_->start();
 }
 
 GfErrType ThinClientPoolHADM::sendSyncRequest(TcrMessage& request,
@@ -121,13 +121,13 @@ GfErrType ThinClientPoolHADM::sendSyncRequestRegisterInterestEP(
 GfErrType ThinClientPoolHADM::sendSyncRequestRegisterInterest(
     TcrMessage& request, TcrMessageReply& reply, bool attemptFailover,
     ThinClientRegion* region, TcrEndpoint* endpoint) {
-  return m_redundancyManager->sendSyncRequestRegisterInterest(
+  return redundancyManager_->sendSyncRequestRegisterInterest(
       request, reply, attemptFailover, endpoint, this, region);
 }
 
 GfErrType ThinClientPoolHADM::sendSyncRequestCq(TcrMessage& request,
                                                 TcrMessageReply& reply) {
-  return m_redundancyManager->sendSyncRequestCq(request, reply, this);
+  return redundancyManager_->sendSyncRequestCq(request, reply, this);
 }
 
 bool ThinClientPoolHADM::preFailoverAction() { return true; }
@@ -143,7 +143,7 @@ void ThinClientPoolHADM::redundancy(std::atomic<bool>& isRunning) {
   redundancy_semaphore_.acquire();
   while (isRunning) {
     if (!m_connManager.isNetDown()) {
-      m_redundancyManager->maintainRedundancyLevel();
+      redundancyManager_->maintainRedundancyLevel();
     }
 
     redundancy_semaphore_.acquire();
@@ -165,7 +165,7 @@ void ThinClientPoolHADM::destroy(bool keepAlive) {
 
     sendNotificationCloseMsgs();
 
-    m_redundancyManager->close();
+    redundancyManager_->close();
 
     m_destroyPendingHADM = true;
     ThinClientPoolDM::destroy(keepAlive);
@@ -173,14 +173,14 @@ void ThinClientPoolHADM::destroy(bool keepAlive) {
 }
 
 void ThinClientPoolHADM::sendNotificationCloseMsgs() {
-  if (m_redundancyTask) {
+  if (redundancyTask_) {
     auto& manager = m_connManager.getCacheImpl()->getExpiryTaskManager();
     manager.cancel(server_monitor_task_id_);
-    m_redundancyTask->stopNoblock();
+    redundancyTask_->stopNoblock();
     redundancy_semaphore_.release();
-    m_redundancyTask->wait();
-    m_redundancyTask = nullptr;
-    m_redundancyManager->sendNotificationCloseMsgs();
+    redundancyTask_->wait();
+    redundancyTask_ = nullptr;
+    redundancyManager_->sendNotificationCloseMsgs();
   }
 }
 
@@ -188,8 +188,8 @@ GfErrType ThinClientPoolHADM::registerInterestAllRegions(
     TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply) {
   GfErrType err = GF_NOERR;
 
-  std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
-  for (const auto& region : m_regions) {
+  std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+  for (const auto& region : regions_) {
     auto opErr = region->registerKeys(ep, request, reply);
     if (err == GF_NOERR) {
       err = opErr;
@@ -200,51 +200,52 @@ GfErrType ThinClientPoolHADM::registerInterestAllRegions(
 }
 
 bool ThinClientPoolHADM::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
-  return m_redundancyManager->checkDupAndAdd(eventid);
+  return redundancyManager_->checkDupAndAdd(eventid);
 }
 
 void ThinClientPoolHADM::processMarker() {
   // also set the static bool m_processedMarker for makePrimary messages
-  m_redundancyManager->m_globalProcessedMarker = true;
+  redundancyManager_->m_globalProcessedMarker = true;
 }
 
 void ThinClientPoolHADM::acquireRedundancyLock() {
-  m_redundancyManager->acquireRedundancyLock();
+  redundancyManager_->acquireRedundancyLock();
 }
 
 void ThinClientPoolHADM::releaseRedundancyLock() {
-  m_redundancyManager->releaseRedundancyLock();
+  redundancyManager_->releaseRedundancyLock();
 }
 
 std::recursive_mutex& ThinClientPoolHADM::getRedundancyLock() {
-  return m_redundancyManager->getRedundancyLock();
+  return redundancyManager_->getRedundancyLock();
 }
 
 GfErrType ThinClientPoolHADM::sendRequestToPrimary(TcrMessage& request,
                                                    TcrMessageReply& reply) {
-  return m_redundancyManager->sendRequestToPrimary(request, reply);
+  return redundancyManager_->sendRequestToPrimary(request, reply);
 }
 
 bool ThinClientPoolHADM::isReadyForEvent() const {
-  return m_redundancyManager->isSentReadyForEvents();
+  return redundancyManager_->isSentReadyForEvents();
 }
 
 void ThinClientPoolHADM::addRegion(ThinClientRegion* theTCR) {
-  std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
-  m_regions.push_back(theTCR);
+  std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+  regions_.push_back(theTCR);
 }
-void ThinClientPoolHADM::addDisMessToQueue(ThinClientRegion* theTCR) {
-  std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
-  if (m_redundancyManager->allEndPointDiscon()) {
-    theTCR->receiveNotification(TcrMessage::getAllEPDisMess());
+void ThinClientPoolHADM::addDisconnectedMessageToQueue(
+    ThinClientRegion* theTCR) {
+  std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+  if (redundancyManager_->allEndPointDiscon()) {
+    theTCR->receiveNotification(TcrMessageAllEndpointsDisconnectedMarker());
   }
 }
 void ThinClientPoolHADM::removeRegion(ThinClientRegion* theTCR) {
-  std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
-  for (std::list<ThinClientRegion*>::iterator itr = m_regions.begin();
-       itr != m_regions.end(); itr++) {
+  std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+  for (std::list<ThinClientRegion*>::iterator itr = regions_.begin();
+       itr != regions_.end(); itr++) {
     if (*itr == theTCR) {
-      m_regions.erase(itr);
+      regions_.erase(itr);
       return;
     }
   }
@@ -260,7 +261,7 @@ void ThinClientPoolHADM::readyForEvents() {
 
   auto&& durable = sysProp.durableClientId();
   if (!durable.empty()) {
-    m_redundancyManager->readyForEvents();
+    redundancyManager_->readyForEvents();
   }
 }
 
@@ -274,24 +275,23 @@ void ThinClientPoolHADM::netDown() {
     }
   }
 
-  m_redundancyManager->netDown();
+  redundancyManager_->netDown();
 }
 
 void ThinClientPoolHADM::pingServerLocal() {
-  auto& mutex = m_redundancyManager->getRedundancyLock();
+  auto& mutex = redundancyManager_->getRedundancyLock();
   std::lock_guard<decltype(mutex)> guard(mutex);
   ThinClientPoolDM::pingServerLocal();
 }
 
 void ThinClientPoolHADM::removeCallbackConnection(TcrEndpoint* ep) {
-  m_redundancyManager->removeCallbackConnection(ep);
+  redundancyManager_->removeCallbackConnection(ep);
 }
 
-void ThinClientPoolHADM::sendNotConMesToAllregions() {
-  std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
-  for (std::list<ThinClientRegion*>::iterator it = m_regions.begin();
-       it != m_regions.end(); it++) {
-    (*it)->receiveNotification(TcrMessage::getAllEPDisMess());
+void ThinClientPoolHADM::sendNotConnectedMessageToAllregions() {
+  std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+  for (auto region : regions_) {
+    region->receiveNotification(TcrMessageAllEndpointsDisconnectedMarker());
   }
 }
 
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index f1094e1..3bfb055 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -101,11 +101,11 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
   void startBackgroundThreads() override;
 
  private:
-  std::unique_ptr<ThinClientRedundancyManager> m_redundancyManager;
+  std::unique_ptr<ThinClientRedundancyManager> redundancyManager_;
 
-  TcrConnectionManager& m_theTcrConnManager;
+  TcrConnectionManager& theTcrConnManager_;
   binary_semaphore redundancy_semaphore_;
-  std::unique_ptr<Task<ThinClientPoolHADM>> m_redundancyTask;
+  std::unique_ptr<Task<ThinClientPoolHADM>> redundancyTask_;
 
   void redundancy(std::atomic<bool>& isRunning);
 
@@ -115,12 +115,12 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
 
   void removeCallbackConnection(TcrEndpoint*) override;
 
-  std::list<ThinClientRegion*> m_regions;
-  std::recursive_mutex m_regionsLock;
+  std::list<ThinClientRegion*> regions_;
+  std::recursive_mutex regionsLock_;
   void addRegion(ThinClientRegion* theTCR);
   void removeRegion(ThinClientRegion* theTCR);
-  void sendNotConMesToAllregions();
-  void addDisMessToQueue(ThinClientRegion* theTCR);
+  void sendNotConnectedMessageToAllregions();
+  void addDisconnectedMessageToQueue(ThinClientRegion* theTCR);
 
   friend class ThinClientHARegion;
   friend class TcrConnectionManager;
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 8a2db40..7374b2a 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -46,7 +46,7 @@ ThinClientRedundancyManager::ThinClientRedundancyManager(
     ThinClientPoolHADM* poolHADM, bool sentReadyForEvents,
     bool globalProcessedMarker)
     : m_globalProcessedMarker(globalProcessedMarker),
-      m_IsAllEpDisCon(false),
+      m_allEndpointsDisconnected(false),
       m_server(0),
       m_sentReadyForEvents(sentReadyForEvents),
       m_redundancyLevel(redundancyLevel),
@@ -429,14 +429,14 @@ GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
   }
 
   if (isRedundancySatisfied) {
-    m_IsAllEpDisCon = false;
+    m_allEndpointsDisconnected = false;
     m_loggedRedundancyWarning = false;
     return GF_NOERR;
   } else if (isPrimaryConnected) {
     if (fatal && err != GF_NOERR) {
       return fatalError;
     }
-    m_IsAllEpDisCon = false;
+    m_allEndpointsDisconnected = false;
     if (m_redundancyLevel == -1) {
       LOGINFO("Current subscription redundancy level is %zu",
               m_redundantEndpoints.size() - 1);
@@ -454,9 +454,9 @@ GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
     // save any fatal errors that occur during maintain redundancy so
     // that we can send it back to the caller, to avoid missing out due
     // to nonfatal errors such as server not available
-    if (m_poolHADM && !m_IsAllEpDisCon) {
-      m_poolHADM->sendNotConMesToAllregions();
-      m_IsAllEpDisCon = true;
+    if (m_poolHADM && !m_allEndpointsDisconnected) {
+      m_poolHADM->sendNotConnectedMessageToAllregions();
+      m_allEndpointsDisconnected = true;
     }
     if (fatal && err != GF_NOERR) {
       return fatalError;
diff --git a/cppcache/src/ThinClientRedundancyManager.hpp b/cppcache/src/ThinClientRedundancyManager.hpp
index 3b6e4ce..ae339f1 100644
--- a/cppcache/src/ThinClientRedundancyManager.hpp
+++ b/cppcache/src/ThinClientRedundancyManager.hpp
@@ -79,7 +79,7 @@ class ThinClientRedundancyManager {
   void netDown();
   void acquireRedundancyLock() { m_redundantEndpointsLock.lock(); }
   void releaseRedundancyLock() { m_redundantEndpointsLock.unlock(); }
-  bool allEndPointDiscon() { return m_IsAllEpDisCon; }
+  bool allEndPointDiscon() { return m_allEndpointsDisconnected; }
   void removeCallbackConnection(TcrEndpoint*);
 
   std::recursive_mutex& getRedundancyLock() { return m_redundantEndpointsLock; }
@@ -92,7 +92,7 @@ class ThinClientRedundancyManager {
   using time_point = clock::time_point;
 
   // for selectServers
-  volatile bool m_IsAllEpDisCon;
+  volatile bool m_allEndpointsDisconnected;
   int m_server;
   bool m_sentReadyForEvents;
   int m_redundancyLevel;
diff --git a/cppcache/src/ThinClientRegion.cpp b/cppcache/src/ThinClientRegion.cpp
index aba0f97..f190198 100644
--- a/cppcache/src/ThinClientRegion.cpp
+++ b/cppcache/src/ThinClientRegion.cpp
@@ -2620,7 +2620,7 @@ ThinClientRegion::getInterestListRegex() const {
   return vlist;
 }
 
-GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
+GfErrType ThinClientRegion::clientNotificationHandler(const TcrMessage& msg) {
   GfErrType err = GF_NOERR;
   std::shared_ptr<Cacheable> oldValue;
   switch (msg.getMessageType()) {
@@ -2672,11 +2672,16 @@ GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
       LocalRegion::tombstoneOperationNoThrow(msg.getTombstoneVersions(),
                                              msg.getTombstoneKeys());
       break;
-    default: {
-      if (TcrMessage::getAllEPDisMess() == &msg) {
+    default:
+      try {
+        auto& marker =
+            dynamic_cast<const TcrMessageAllEndpointsDisconnectedMarker&>(msg);
         setProcessedMarker(false);
+        LOGDEBUG(
+            "ThinClientRegion::clientNotificationHandler: rec'd endpoints "
+            "disconnected message");
         LocalRegion::invokeAfterAllEndPointDisconnected();
-      } else {
+      } catch (std::bad_cast&) {
         LOGERROR(
             "Unknown message type %d in subscription event handler; possible "
             "serialization mismatch",
@@ -2684,13 +2689,12 @@ GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
         err = GF_MSG;
       }
       break;
-    }
   }
 
   // Update EventIdMap to mark event processed, Only for durable client.
   // In case of closing, don't send it as listener might not be invoked.
   if (!m_destroyPending && (m_isDurableClnt || msg.hasDelta()) &&
-      TcrMessage::getAllEPDisMess() != &msg) {
+      msg.getMessageType() != TcrMessage::INVALID) {
     m_tcrdm->checkDupAndAdd(msg.getEventId());
   }
 
@@ -2723,12 +2727,12 @@ GfErrType ThinClientRegion::handleServerException(
                                "TransactionDataNodeHasDepartedException") !=
              std::string::npos) {
     error = GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION;
-  } else if (exceptionMsg.find(
-                 "org.apache.geode.cache.TransactionDataRebalancedException") !=
+  } else if (exceptionMsg.find("org.apache.geode.cache."
+                               "TransactionDataRebalancedException") !=
              std::string::npos) {
     error = GF_TRANSACTION_DATA_REBALANCED_EXCEPTION;
-  } else if (exceptionMsg.find(
-                 "org.apache.geode.security.AuthenticationRequiredException") !=
+  } else if (exceptionMsg.find("org.apache.geode.security."
+                               "AuthenticationRequiredException") !=
              std::string::npos) {
     error = GF_AUTHENTICATION_REQUIRED_EXCEPTION;
   } else if (exceptionMsg.find("org.apache.geode.cache.LowMemoryException") !=
@@ -2752,27 +2756,23 @@ GfErrType ThinClientRegion::handleServerException(
   return error;
 }
 
-void ThinClientRegion::receiveNotification(TcrMessage* msg) {
+void ThinClientRegion::receiveNotification(const TcrMessage& msg) {
   std::unique_lock<std::mutex> lock(m_notificationMutex, std::defer_lock);
   {
     TryReadGuard guard(m_rwLock, m_destroyPending);
     if (m_destroyPending) {
-      if (msg != TcrMessage::getAllEPDisMess()) {
-        _GEODE_SAFE_DELETE(msg);
-      }
       return;
     }
     lock.lock();
   }
 
-  if (msg->getMessageType() == TcrMessage::CLIENT_MARKER) {
+  if (msg.getMessageType() == TcrMessage::CLIENT_MARKER) {
     handleMarker();
   } else {
-    clientNotificationHandler(*msg);
+    clientNotificationHandler(msg);
   }
 
   lock.unlock();
-  if (TcrMessage::getAllEPDisMess() != msg) _GEODE_SAFE_DELETE(msg);
 }
 
 void ThinClientRegion::localInvalidateRegion_internal() {
@@ -2967,7 +2967,8 @@ void ThinClientRegion::executeFunction(
       } else if (err == GF_NOTCON) {
         attempt++;
         LOGDEBUG(
-            "ThinClientRegion::executeFunction with GF_NOTCON retry attempt = "
+            "ThinClientRegion::executeFunction with GF_NOTCON retry attempt "
+            "= "
             "%d ",
             attempt);
         if (attempt > retryAttempts) {
@@ -3052,7 +3053,8 @@ std::shared_ptr<CacheableVector> ThinClientRegion::reExecuteFunction(
         failedNodes->clear();
         if (failedNodesIds) {
           LOGDEBUG(
-              "ThinClientRegion::reExecuteFunction with GF_FUNCTION_EXCEPTION "
+              "ThinClientRegion::reExecuteFunction with "
+              "GF_FUNCTION_EXCEPTION "
               "failedNodesIds size = %zu ",
               failedNodesIds->size());
           failedNodes->insert(failedNodesIds->begin(), failedNodesIds->end());
@@ -3141,7 +3143,8 @@ bool ThinClientRegion::executeFunctionSH(
             currentReply->getFailedNode());
         if (failedNodeIds) {
           LOGDEBUG(
-              "ThinClientRegion::executeFunctionSH with GF_FUNCTION_EXCEPTION "
+              "ThinClientRegion::executeFunctionSH with "
+              "GF_FUNCTION_EXCEPTION "
               "failedNodeIds size = %zu ",
               failedNodeIds->size());
           failedNodes->insert(failedNodeIds->begin(), failedNodeIds->end());
@@ -3365,12 +3368,14 @@ void ChunkedQueryResponse::readObjectPartList(DataInput& input,
           if (arrayType != DSFid::CacheableObjectPartList) {
             LOGERROR(
                 "Query response got unhandled message format %d while "
-                "expecting struct set object part list; possible serialization "
+                "expecting struct set object part list; possible "
+                "serialization "
                 "mismatch",
                 arrayType);
             throw MessageException(
                 "Query response got unhandled message format while expecting "
-                "struct set object part list; possible serialization mismatch");
+                "struct set object part list; possible serialization "
+                "mismatch");
           }
           readObjectPartList(input, true);
         } else {
@@ -3491,7 +3496,8 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
           "object part list; possible serialization mismatch",
           arrayType);
       throw MessageException(
-          "Query response got unhandled message format while expecting object "
+          "Query response got unhandled message format while expecting "
+          "object "
           "part list; possible serialization mismatch");
     }
     readObjectPartList(input, isResultSet);
@@ -3556,9 +3562,9 @@ void ChunkedFunctionExecutionResponse::handleChunk(
     return;
   }
 
-  auto startLen = static_cast<size_t>(
-      input.getBytesRead() -
-      1);  // from here need to look value part + memberid AND -1 for array type
+  // from here need to look value part + memberid AND -1 for array type
+  auto startLen = static_cast<size_t>(input.getBytesRead() - 1);
+
   // read and ignore array length
   input.readArrayLength();
 
@@ -3571,7 +3577,8 @@ void ChunkedFunctionExecutionResponse::handleChunk(
   const int SECURE_PART_LEN = 5 + 8;
   bool readPart = true;
   LOGDEBUG(
-      "ChunkedFunctionExecutionResponse::handleChunk chunkLen = %d & partLen = "
+      "ChunkedFunctionExecutionResponse::handleChunk chunkLen = %d & partLen "
+      "= "
       "%d ",
       chunkLen, partLen);
   if (partType == DSCode::JavaSerializable) {
@@ -3592,7 +3599,8 @@ void ChunkedFunctionExecutionResponse::handleChunk(
       // skip first part i.e JavaSerializable.
       TcrMessageHelper::skipParts(m_msg, input, 1);
 
-      // read the second part which is string in usual manner, first its length.
+      // read the second part which is string in usual manner, first its
+      // length.
       partLen = input.readInt32();
 
       // then isObject byte
@@ -3601,8 +3609,8 @@ void ChunkedFunctionExecutionResponse::handleChunk(
       startLen = input.getBytesRead();  // reset from here need to look value
       // part + memberid AND -1 for array type
 
-      // Since it is contained as a part of other results, read arrayType which
-      // is arrayList = 65.
+      // Since it is contained as a part of other results, read arrayType
+      // which is arrayList = 65.
       input.read();
 
       // read and ignore its len which is 2
diff --git a/cppcache/src/ThinClientRegion.hpp b/cppcache/src/ThinClientRegion.hpp
index 94fbac0..59dd51b 100644
--- a/cppcache/src/ThinClientRegion.hpp
+++ b/cppcache/src/ThinClientRegion.hpp
@@ -133,7 +133,7 @@ class ThinClientRegion : public LocalRegion {
   std::vector<std::shared_ptr<CacheableString>> getInterestListRegex()
       const override;
 
-  void receiveNotification(TcrMessage* msg);
+  void receiveNotification(const TcrMessage& msg);
 
   static GfErrType handleServerException(const std::string& func,
                                          const std::string& exceptionMsg);
@@ -257,7 +257,7 @@ class ThinClientRegion : public LocalRegion {
                                    bool attemptFailover = true);
   GfErrType unregisterRegexNoThrowLocalDestroy(const std::string& regex,
                                                bool attemptFailover = true);
-  GfErrType clientNotificationHandler(TcrMessage& msg);
+  GfErrType clientNotificationHandler(const TcrMessage& msg);
 
   virtual void localInvalidateRegion_internal();