You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/04/30 17:55:44 UTC
[geode-native] branch develop updated: GEODE-9200: Clean up DisMess (#793)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 7a03e4c GEODE-9200: Clean up DisMess (#793)
7a03e4c is described below
commit 7a03e4c044996434c07e2178ccd13a5f5a84c621
Author: Blake Bender <bb...@pivotal.io>
AuthorDate: Fri Apr 30 10:55:28 2021 -0700
GEODE-9200: Clean up DisMess (#793)
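Remove static endpoint disconnected message
- construct a TcrMessageAllEndpointsDisconnectedMarker on demand for this,
instead of sharing a single static instance; notification messages are now
passed by reference rather than by owning pointer
- no longer leak an instance at shutdown, and no more special-case code for
this message when deleting messages.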
---
cppcache/src/CacheImpl.cpp | 12 ++--
cppcache/src/CqService.cpp | 7 +--
cppcache/src/CqService.hpp | 2 +-
cppcache/src/RegionInternal.hpp | 2 +-
cppcache/src/RemoteQueryService.cpp | 2 +-
cppcache/src/RemoteQueryService.hpp | 2 +-
cppcache/src/TcrEndpoint.cpp | 42 ++++++-------
cppcache/src/TcrMessage.cpp | 13 ++--
cppcache/src/TcrMessage.hpp | 12 ++--
cppcache/src/ThinClientHARegion.cpp | 11 ++--
cppcache/src/ThinClientHARegion.hpp | 2 +-
cppcache/src/ThinClientPoolHADM.cpp | 90 ++++++++++++++--------------
cppcache/src/ThinClientPoolHADM.hpp | 14 ++---
cppcache/src/ThinClientRedundancyManager.cpp | 12 ++--
cppcache/src/ThinClientRedundancyManager.hpp | 4 +-
cppcache/src/ThinClientRegion.cpp | 68 +++++++++++----------
cppcache/src/ThinClientRegion.hpp | 4 +-
17 files changed, 148 insertions(+), 151 deletions(-)
diff --git a/cppcache/src/CacheImpl.cpp b/cppcache/src/CacheImpl.cpp
index 988e79f..b9e5793 100644
--- a/cppcache/src/CacheImpl.cpp
+++ b/cppcache/src/CacheImpl.cpp
@@ -424,7 +424,7 @@ void CacheImpl::createRegion(std::string name,
}
regionPtr = rpImpl;
- rpImpl->addDisMessToQueue();
+ rpImpl->addDisconnectedMessageToQueue();
// Instantiate a PersistenceManager object if DiskPolicy is overflow
if (regionAttributes.getDiskPolicy() == DiskPolicyType::OVERFLOWS) {
auto pmPtr = regionAttributes.getPersistenceManager();
@@ -708,16 +708,14 @@ void CacheImpl::processMarker() {
if (!kv.second->isDestroyed()) {
if (const auto tcrHARegion =
std::dynamic_pointer_cast<ThinClientHARegion>(kv.second)) {
- auto regionMsg = new TcrMessageClientMarker(
- new DataOutput(createDataOutput()), true);
- tcrHARegion->receiveNotification(regionMsg);
+ tcrHARegion->receiveNotification(
+ TcrMessageClientMarker(new DataOutput(createDataOutput()), true));
for (const auto& iter : tcrHARegion->subregions(true)) {
if (!iter->isDestroyed()) {
if (const auto subregion =
std::dynamic_pointer_cast<ThinClientHARegion>(iter)) {
- regionMsg = new TcrMessageClientMarker(
- new DataOutput(createDataOutput()), true);
- subregion->receiveNotification(regionMsg);
+ subregion->receiveNotification(TcrMessageClientMarker(
+ new DataOutput(createDataOutput()), true));
}
}
}
diff --git a/cppcache/src/CqService.cpp b/cppcache/src/CqService.cpp
index 3ced543..af81045 100644
--- a/cppcache/src/CqService.cpp
+++ b/cppcache/src/CqService.cpp
@@ -383,10 +383,9 @@ bool CqService::isCqExists(const std::string& cqName) {
return m_cqQueryMap.find(cqName) != m_cqQueryMap.end();
}
-void CqService::receiveNotification(TcrMessage* msg) {
- invokeCqListeners(msg->getCqs(), msg->getMessageTypeForCq(), msg->getKey(),
- msg->getValue(), msg->getDeltaBytes(), msg->getEventId());
- _GEODE_SAFE_DELETE(msg);
+void CqService::receiveNotification(TcrMessage& msg) {
+ invokeCqListeners(msg.getCqs(), msg.getMessageTypeForCq(), msg.getKey(),
+ msg.getValue(), msg.getDeltaBytes(), msg.getEventId());
notification_semaphore_.release();
}
diff --git a/cppcache/src/CqService.hpp b/cppcache/src/CqService.hpp
index 7a6df07..a29a726 100644
--- a/cppcache/src/CqService.hpp
+++ b/cppcache/src/CqService.hpp
@@ -75,7 +75,7 @@ class CqService : public std::enable_shared_from_this<CqService> {
ThinClientBaseDM* getDM() { return m_tccdm; }
- void receiveNotification(TcrMessage* msg);
+ void receiveNotification(TcrMessage& msg);
/**
* Returns the state of the cqService.
diff --git a/cppcache/src/RegionInternal.hpp b/cppcache/src/RegionInternal.hpp
index 4e4358d..65b0798 100644
--- a/cppcache/src/RegionInternal.hpp
+++ b/cppcache/src/RegionInternal.hpp
@@ -271,7 +271,7 @@ class RegionInternal : public Region {
std::shared_ptr<RegionEntry> createRegionEntry(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value);
- virtual void addDisMessToQueue() {}
+ virtual void addDisconnectedMessageToQueue() {}
virtual void txDestroy(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& callBack,
diff --git a/cppcache/src/RemoteQueryService.cpp b/cppcache/src/RemoteQueryService.cpp
index 9598a5f..cf03367 100644
--- a/cppcache/src/RemoteQueryService.cpp
+++ b/cppcache/src/RemoteQueryService.cpp
@@ -272,7 +272,7 @@ RemoteQueryService::getCqServiceStatistics() const {
return nullptr;
}
-void RemoteQueryService::receiveNotification(TcrMessage* msg) {
+void RemoteQueryService::receiveNotification(TcrMessage& msg) {
{
TryReadGuard guard(m_rwLock, m_invalid);
diff --git a/cppcache/src/RemoteQueryService.hpp b/cppcache/src/RemoteQueryService.hpp
index 8c64ba2..74e1bb3 100644
--- a/cppcache/src/RemoteQueryService.hpp
+++ b/cppcache/src/RemoteQueryService.hpp
@@ -87,7 +87,7 @@ class RemoteQueryService
* execute all cqs on the endpoint after failover
*/
GfErrType executeAllCqs(TcrEndpoint* endpoint);
- void receiveNotification(TcrMessage* msg);
+ void receiveNotification(TcrMessage& msg);
void invokeCqConnectedListeners(ThinClientPoolDM* pool, bool connected);
// For Lazy Cq Start-no use, no start
inline void initCqService() {
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 7735cf7..dd79756 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -550,7 +550,6 @@ bool TcrEndpoint::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
LOGFINE("Started subscription channel for endpoint %s", m_name.c_str());
while (isRunning) {
- TcrMessageReply* msg = nullptr;
try {
size_t dataLen;
ConnErrType opErr = CONN_NOERR;
@@ -576,34 +575,32 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
}
if (data) {
- msg = new TcrMessageReply(true, m_baseDM);
- msg->initCqMap();
- msg->setData(data, static_cast<int32_t>(dataLen),
- getDistributedMemberID(),
- *(m_cacheImpl->getSerializationRegistry()),
- *(m_cacheImpl->getMemberListForVersionStamp()));
+ TcrMessageReply msg(true, m_baseDM);
+ msg.initCqMap();
+ msg.setData(data, static_cast<int32_t>(dataLen),
+ getDistributedMemberID(),
+ *(m_cacheImpl->getSerializationRegistry()),
+ *(m_cacheImpl->getMemberListForVersionStamp()));
handleNotificationStats(static_cast<int64_t>(dataLen));
- LOGDEBUG("receive notification %d", msg->getMessageType());
+ LOGDEBUG("receive notification %d", msg.getMessageType());
if (!isRunning) {
- _GEODE_SAFE_DELETE(msg);
break;
}
- if (msg->getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) {
+ if (msg.getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) {
LOGFINE("Received ping from server subscription channel.");
}
// ignore some message types like REGISTER_INSTANTIATORS
- if (msg->shouldIgnore()) {
- _GEODE_SAFE_DELETE(msg);
+ if (msg.shouldIgnore()) {
continue;
}
- bool isMarker = (msg->getMessageType() == TcrMessage::CLIENT_MARKER);
- if (!msg->hasCqPart()) {
- if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) {
- const std::string& regionFullPath1 = msg->getRegionName();
+ bool isMarker = (msg.getMessageType() == TcrMessage::CLIENT_MARKER);
+ if (!msg.hasCqPart()) {
+ if (msg.getMessageType() != TcrMessage::CLIENT_MARKER) {
+ const std::string& regionFullPath1 = msg.getRegionName();
auto region1 = m_cacheImpl->getRegion(regionFullPath1);
if (region1 != nullptr &&
@@ -614,18 +611,16 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
// checking
LOGFINER("Endpoint %s dropping event for region %s",
m_name.c_str(), regionFullPath1.c_str());
- _GEODE_SAFE_DELETE(msg);
continue;
}
}
}
- if (!checkDupAndAdd(msg->getEventId())) {
+ if (!checkDupAndAdd(msg.getEventId())) {
m_dupCount++;
if (m_dupCount % 100 == 1) {
LOGFINE("Dropped %dst duplicate notification message", m_dupCount);
}
- _GEODE_SAFE_DELETE(msg);
continue;
}
@@ -633,11 +628,10 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
LOGFINE("Got a marker message on endpont %s", m_name.c_str());
m_cacheImpl->processMarker();
processMarker();
- _GEODE_SAFE_DELETE(msg);
} else {
- if (!msg->hasCqPart()) // || msg->isInterestListPassed())
+ if (!msg.hasCqPart()) // || msg.isInterestListPassed())
{
- const std::string& regionFullPath = msg->getRegionName();
+ const std::string& regionFullPath = msg.getRegionName();
auto region = m_cacheImpl->getRegion(regionFullPath);
if (region != nullptr) {
@@ -650,7 +644,7 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
regionFullPath.c_str());
}
} else {
- LOGDEBUG("receive cq notification %d", msg->getMessageType());
+ LOGDEBUG("receive cq notification %d", msg.getMessageType());
auto queryService = getQueryService();
if (queryService != nullptr) {
static_cast<RemoteQueryService*>(queryService.get())
@@ -685,13 +679,11 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
}
break;
} catch (const Exception& ex) {
- _GEODE_SAFE_DELETE(msg);
LOGERROR(
"Exception while receiving subscription event for endpoint %s:: %s: "
"%s",
m_name.c_str(), ex.getName().c_str(), ex.what());
} catch (...) {
- _GEODE_SAFE_DELETE(msg);
LOGERROR(
"Unexpected exception while "
"receiving subscription event from endpoint %s",
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index bdaf340..6b30c4a 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -255,7 +255,7 @@ TcrChunkedResult* TcrMessage::getChunkedResultHandler() {
return m_chunkedResult;
}
-DataInput* TcrMessage::getDelta() { return m_delta.get(); }
+DataInput* TcrMessage::getDelta() const { return m_delta.get(); }
// getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes
// that
@@ -270,7 +270,7 @@ std::shared_ptr<CacheableBytes> TcrMessage::getDeltaBytes() {
return retVal;
}
-bool TcrMessage::hasDelta() { return (m_delta != nullptr); }
+bool TcrMessage::hasDelta() const { return (m_delta != nullptr); }
void TcrMessage::setMetaRegion(bool isMetaRegion) {
m_isMetaRegion = isMetaRegion;
@@ -307,7 +307,9 @@ void TcrMessage::setCallBackArguement(bool aCallBackArguement) {
void TcrMessage::setVersionTag(std::shared_ptr<VersionTag> versionTag) {
m_versionTag = versionTag;
}
-std::shared_ptr<VersionTag> TcrMessage::getVersionTag() { return m_versionTag; }
+std::shared_ptr<VersionTag> TcrMessage::getVersionTag() const {
+ return m_versionTag;
+}
uint8_t TcrMessage::hasResult() const { return m_hasResult; }
@@ -319,11 +321,6 @@ std::shared_ptr<CacheableHashSet> TcrMessage::getTombstoneKeys() const {
return m_tombstoneKeys;
}
-TcrMessage* TcrMessage::getAllEPDisMess() {
- static auto allEPDisconnected = new TcrMessageReply(true, nullptr);
- return allEPDisconnected;
-}
-
void TcrMessage::writeInterestResultPolicyPart(InterestResultPolicy policy) {
m_request->writeInt(static_cast<int32_t>(3)); // size
m_request->write(static_cast<int8_t>(1)); // isObject
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index d151048..1107b24 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -259,7 +259,6 @@ class TcrMessage {
std::chrono::milliseconds getTimeout() const;
void setTimeout(std::chrono::milliseconds timeout);
- static TcrMessage* getAllEPDisMess();
bool isDurable() const;
bool receiveValues() const;
bool hasCqPart() const;
@@ -277,13 +276,13 @@ class TcrMessage {
void setChunkedResultHandler(TcrChunkedResult* chunkedResult);
TcrChunkedResult* getChunkedResultHandler();
- DataInput* getDelta();
+ DataInput* getDelta() const;
// getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes
// that
// takes ownership of delta bytes.
std::shared_ptr<CacheableBytes> getDeltaBytes();
- bool hasDelta();
+ bool hasDelta() const;
void addSecurityPart(int64_t connectionId, int64_t unique_id,
TcrConnection* conn);
@@ -321,7 +320,7 @@ class TcrMessage {
void setCallBackArguement(bool aCallBackArguement);
void setVersionTag(std::shared_ptr<VersionTag> versionTag);
- std::shared_ptr<VersionTag> getVersionTag();
+ std::shared_ptr<VersionTag> getVersionTag() const;
uint8_t hasResult() const;
std::shared_ptr<CacheableHashMap> getTombstoneVersions() const;
std::shared_ptr<CacheableHashSet> getTombstoneKeys() const;
@@ -910,6 +909,11 @@ class TcrMessageReply : public TcrMessage {
~TcrMessageReply() override = default;
};
+class TcrMessageAllEndpointsDisconnectedMarker : public TcrMessage {
+ public:
+ TcrMessageAllEndpointsDisconnectedMarker() = default;
+};
+
/**
* Helper class to invoke some internal methods of TcrMessage. Add any
* methods that response processor methods require to access here.
diff --git a/cppcache/src/ThinClientHARegion.cpp b/cppcache/src/ThinClientHARegion.cpp
index 1910919..9c66b19 100644
--- a/cppcache/src/ThinClientHARegion.cpp
+++ b/cppcache/src/ThinClientHARegion.cpp
@@ -117,15 +117,14 @@ void ThinClientHARegion::destroyDM(bool) {
poolDM->decRegionCount();
}
-void ThinClientHARegion::addDisMessToQueue() {
+void ThinClientHARegion::addDisconnectedMessageToQueue() {
auto poolDM = std::dynamic_pointer_cast<ThinClientPoolHADM>(m_tcrdm);
- poolDM->addDisMessToQueue(this);
+ poolDM->addDisconnectedMessageToQueue(this);
- if (poolDM->m_redundancyManager->m_globalProcessedMarker &&
+ if (poolDM->redundancyManager_->m_globalProcessedMarker &&
!m_processedMarker) {
- TcrMessage* regionMsg = new TcrMessageClientMarker(
- new DataOutput(m_cacheImpl->createDataOutput()), true);
- receiveNotification(regionMsg);
+ receiveNotification(TcrMessageClientMarker(
+ new DataOutput(m_cacheImpl->createDataOutput()), true));
}
}
diff --git a/cppcache/src/ThinClientHARegion.hpp b/cppcache/src/ThinClientHARegion.hpp
index b66ed75..c7c7f8d 100644
--- a/cppcache/src/ThinClientHARegion.hpp
+++ b/cppcache/src/ThinClientHARegion.hpp
@@ -57,7 +57,7 @@ class ThinClientHARegion : public ThinClientRegion {
void setProcessedMarker(bool mark = true) override {
m_processedMarker = mark;
}
- void addDisMessToQueue() override;
+ void addDisconnectedMessageToQueue() override;
protected:
GfErrType getNoThrow_FullObject(
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index 3d43c85..1113b2a 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -33,11 +33,11 @@ ThinClientPoolHADM::ThinClientPoolHADM(const char* name,
std::shared_ptr<PoolAttributes> poolAttr,
TcrConnectionManager& connManager)
: ThinClientPoolDM(name, poolAttr, connManager),
- m_theTcrConnManager(connManager),
+ theTcrConnManager_(connManager),
redundancy_semaphore_(0),
- m_redundancyTask(nullptr),
+ redundancyTask_(nullptr),
server_monitor_task_id_(ExpiryTask::invalid()) {
- m_redundancyManager = std::unique_ptr<ThinClientRedundancyManager>(
+ redundancyManager_ = std::unique_ptr<ThinClientRedundancyManager>(
new ThinClientRedundancyManager(
&connManager, poolAttr->getSubscriptionRedundancy(), this));
}
@@ -54,10 +54,10 @@ void ThinClientPoolHADM::startBackgroundThreads() {
->getDistributedSystem()
.getSystemProperties();
- m_redundancyManager->initialize(m_attrs->getSubscriptionRedundancy());
+ redundancyManager_->initialize(m_attrs->getSubscriptionRedundancy());
// Call maintain redundancy level, so primary is available for notification
// operations.
- GfErrType err = m_redundancyManager->maintainRedundancyLevel(true);
+ GfErrType err = redundancyManager_->maintainRedundancyLevel(true);
const auto interval = props.redundancyMonitorInterval();
auto& manager = m_connManager.getCacheImpl()->getExpiryTaskManager();
@@ -82,11 +82,11 @@ void ThinClientPoolHADM::startBackgroundThreads() {
}
}
- m_redundancyManager->startPeriodicAck();
- m_redundancyTask =
+ redundancyManager_->startPeriodicAck();
+ redundancyTask_ =
std::unique_ptr<Task<ThinClientPoolHADM>>(new Task<ThinClientPoolHADM>(
this, &ThinClientPoolHADM::redundancy, NC_Redundancy));
- m_redundancyTask->start();
+ redundancyTask_->start();
}
GfErrType ThinClientPoolHADM::sendSyncRequest(TcrMessage& request,
@@ -121,13 +121,13 @@ GfErrType ThinClientPoolHADM::sendSyncRequestRegisterInterestEP(
GfErrType ThinClientPoolHADM::sendSyncRequestRegisterInterest(
TcrMessage& request, TcrMessageReply& reply, bool attemptFailover,
ThinClientRegion* region, TcrEndpoint* endpoint) {
- return m_redundancyManager->sendSyncRequestRegisterInterest(
+ return redundancyManager_->sendSyncRequestRegisterInterest(
request, reply, attemptFailover, endpoint, this, region);
}
GfErrType ThinClientPoolHADM::sendSyncRequestCq(TcrMessage& request,
TcrMessageReply& reply) {
- return m_redundancyManager->sendSyncRequestCq(request, reply, this);
+ return redundancyManager_->sendSyncRequestCq(request, reply, this);
}
bool ThinClientPoolHADM::preFailoverAction() { return true; }
@@ -143,7 +143,7 @@ void ThinClientPoolHADM::redundancy(std::atomic<bool>& isRunning) {
redundancy_semaphore_.acquire();
while (isRunning) {
if (!m_connManager.isNetDown()) {
- m_redundancyManager->maintainRedundancyLevel();
+ redundancyManager_->maintainRedundancyLevel();
}
redundancy_semaphore_.acquire();
@@ -165,7 +165,7 @@ void ThinClientPoolHADM::destroy(bool keepAlive) {
sendNotificationCloseMsgs();
- m_redundancyManager->close();
+ redundancyManager_->close();
m_destroyPendingHADM = true;
ThinClientPoolDM::destroy(keepAlive);
@@ -173,14 +173,14 @@ void ThinClientPoolHADM::destroy(bool keepAlive) {
}
void ThinClientPoolHADM::sendNotificationCloseMsgs() {
- if (m_redundancyTask) {
+ if (redundancyTask_) {
auto& manager = m_connManager.getCacheImpl()->getExpiryTaskManager();
manager.cancel(server_monitor_task_id_);
- m_redundancyTask->stopNoblock();
+ redundancyTask_->stopNoblock();
redundancy_semaphore_.release();
- m_redundancyTask->wait();
- m_redundancyTask = nullptr;
- m_redundancyManager->sendNotificationCloseMsgs();
+ redundancyTask_->wait();
+ redundancyTask_ = nullptr;
+ redundancyManager_->sendNotificationCloseMsgs();
}
}
@@ -188,8 +188,8 @@ GfErrType ThinClientPoolHADM::registerInterestAllRegions(
TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply) {
GfErrType err = GF_NOERR;
- std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
- for (const auto& region : m_regions) {
+ std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+ for (const auto& region : regions_) {
auto opErr = region->registerKeys(ep, request, reply);
if (err == GF_NOERR) {
err = opErr;
@@ -200,51 +200,52 @@ GfErrType ThinClientPoolHADM::registerInterestAllRegions(
}
bool ThinClientPoolHADM::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
- return m_redundancyManager->checkDupAndAdd(eventid);
+ return redundancyManager_->checkDupAndAdd(eventid);
}
void ThinClientPoolHADM::processMarker() {
// also set the static bool m_processedMarker for makePrimary messages
- m_redundancyManager->m_globalProcessedMarker = true;
+ redundancyManager_->m_globalProcessedMarker = true;
}
void ThinClientPoolHADM::acquireRedundancyLock() {
- m_redundancyManager->acquireRedundancyLock();
+ redundancyManager_->acquireRedundancyLock();
}
void ThinClientPoolHADM::releaseRedundancyLock() {
- m_redundancyManager->releaseRedundancyLock();
+ redundancyManager_->releaseRedundancyLock();
}
std::recursive_mutex& ThinClientPoolHADM::getRedundancyLock() {
- return m_redundancyManager->getRedundancyLock();
+ return redundancyManager_->getRedundancyLock();
}
GfErrType ThinClientPoolHADM::sendRequestToPrimary(TcrMessage& request,
TcrMessageReply& reply) {
- return m_redundancyManager->sendRequestToPrimary(request, reply);
+ return redundancyManager_->sendRequestToPrimary(request, reply);
}
bool ThinClientPoolHADM::isReadyForEvent() const {
- return m_redundancyManager->isSentReadyForEvents();
+ return redundancyManager_->isSentReadyForEvents();
}
void ThinClientPoolHADM::addRegion(ThinClientRegion* theTCR) {
- std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
- m_regions.push_back(theTCR);
+ std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+ regions_.push_back(theTCR);
}
-void ThinClientPoolHADM::addDisMessToQueue(ThinClientRegion* theTCR) {
- std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
- if (m_redundancyManager->allEndPointDiscon()) {
- theTCR->receiveNotification(TcrMessage::getAllEPDisMess());
+void ThinClientPoolHADM::addDisconnectedMessageToQueue(
+ ThinClientRegion* theTCR) {
+ std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+ if (redundancyManager_->allEndPointDiscon()) {
+ theTCR->receiveNotification(TcrMessageAllEndpointsDisconnectedMarker());
}
}
void ThinClientPoolHADM::removeRegion(ThinClientRegion* theTCR) {
- std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
- for (std::list<ThinClientRegion*>::iterator itr = m_regions.begin();
- itr != m_regions.end(); itr++) {
+ std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+ for (std::list<ThinClientRegion*>::iterator itr = regions_.begin();
+ itr != regions_.end(); itr++) {
if (*itr == theTCR) {
- m_regions.erase(itr);
+ regions_.erase(itr);
return;
}
}
@@ -260,7 +261,7 @@ void ThinClientPoolHADM::readyForEvents() {
auto&& durable = sysProp.durableClientId();
if (!durable.empty()) {
- m_redundancyManager->readyForEvents();
+ redundancyManager_->readyForEvents();
}
}
@@ -274,24 +275,23 @@ void ThinClientPoolHADM::netDown() {
}
}
- m_redundancyManager->netDown();
+ redundancyManager_->netDown();
}
void ThinClientPoolHADM::pingServerLocal() {
- auto& mutex = m_redundancyManager->getRedundancyLock();
+ auto& mutex = redundancyManager_->getRedundancyLock();
std::lock_guard<decltype(mutex)> guard(mutex);
ThinClientPoolDM::pingServerLocal();
}
void ThinClientPoolHADM::removeCallbackConnection(TcrEndpoint* ep) {
- m_redundancyManager->removeCallbackConnection(ep);
+ redundancyManager_->removeCallbackConnection(ep);
}
-void ThinClientPoolHADM::sendNotConMesToAllregions() {
- std::lock_guard<decltype(m_regionsLock)> guard(m_regionsLock);
- for (std::list<ThinClientRegion*>::iterator it = m_regions.begin();
- it != m_regions.end(); it++) {
- (*it)->receiveNotification(TcrMessage::getAllEPDisMess());
+void ThinClientPoolHADM::sendNotConnectedMessageToAllregions() {
+ std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+ for (auto region : regions_) {
+ region->receiveNotification(TcrMessageAllEndpointsDisconnectedMarker());
}
}
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index f1094e1..3bfb055 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -101,11 +101,11 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
void startBackgroundThreads() override;
private:
- std::unique_ptr<ThinClientRedundancyManager> m_redundancyManager;
+ std::unique_ptr<ThinClientRedundancyManager> redundancyManager_;
- TcrConnectionManager& m_theTcrConnManager;
+ TcrConnectionManager& theTcrConnManager_;
binary_semaphore redundancy_semaphore_;
- std::unique_ptr<Task<ThinClientPoolHADM>> m_redundancyTask;
+ std::unique_ptr<Task<ThinClientPoolHADM>> redundancyTask_;
void redundancy(std::atomic<bool>& isRunning);
@@ -115,12 +115,12 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
void removeCallbackConnection(TcrEndpoint*) override;
- std::list<ThinClientRegion*> m_regions;
- std::recursive_mutex m_regionsLock;
+ std::list<ThinClientRegion*> regions_;
+ std::recursive_mutex regionsLock_;
void addRegion(ThinClientRegion* theTCR);
void removeRegion(ThinClientRegion* theTCR);
- void sendNotConMesToAllregions();
- void addDisMessToQueue(ThinClientRegion* theTCR);
+ void sendNotConnectedMessageToAllregions();
+ void addDisconnectedMessageToQueue(ThinClientRegion* theTCR);
friend class ThinClientHARegion;
friend class TcrConnectionManager;
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 8a2db40..7374b2a 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -46,7 +46,7 @@ ThinClientRedundancyManager::ThinClientRedundancyManager(
ThinClientPoolHADM* poolHADM, bool sentReadyForEvents,
bool globalProcessedMarker)
: m_globalProcessedMarker(globalProcessedMarker),
- m_IsAllEpDisCon(false),
+ m_allEndpointsDisconnected(false),
m_server(0),
m_sentReadyForEvents(sentReadyForEvents),
m_redundancyLevel(redundancyLevel),
@@ -429,14 +429,14 @@ GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
}
if (isRedundancySatisfied) {
- m_IsAllEpDisCon = false;
+ m_allEndpointsDisconnected = false;
m_loggedRedundancyWarning = false;
return GF_NOERR;
} else if (isPrimaryConnected) {
if (fatal && err != GF_NOERR) {
return fatalError;
}
- m_IsAllEpDisCon = false;
+ m_allEndpointsDisconnected = false;
if (m_redundancyLevel == -1) {
LOGINFO("Current subscription redundancy level is %zu",
m_redundantEndpoints.size() - 1);
@@ -454,9 +454,9 @@ GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
// save any fatal errors that occur during maintain redundancy so
// that we can send it back to the caller, to avoid missing out due
// to nonfatal errors such as server not available
- if (m_poolHADM && !m_IsAllEpDisCon) {
- m_poolHADM->sendNotConMesToAllregions();
- m_IsAllEpDisCon = true;
+ if (m_poolHADM && !m_allEndpointsDisconnected) {
+ m_poolHADM->sendNotConnectedMessageToAllregions();
+ m_allEndpointsDisconnected = true;
}
if (fatal && err != GF_NOERR) {
return fatalError;
diff --git a/cppcache/src/ThinClientRedundancyManager.hpp b/cppcache/src/ThinClientRedundancyManager.hpp
index 3b6e4ce..ae339f1 100644
--- a/cppcache/src/ThinClientRedundancyManager.hpp
+++ b/cppcache/src/ThinClientRedundancyManager.hpp
@@ -79,7 +79,7 @@ class ThinClientRedundancyManager {
void netDown();
void acquireRedundancyLock() { m_redundantEndpointsLock.lock(); }
void releaseRedundancyLock() { m_redundantEndpointsLock.unlock(); }
- bool allEndPointDiscon() { return m_IsAllEpDisCon; }
+ bool allEndPointDiscon() { return m_allEndpointsDisconnected; }
void removeCallbackConnection(TcrEndpoint*);
std::recursive_mutex& getRedundancyLock() { return m_redundantEndpointsLock; }
@@ -92,7 +92,7 @@ class ThinClientRedundancyManager {
using time_point = clock::time_point;
// for selectServers
- volatile bool m_IsAllEpDisCon;
+ volatile bool m_allEndpointsDisconnected;
int m_server;
bool m_sentReadyForEvents;
int m_redundancyLevel;
diff --git a/cppcache/src/ThinClientRegion.cpp b/cppcache/src/ThinClientRegion.cpp
index aba0f97..f190198 100644
--- a/cppcache/src/ThinClientRegion.cpp
+++ b/cppcache/src/ThinClientRegion.cpp
@@ -2620,7 +2620,7 @@ ThinClientRegion::getInterestListRegex() const {
return vlist;
}
-GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
+GfErrType ThinClientRegion::clientNotificationHandler(const TcrMessage& msg) {
GfErrType err = GF_NOERR;
std::shared_ptr<Cacheable> oldValue;
switch (msg.getMessageType()) {
@@ -2672,11 +2672,16 @@ GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
LocalRegion::tombstoneOperationNoThrow(msg.getTombstoneVersions(),
msg.getTombstoneKeys());
break;
- default: {
- if (TcrMessage::getAllEPDisMess() == &msg) {
+ default:
+ try {
+ auto& marker =
+ dynamic_cast<const TcrMessageAllEndpointsDisconnectedMarker&>(msg);
setProcessedMarker(false);
+ LOGDEBUG(
+ "ThinClientRegion::clientNotificationHandler: rec'd endpoints "
+ "disconnected message");
LocalRegion::invokeAfterAllEndPointDisconnected();
- } else {
+ } catch (std::bad_cast&) {
LOGERROR(
"Unknown message type %d in subscription event handler; possible "
"serialization mismatch",
@@ -2684,13 +2689,12 @@ GfErrType ThinClientRegion::clientNotificationHandler(TcrMessage& msg) {
err = GF_MSG;
}
break;
- }
}
// Update EventIdMap to mark event processed, Only for durable client.
// In case of closing, don't send it as listener might not be invoked.
if (!m_destroyPending && (m_isDurableClnt || msg.hasDelta()) &&
- TcrMessage::getAllEPDisMess() != &msg) {
+ msg.getMessageType() != TcrMessage::INVALID) {
m_tcrdm->checkDupAndAdd(msg.getEventId());
}
@@ -2723,12 +2727,12 @@ GfErrType ThinClientRegion::handleServerException(
"TransactionDataNodeHasDepartedException") !=
std::string::npos) {
error = GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION;
- } else if (exceptionMsg.find(
- "org.apache.geode.cache.TransactionDataRebalancedException") !=
+ } else if (exceptionMsg.find("org.apache.geode.cache."
+ "TransactionDataRebalancedException") !=
std::string::npos) {
error = GF_TRANSACTION_DATA_REBALANCED_EXCEPTION;
- } else if (exceptionMsg.find(
- "org.apache.geode.security.AuthenticationRequiredException") !=
+ } else if (exceptionMsg.find("org.apache.geode.security."
+ "AuthenticationRequiredException") !=
std::string::npos) {
error = GF_AUTHENTICATION_REQUIRED_EXCEPTION;
} else if (exceptionMsg.find("org.apache.geode.cache.LowMemoryException") !=
@@ -2752,27 +2756,23 @@ GfErrType ThinClientRegion::handleServerException(
return error;
}
-void ThinClientRegion::receiveNotification(TcrMessage* msg) {
+void ThinClientRegion::receiveNotification(const TcrMessage& msg) {
std::unique_lock<std::mutex> lock(m_notificationMutex, std::defer_lock);
{
TryReadGuard guard(m_rwLock, m_destroyPending);
if (m_destroyPending) {
- if (msg != TcrMessage::getAllEPDisMess()) {
- _GEODE_SAFE_DELETE(msg);
- }
return;
}
lock.lock();
}
- if (msg->getMessageType() == TcrMessage::CLIENT_MARKER) {
+ if (msg.getMessageType() == TcrMessage::CLIENT_MARKER) {
handleMarker();
} else {
- clientNotificationHandler(*msg);
+ clientNotificationHandler(msg);
}
lock.unlock();
- if (TcrMessage::getAllEPDisMess() != msg) _GEODE_SAFE_DELETE(msg);
}
void ThinClientRegion::localInvalidateRegion_internal() {
@@ -2967,7 +2967,8 @@ void ThinClientRegion::executeFunction(
} else if (err == GF_NOTCON) {
attempt++;
LOGDEBUG(
- "ThinClientRegion::executeFunction with GF_NOTCON retry attempt = "
+ "ThinClientRegion::executeFunction with GF_NOTCON retry attempt "
+ "= "
"%d ",
attempt);
if (attempt > retryAttempts) {
@@ -3052,7 +3053,8 @@ std::shared_ptr<CacheableVector> ThinClientRegion::reExecuteFunction(
failedNodes->clear();
if (failedNodesIds) {
LOGDEBUG(
- "ThinClientRegion::reExecuteFunction with GF_FUNCTION_EXCEPTION "
+ "ThinClientRegion::reExecuteFunction with "
+ "GF_FUNCTION_EXCEPTION "
"failedNodesIds size = %zu ",
failedNodesIds->size());
failedNodes->insert(failedNodesIds->begin(), failedNodesIds->end());
@@ -3141,7 +3143,8 @@ bool ThinClientRegion::executeFunctionSH(
currentReply->getFailedNode());
if (failedNodeIds) {
LOGDEBUG(
- "ThinClientRegion::executeFunctionSH with GF_FUNCTION_EXCEPTION "
+ "ThinClientRegion::executeFunctionSH with "
+ "GF_FUNCTION_EXCEPTION "
"failedNodeIds size = %zu ",
failedNodeIds->size());
failedNodes->insert(failedNodeIds->begin(), failedNodeIds->end());
@@ -3365,12 +3368,14 @@ void ChunkedQueryResponse::readObjectPartList(DataInput& input,
if (arrayType != DSFid::CacheableObjectPartList) {
LOGERROR(
"Query response got unhandled message format %d while "
- "expecting struct set object part list; possible serialization "
+ "expecting struct set object part list; possible "
+ "serialization "
"mismatch",
arrayType);
throw MessageException(
"Query response got unhandled message format while expecting "
- "struct set object part list; possible serialization mismatch");
+ "struct set object part list; possible serialization "
+ "mismatch");
}
readObjectPartList(input, true);
} else {
@@ -3491,7 +3496,8 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen,
"object part list; possible serialization mismatch",
arrayType);
throw MessageException(
- "Query response got unhandled message format while expecting object "
+ "Query response got unhandled message format while expecting "
+ "object "
"part list; possible serialization mismatch");
}
readObjectPartList(input, isResultSet);
@@ -3556,9 +3562,9 @@ void ChunkedFunctionExecutionResponse::handleChunk(
return;
}
- auto startLen = static_cast<size_t>(
- input.getBytesRead() -
- 1); // from here need to look value part + memberid AND -1 for array type
+ // from here need to look value part + memberid AND -1 for array type
+ auto startLen = static_cast<size_t>(input.getBytesRead() - 1);
+
// read and ignore array length
input.readArrayLength();
@@ -3571,7 +3577,8 @@ void ChunkedFunctionExecutionResponse::handleChunk(
const int SECURE_PART_LEN = 5 + 8;
bool readPart = true;
LOGDEBUG(
- "ChunkedFunctionExecutionResponse::handleChunk chunkLen = %d & partLen = "
+ "ChunkedFunctionExecutionResponse::handleChunk chunkLen = %d & partLen "
+ "= "
"%d ",
chunkLen, partLen);
if (partType == DSCode::JavaSerializable) {
@@ -3592,7 +3599,8 @@ void ChunkedFunctionExecutionResponse::handleChunk(
// skip first part i.e JavaSerializable.
TcrMessageHelper::skipParts(m_msg, input, 1);
- // read the second part which is string in usual manner, first its length.
+ // read the second part which is string in usual manner, first its
+ // length.
partLen = input.readInt32();
// then isObject byte
@@ -3601,8 +3609,8 @@ void ChunkedFunctionExecutionResponse::handleChunk(
startLen = input.getBytesRead(); // reset from here need to look value
// part + memberid AND -1 for array type
- // Since it is contained as a part of other results, read arrayType which
- // is arrayList = 65.
+ // Since it is contained as a part of other results, read arrayType
+ // which is arrayList = 65.
input.read();
// read and ignore its len which is 2
diff --git a/cppcache/src/ThinClientRegion.hpp b/cppcache/src/ThinClientRegion.hpp
index 94fbac0..59dd51b 100644
--- a/cppcache/src/ThinClientRegion.hpp
+++ b/cppcache/src/ThinClientRegion.hpp
@@ -133,7 +133,7 @@ class ThinClientRegion : public LocalRegion {
std::vector<std::shared_ptr<CacheableString>> getInterestListRegex()
const override;
- void receiveNotification(TcrMessage* msg);
+ void receiveNotification(const TcrMessage& msg);
static GfErrType handleServerException(const std::string& func,
const std::string& exceptionMsg);
@@ -257,7 +257,7 @@ class ThinClientRegion : public LocalRegion {
bool attemptFailover = true);
GfErrType unregisterRegexNoThrowLocalDestroy(const std::string& regex,
bool attemptFailover = true);
- GfErrType clientNotificationHandler(TcrMessage& msg);
+ GfErrType clientNotificationHandler(const TcrMessage& msg);
virtual void localInvalidateRegion_internal();