You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/03/24 18:07:21 UTC
[geode-native] branch develop updated: GEODE-9056: Replace
ACE_Semaphore (#772)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 332c578 GEODE-9056: Replace ACE_Semaphore (#772)
332c578 is described below
commit 332c578c720c96ecf898eb9fa6e2d420a921f0de
Author: Mario Salazar de Torres <ma...@est.tech>
AuthorDate: Wed Mar 24 19:04:32 2021 +0100
GEODE-9056: Replace ACE_Semaphore (#772)
- In order to progress in the task of getting rid of the ACE library, a
replacement is needed for ACE_Semaphore.
- Currently there is no suitable Boost alternative, and a standard C++
implementation is not available until C++20. Hence an STL-like
implementation was added as binary_semaphore.
- Replaced all instances of ACE_Semaphore with binary_semaphore.
- Fixed faulty semaphore logic in several cases in which the running token was
checked twice without any specific need.
---
cppcache/src/CqService.cpp | 12 ++--
cppcache/src/CqService.hpp | 3 +-
cppcache/src/TcrChunkedContext.hpp | 17 +++---
cppcache/src/TcrConnection.cpp | 4 +-
cppcache/src/TcrConnection.hpp | 3 +-
cppcache/src/TcrConnectionManager.cpp | 68 +++++++++++------------
cppcache/src/TcrConnectionManager.hpp | 12 ++--
cppcache/src/TcrEndpoint.cpp | 24 ++++----
cppcache/src/TcrEndpoint.hpp | 12 ++--
cppcache/src/TcrMessage.cpp | 2 +-
cppcache/src/TcrMessage.hpp | 2 +-
cppcache/src/TcrPoolEndPoint.cpp | 6 +-
cppcache/src/TcrPoolEndPoint.hpp | 4 +-
cppcache/src/ThinClientBaseDM.cpp | 12 +++-
cppcache/src/ThinClientPoolDM.cpp | 52 ++++++++---------
cppcache/src/ThinClientPoolDM.hpp | 9 +--
cppcache/src/ThinClientPoolHADM.cpp | 20 ++++---
cppcache/src/ThinClientPoolHADM.hpp | 4 +-
cppcache/src/ThinClientRedundancyManager.cpp | 15 +++--
cppcache/src/ThinClientRedundancyManager.hpp | 3 +-
cppcache/src/util/concurrent/binary_semaphore.cpp | 40 +++++++++++++
cppcache/src/util/concurrent/binary_semaphore.hpp | 46 +++++++++++++++
22 files changed, 235 insertions(+), 135 deletions(-)
diff --git a/cppcache/src/CqService.cpp b/cppcache/src/CqService.cpp
index b105e7d..faad396 100644
--- a/cppcache/src/CqService.cpp
+++ b/cppcache/src/CqService.cpp
@@ -38,7 +38,7 @@ CqService::CqService(ThinClientBaseDM* tccdm,
StatisticsFactory* statisticsFactory)
: m_tccdm(tccdm),
m_statisticsFactory(statisticsFactory),
- m_notificationSema(1),
+ notification_semaphore_{1},
m_stats(std::make_shared<CqServiceVsdStats>(m_statisticsFactory)) {
assert(nullptr != m_tccdm);
@@ -78,9 +78,9 @@ void CqService::updateStats() {
bool CqService::checkAndAcquireLock() {
if (m_running) {
- m_notificationSema.acquire();
+ notification_semaphore_.acquire();
if (m_running == false) {
- m_notificationSema.release();
+ notification_semaphore_.release();
return false;
}
return true;
@@ -343,9 +343,9 @@ std::shared_ptr<CqServiceStatistics> CqService::getCqServiceStatistics() {
void CqService::closeCqService() {
if (m_running) {
m_running = false;
- m_notificationSema.acquire();
+ notification_semaphore_.acquire();
cleanup();
- m_notificationSema.release();
+ notification_semaphore_.release();
}
}
void CqService::closeAllCqs() {
@@ -385,7 +385,7 @@ void CqService::receiveNotification(TcrMessage* msg) {
invokeCqListeners(msg->getCqs(), msg->getMessageTypeForCq(), msg->getKey(),
msg->getValue(), msg->getDeltaBytes(), msg->getEventId());
_GEODE_SAFE_DELETE(msg);
- m_notificationSema.release();
+ notification_semaphore_.release();
}
/**
diff --git a/cppcache/src/CqService.hpp b/cppcache/src/CqService.hpp
index 76e34ca..7a6df07 100644
--- a/cppcache/src/CqService.hpp
+++ b/cppcache/src/CqService.hpp
@@ -34,6 +34,7 @@
#include "ErrType.hpp"
#include "Queue.hpp"
#include "TcrMessage.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
#include "util/synchronized_map.hpp"
namespace apache {
@@ -52,7 +53,7 @@ class TcrEndpoint;
class CqService : public std::enable_shared_from_this<CqService> {
ThinClientBaseDM* m_tccdm;
statistics::StatisticsFactory* m_statisticsFactory;
- ACE_Semaphore m_notificationSema;
+ binary_semaphore notification_semaphore_;
bool m_running;
synchronized_map<std::unordered_map<std::string, std::shared_ptr<CqQuery>>,
diff --git a/cppcache/src/TcrChunkedContext.hpp b/cppcache/src/TcrChunkedContext.hpp
index 00e22d1..0b9eec8 100644
--- a/cppcache/src/TcrChunkedContext.hpp
+++ b/cppcache/src/TcrChunkedContext.hpp
@@ -27,6 +27,7 @@
#include "AppDomainContext.hpp"
#include "Utils.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
namespace apache {
namespace geode {
@@ -38,7 +39,7 @@ namespace client {
*/
class TcrChunkedResult {
private:
- ACE_Semaphore* m_finalizeSema;
+ binary_semaphore* finalize_semaphore_;
std::shared_ptr<Exception> m_ex;
bool m_inSameThread;
@@ -52,13 +53,13 @@ class TcrChunkedResult {
public:
inline TcrChunkedResult()
- : m_finalizeSema(nullptr),
+ : finalize_semaphore_(nullptr),
m_ex(nullptr),
m_inSameThread(false),
m_dsmemId(0) {}
virtual ~TcrChunkedResult() noexcept {}
- void setFinalizeSemaphore(ACE_Semaphore* finalizeSema) {
- m_finalizeSema = finalizeSema;
+ void setFinalizeSemaphore(binary_semaphore* finalizeSema) {
+ finalize_semaphore_ = finalizeSema;
}
virtual void setEndpointMemId(uint16_t dsmemId) { m_dsmemId = dsmemId; }
uint16_t getEndpointMemId() { return m_dsmemId; }
@@ -83,8 +84,8 @@ class TcrChunkedResult {
m_inSameThread = true;
return;
}
- if (m_finalizeSema != nullptr) {
- m_finalizeSema->release();
+ if (finalize_semaphore_ != nullptr) {
+ finalize_semaphore_->release();
} else {
throw NullPointerException("TcrChunkedResult::finalize: null semaphore");
}
@@ -96,8 +97,8 @@ class TcrChunkedResult {
*/
virtual void waitFinalize() const {
if (m_inSameThread) return;
- if (m_finalizeSema != nullptr) {
- m_finalizeSema->acquire();
+ if (finalize_semaphore_ != nullptr) {
+ finalize_semaphore_->acquire();
} else {
throw NullPointerException(
"TcrChunkedResult::waitFinalize: null semaphore");
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 932073e..8dd173e 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -110,7 +110,7 @@ TcrConnection::TcrConnection(const TcrConnectionManager& connectionManager)
m_hasServerQueue(NON_REDUNDANT_SERVER),
m_queueSize(0),
m_port(0),
- m_chunksProcessSema(0),
+ chunks_process_semaphore_(0),
m_isBeingUsed(false),
m_isUsed(0),
m_poolDM(nullptr) {}
@@ -755,7 +755,7 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply,
reply.setTransId(responseHeader.transactionId);
// Initialize the chunk processing
- reply.startProcessChunk(m_chunksProcessSema);
+ reply.startProcessChunk(chunks_process_semaphore_);
// indicate an end to chunk processing and wait for processing
// to end even if reading the chunks fails in middle
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index acb974e..4889cda 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -29,6 +29,7 @@
#include "Connector.hpp"
#include "TcrMessage.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
#include "util/synchronized_set.hpp"
#define DEFAULT_TIMEOUT_RETRIES 12
@@ -355,7 +356,7 @@ class TcrConnection {
uint16_t m_port;
// semaphore to synchronize with the chunked response processing thread
- ACE_Semaphore m_chunksProcessSema;
+ binary_semaphore chunks_process_semaphore_;
std::chrono::steady_clock::time_point m_creationTime;
std::chrono::steady_clock::time_point m_lastAccessed;
diff --git a/cppcache/src/TcrConnectionManager.cpp b/cppcache/src/TcrConnectionManager.cpp
index 7e7adeb..7449e2c 100644
--- a/cppcache/src/TcrConnectionManager.cpp
+++ b/cppcache/src/TcrConnectionManager.cpp
@@ -48,15 +48,15 @@ const char *TcrConnectionManager::NC_CleanUp = "NC CleanUp";
TcrConnectionManager::TcrConnectionManager(CacheImpl *cache)
: m_cache(cache),
m_initGuard(false),
- m_failoverSema(0),
+ failover_semaphore_(0),
m_failoverTask(nullptr),
- m_cleanupSema(0),
+ cleanup_semaphore_(0),
m_cleanupTask(nullptr),
m_pingTaskId(-1),
m_servermonitorTaskId(-1),
// Create the queues with flag to not delete the objects
- m_notifyCleanupSemaList(false),
- m_redundancySema(0),
+ notify_cleanup_semaphore_list_(false),
+ redundancy_semaphore_(0),
m_redundancyTask(nullptr),
m_isDurable(false),
m_isNetDown(false) {
@@ -72,7 +72,7 @@ void TcrConnectionManager::init(bool isPool) {
}
auto &props = m_cache->getDistributedSystem().getSystemProperties();
m_isDurable = !props.durableClientId().empty();
- auto pingInterval = (props.pingInterval() / 2);
+ auto pingInterval = props.pingInterval();
if (!isPool) {
ACE_Event_Handler *connectionChecker =
new ExpiryHandler_T<TcrConnectionManager>(
@@ -120,7 +120,7 @@ void TcrConnectionManager::close() {
if (m_failoverTask != nullptr) {
m_failoverTask->stopNoblock();
- m_failoverSema.release();
+ failover_semaphore_.release();
m_failoverTask->wait();
m_failoverTask = nullptr;
}
@@ -135,7 +135,7 @@ void TcrConnectionManager::readyForEvents() {
TcrConnectionManager::~TcrConnectionManager() {
if (m_cleanupTask != nullptr) {
m_cleanupTask->stopNoblock();
- m_cleanupSema.release();
+ cleanup_semaphore_.release();
m_cleanupTask->wait();
// Clean notification lists if something remains in there; see bug #250
cleanNotificationLists();
@@ -207,9 +207,9 @@ TcrEndpoint *TcrConnectionManager::addRefToTcrEndpoint(std::string endpointName,
const auto &find = m_endpoints.find(endpointName);
if (find == m_endpoints.end()) {
// this endpoint does not exist
- ep = std::make_shared<TcrEndpoint>(endpointName, m_cache, m_failoverSema,
- m_cleanupSema, m_redundancySema, dm,
- false);
+ ep = std::make_shared<TcrEndpoint>(endpointName, m_cache,
+ failover_semaphore_, cleanup_semaphore_,
+ redundancy_semaphore_, dm, false);
m_endpoints.emplace(endpointName, ep);
} else {
ep = find->second;
@@ -276,22 +276,21 @@ int TcrConnectionManager::checkConnection(const ACE_Time_Value &,
int TcrConnectionManager::checkRedundancy(const ACE_Time_Value &,
const void *) {
- m_redundancySema.release();
+ redundancy_semaphore_.release();
return 0;
}
void TcrConnectionManager::failover(std::atomic<bool> &isRunning) {
LOGFINE("TcrConnectionManager: starting failover thread");
+
+ failover_semaphore_.acquire();
while (isRunning) {
- m_failoverSema.acquire();
- if (isRunning && !m_isNetDown) {
+ if (!m_isNetDown) {
try {
std::lock_guard<decltype(m_distMngrsLock)> guard(m_distMngrsLock);
for (const auto &it : m_distMngrs) {
it->failover();
}
- while (m_failoverSema.tryacquire() != -1) {
- }
} catch (const Exception &e) {
LOGERROR(e.what());
} catch (const std::exception &e) {
@@ -302,7 +301,10 @@ void TcrConnectionManager::failover(std::atomic<bool> &isRunning) {
"different endpoint");
}
}
+
+ failover_semaphore_.acquire();
}
+
LOGFINE("TcrConnectionManager: ending failover thread");
}
@@ -409,42 +411,36 @@ void TcrConnectionManager::revive() {
void TcrConnectionManager::redundancy(std::atomic<bool> &isRunning) {
LOGFINE("Starting subscription maintain redundancy thread.");
+ redundancy_semaphore_.acquire();
+
while (isRunning) {
- m_redundancySema.acquire();
- if (isRunning && !m_isNetDown) {
+ if (!m_isNetDown) {
m_redundancyManager->maintainRedundancyLevel();
- while (m_redundancySema.tryacquire() != -1) {
- }
}
+
+ redundancy_semaphore_.acquire();
}
LOGFINE("Ending subscription maintain redundancy thread.");
}
void TcrConnectionManager::addNotificationForDeletion(
Task<TcrEndpoint> *notifyReceiver, TcrConnection *notifyConnection,
- ACE_Semaphore &notifyCleanupSema) {
+ binary_semaphore &notifyCleanupSema) {
std::lock_guard<decltype(m_notificationLock)> guard(m_notificationLock);
m_connectionReleaseList.put(notifyConnection);
m_receiverReleaseList.put(notifyReceiver);
- m_notifyCleanupSemaList.put(&notifyCleanupSema);
+ notify_cleanup_semaphore_list_.put(&notifyCleanupSema);
}
void TcrConnectionManager::cleanup(std::atomic<bool> &isRunning) {
LOGFINE("TcrConnectionManager: starting cleanup thread");
- do {
- // If we block on acquire, the queue must be empty (precondition).
- if (m_receiverReleaseList.size() == 0) {
- LOGDEBUG(
- "TcrConnectionManager::cleanup(): waiting to acquire cleanup "
- "semaphore.");
- m_cleanupSema.acquire();
- }
- cleanNotificationLists();
- while (m_cleanupSema.tryacquire() != -1) {
- }
+ cleanup_semaphore_.acquire();
- } while (isRunning);
+ while (isRunning) {
+ cleanNotificationLists();
+ cleanup_semaphore_.acquire();
+ }
LOGFINE("TcrConnectionManager: ending cleanup thread");
// Postcondition - all notification channels should be cleaned up by the end
@@ -454,7 +450,7 @@ void TcrConnectionManager::cleanup(std::atomic<bool> &isRunning) {
void TcrConnectionManager::cleanNotificationLists() {
Task<TcrEndpoint> *notifyReceiver;
TcrConnection *notifyConnection;
- ACE_Semaphore *notifyCleanupSema;
+ binary_semaphore *semaphore;
while (true) {
{
@@ -462,12 +458,12 @@ void TcrConnectionManager::cleanNotificationLists() {
notifyReceiver = m_receiverReleaseList.get();
if (!notifyReceiver) break;
notifyConnection = m_connectionReleaseList.get();
- notifyCleanupSema = m_notifyCleanupSemaList.get();
+ semaphore = notify_cleanup_semaphore_list_.get();
}
notifyReceiver->wait();
//_GEODE_SAFE_DELETE(notifyReceiver);
_GEODE_SAFE_DELETE(notifyConnection);
- notifyCleanupSema->release();
+ semaphore->release();
}
}
diff --git a/cppcache/src/TcrConnectionManager.hpp b/cppcache/src/TcrConnectionManager.hpp
index 393d472..ae509d7 100644
--- a/cppcache/src/TcrConnectionManager.hpp
+++ b/cppcache/src/TcrConnectionManager.hpp
@@ -89,7 +89,7 @@ class TcrConnectionManager {
void addNotificationForDeletion(Task<TcrEndpoint>* notifyReceiver,
TcrConnection* notifyConnection,
- ACE_Semaphore& notifyCleanupSema);
+ binary_semaphore& notifyCleanupSema);
void processMarker();
@@ -107,7 +107,7 @@ class TcrConnectionManager {
TcrHADistributionManager* theHADM = nullptr,
ThinClientRegion* region = nullptr);
- inline void triggerRedundancyThread() { m_redundancySema.release(); }
+ inline void triggerRedundancyThread() { redundancy_semaphore_.release(); }
inline void acquireRedundancyLock() {
m_redundancyManager->acquireRedundancyLock();
@@ -142,7 +142,7 @@ class TcrConnectionManager {
std::list<ThinClientBaseDM*> m_distMngrs;
std::recursive_mutex m_distMngrsLock;
- ACE_Semaphore m_failoverSema;
+ binary_semaphore failover_semaphore_;
std::unique_ptr<Task<TcrConnectionManager>> m_failoverTask;
bool removeRefToEndpoint(TcrEndpoint* ep, bool keepEndpoint = false);
@@ -152,16 +152,16 @@ class TcrConnectionManager {
void initializeHAEndpoints(const char* endpointsStr);
void removeHAEndpoints();
- ACE_Semaphore m_cleanupSema;
+ binary_semaphore cleanup_semaphore_;
std::unique_ptr<Task<TcrConnectionManager>> m_cleanupTask;
ExpiryTaskManager::id_type m_pingTaskId;
ExpiryTaskManager::id_type m_servermonitorTaskId;
Queue<Task<TcrEndpoint>*> m_receiverReleaseList;
Queue<TcrConnection*> m_connectionReleaseList;
- Queue<ACE_Semaphore*> m_notifyCleanupSemaList;
+ Queue<binary_semaphore*> notify_cleanup_semaphore_list_;
- ACE_Semaphore m_redundancySema;
+ binary_semaphore redundancy_semaphore_;
std::unique_ptr<Task<TcrConnectionManager>> m_redundancyTask;
std::recursive_mutex m_notificationLock;
bool m_isDurable;
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index f4f8924..6b3f75d 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -39,9 +39,9 @@ namespace client {
const char* TcrEndpoint::NC_Notification = "NC Notification";
TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
- ACE_Semaphore& failoverSema,
- ACE_Semaphore& cleanupSema,
- ACE_Semaphore& redundancySema, ThinClientBaseDM* DM,
+ binary_semaphore& failoverSema,
+ binary_semaphore& cleanupSema,
+ binary_semaphore& redundancySema, ThinClientBaseDM* DM,
bool isMultiUserMode)
: m_notifyConnection(nullptr),
m_notifyReceiver(nullptr),
@@ -53,12 +53,12 @@ TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
m_needToConnectInLock(false),
m_isQueueHosted(false),
m_uniqueId(0),
- m_failoverSema(failoverSema),
- m_cleanupSema(cleanupSema),
- m_redundancySema(redundancySema),
+ failover_semaphore_(failoverSema),
+ cleanup_semaphore_(cleanupSema),
+ redundancy_semaphore_(redundancySema),
m_baseDM(DM),
m_name(name),
- m_notificationCleanupSema(0),
+ notification_cleanup_semaphore_(0),
m_numberOfTimesFailed(0),
m_numRegions(0),
m_pingTimeouts(0),
@@ -102,7 +102,7 @@ TcrEndpoint::~TcrEndpoint() {
while (m_notifyCount > 0) {
LOGDEBUG("TcrEndpoint::~TcrEndpoint(): reducing notify count at %d",
m_notifyCount);
- m_notificationCleanupSema.acquire();
+ notification_cleanup_semaphore_.acquire();
m_notifyCount--;
}
LOGFINE("Connection to %s deleted", m_name.c_str());
@@ -1162,8 +1162,8 @@ void TcrEndpoint::setConnectionStatus(bool status) {
}
void TcrEndpoint::triggerRedundancyThread() {
- m_failoverSema.release();
- m_redundancySema.release();
+ failover_semaphore_.release();
+ redundancy_semaphore_.release();
}
void TcrEndpoint::closeConnection(TcrConnection*& conn) {
@@ -1197,9 +1197,9 @@ void TcrEndpoint::closeNotification() {
m_notifyReceiver->stopNoblock();
TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager();
tccm.addNotificationForDeletion(m_notifyReceiver.get(), m_notifyConnection,
- m_notificationCleanupSema);
+ notification_cleanup_semaphore_);
m_notifyCount++;
- m_cleanupSema.release();
+ cleanup_semaphore_.release();
m_isQueueHosted = false;
LOGFINEST(
"Added susbcription channel for deletion and "
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index 43b062a..bfbda3c 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -52,8 +52,8 @@ class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
public:
TcrEndpoint(
const std::string& name, CacheImpl* cacheImpl,
- ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema,
- ACE_Semaphore& redundancySema, ThinClientBaseDM* dm = nullptr,
+ binary_semaphore& failoverSema, binary_semaphore& cleanupSema,
+ binary_semaphore& redundancySema, ThinClientBaseDM* dm = nullptr,
bool isMultiUserMode = false); // TODO: need to look for endpoint case
virtual ~TcrEndpoint();
@@ -205,16 +205,16 @@ class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
private:
int64_t m_uniqueId;
- ACE_Semaphore& m_failoverSema;
- ACE_Semaphore& m_cleanupSema;
- ACE_Semaphore& m_redundancySema;
+ binary_semaphore& failover_semaphore_;
+ binary_semaphore& cleanup_semaphore_;
+ binary_semaphore& redundancy_semaphore_;
ThinClientBaseDM* m_baseDM;
std::string m_name;
std::list<ThinClientBaseDM*> m_distMgrs;
std::recursive_mutex m_endpointAuthenticationLock;
std::recursive_mutex m_connectionLock;
std::recursive_mutex m_distMgrsLock;
- ACE_Semaphore m_notificationCleanupSema;
+ binary_semaphore notification_cleanup_semaphore_;
synchronized_set<std::unordered_set<uint16_t>> m_ports;
int32_t m_numberOfTimesFailed;
int m_numRegions;
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index fcf621a..e8002bd 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -906,7 +906,7 @@ void TcrMessage::writeMessageLength() {
// the beginning.
}
-void TcrMessage::startProcessChunk(ACE_Semaphore& finalizeSema) {
+void TcrMessage::startProcessChunk(binary_semaphore& finalizeSema) {
if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE ||
m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE ||
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index 9230600..82d5d2c 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -182,7 +182,7 @@ class TcrMessage {
const SerializationRegistry& serializationRegistry,
MemberListForVersionStamp& memberListForVersionStamp);
- void startProcessChunk(ACE_Semaphore& finalizeSema);
+ void startProcessChunk(binary_semaphore& finalizeSema);
// nullptr chunk means that this is the last chunk
void processChunk(const std::vector<uint8_t>& chunk, int32_t chunkLen,
uint16_t endpointmemId,
diff --git a/cppcache/src/TcrPoolEndPoint.cpp b/cppcache/src/TcrPoolEndPoint.cpp
index 14154ad..c270929 100644
--- a/cppcache/src/TcrPoolEndPoint.cpp
+++ b/cppcache/src/TcrPoolEndPoint.cpp
@@ -26,9 +26,9 @@ namespace geode {
namespace client {
TcrPoolEndPoint::TcrPoolEndPoint(const std::string& name, CacheImpl* cache,
- ACE_Semaphore& failoverSema,
- ACE_Semaphore& cleanupSema,
- ACE_Semaphore& redundancySema,
+ binary_semaphore& failoverSema,
+ binary_semaphore& cleanupSema,
+ binary_semaphore& redundancySema,
ThinClientPoolDM* dm)
: TcrEndpoint(name, cache, failoverSema, cleanupSema, redundancySema, dm),
m_dm(dm) {}
diff --git a/cppcache/src/TcrPoolEndPoint.hpp b/cppcache/src/TcrPoolEndPoint.hpp
index 61b20e2..3fce7d8 100644
--- a/cppcache/src/TcrPoolEndPoint.hpp
+++ b/cppcache/src/TcrPoolEndPoint.hpp
@@ -32,8 +32,8 @@ class ThinClientPoolDM;
class TcrPoolEndPoint : public TcrEndpoint {
public:
TcrPoolEndPoint(const std::string& name, CacheImpl* cache,
- ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema,
- ACE_Semaphore& redundancySema, ThinClientPoolDM* dm);
+ binary_semaphore& failoverSema, binary_semaphore& cleanupSema,
+ binary_semaphore& redundancySema, ThinClientPoolDM* dm);
ThinClientPoolDM* getPoolHADM() const override;
bool checkDupAndAdd(std::shared_ptr<EventId> eventid) override;
diff --git a/cppcache/src/ThinClientBaseDM.cpp b/cppcache/src/ThinClientBaseDM.cpp
index a47be6d..ebf9641 100644
--- a/cppcache/src/ThinClientBaseDM.cpp
+++ b/cppcache/src/ThinClientBaseDM.cpp
@@ -217,13 +217,23 @@ void ThinClientBaseDM::processChunks(std::atomic<bool>& isRunning) {
TcrChunkedContext* chunk;
LOGFINE("Starting chunk process thread for region %s",
(m_region ? m_region->getFullPath().c_str() : "(null)"));
+
+ std::chrono::milliseconds wait_for_chunk{100};
+ chunk = m_chunks.getFor(wait_for_chunk);
+
while (isRunning) {
- chunk = m_chunks.getFor(std::chrono::microseconds(100000));
if (chunk) {
chunk->handleChunk(false);
_GEODE_SAFE_DELETE(chunk);
}
+
+ chunk = m_chunks.getFor(wait_for_chunk);
}
+
+ if (chunk) {
+ _GEODE_SAFE_DELETE(chunk);
+ }
+
LOGFINE("Ending chunk process thread for region %s",
(m_region ? m_region->getFullPath().c_str() : "(null)"));
}
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index aaccde2..80b464d 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -140,9 +140,9 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
m_poolName(name),
m_stats(nullptr),
m_sticky(false),
- m_updateLocatorListSema(0),
- m_pingSema(0),
- m_cliCallbackSema(0),
+ update_locators_semaphore_(0),
+ ping_semaphore_(0),
+ cli_callback_semaphore_(0),
m_isDestroyed(false),
m_destroyPending(false),
m_destroyPendingHADM(false),
@@ -151,7 +151,7 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
m_poolSize(0),
m_numRegions(0),
m_server(0),
- m_connSema(0),
+ conn_semaphore_(0),
m_connManageTask(nullptr),
m_pingTask(nullptr),
m_updateLocatorListTask(nullptr),
@@ -282,7 +282,7 @@ void ThinClientPoolDM::startBackgroundThreads() {
m_cliCallbackTask->start();
}
- const auto& pingInterval = getPingInterval() / 2;
+ const auto& pingInterval = getPingInterval();
if (pingInterval > std::chrono::seconds::zero()) {
LOGDEBUG(
"ThinClientPoolDM::startBackgroundThreads: Scheduling ping task at %ld",
@@ -386,7 +386,7 @@ void ThinClientPoolDM::manageConnections(std::atomic<bool>& isRunning) {
LOGFINE("ThinClientPoolDM: starting manageConnections thread");
while (isRunning) {
- m_connSema.acquire();
+ conn_semaphore_.acquire();
if (isRunning) {
try {
LOGFINE(
@@ -768,7 +768,7 @@ void ThinClientPoolDM::stopPingThread() {
if (m_pingTask) {
LOGFINE("ThinClientPoolDM::destroy(): Closing ping thread.");
m_pingTask->stopNoblock();
- m_pingSema.release();
+ ping_semaphore_.release();
m_pingTask->wait();
m_pingTask = nullptr;
if (m_pingTaskId >= 0) {
@@ -782,7 +782,7 @@ void ThinClientPoolDM::stopUpdateLocatorListThread() {
if (m_updateLocatorListTask) {
LOGFINE("ThinClientPoolDM::destroy(): Closing updateLocatorList thread.");
m_updateLocatorListTask->stopNoblock();
- m_updateLocatorListSema.release();
+ update_locators_semaphore_.release();
m_updateLocatorListTask->wait();
m_updateLocatorListTask = nullptr;
if (m_updateLocatorListTaskId >= 0) {
@@ -796,7 +796,7 @@ void ThinClientPoolDM::stopCliCallbackThread() {
if (m_cliCallbackTask) {
LOGFINE("ThinClientPoolDM::destroy(): Closing cliCallback thread.");
m_cliCallbackTask->stopNoblock();
- m_cliCallbackSema.release();
+ cli_callback_semaphore_.release();
m_cliCallbackTask->wait();
m_cliCallbackTask = nullptr;
}
@@ -825,7 +825,7 @@ void ThinClientPoolDM::destroy(bool keepAlive) {
auto cacheImpl = m_connManager.getCacheImpl();
if (m_connManageTask) {
m_connManageTask->stopNoblock();
- m_connSema.release();
+ conn_semaphore_.release();
m_connManageTask->wait();
m_connManageTask = nullptr;
if (m_connManageTaskId >= 0) {
@@ -1594,7 +1594,7 @@ void ThinClientPoolDM::removeEPConnections(int numConn,
// Raise Semaphore for manage thread
if (triggerManageConn) {
- m_connSema.release();
+ conn_semaphore_.release();
}
}
@@ -1734,7 +1734,7 @@ GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint(
getStats().incPoolConnects();
getStats().setCurPoolConnections(m_poolSize);
}
- m_connSema.release();
+ conn_semaphore_.release();
return error;
}
@@ -1743,7 +1743,7 @@ void ThinClientPoolDM::reducePoolSize(int num) {
LOGFINE("removing connection %d , pool-size =%d", num, m_poolSize.load());
m_poolSize -= num;
if (m_poolSize <= 0) {
- if (m_cliCallbackTask != nullptr) m_cliCallbackSema.release();
+ if (m_cliCallbackTask != nullptr) cli_callback_semaphore_.release();
}
}
@@ -1825,7 +1825,7 @@ GfErrType ThinClientPoolDM::createPoolConnection(
break;
}
}
- m_connSema.release();
+ conn_semaphore_.release();
// if a fatal error occurred earlier and we don't have
// a connection then return this saved error
if (fatal && !conn && error != GF_NOERR) {
@@ -2067,7 +2067,7 @@ void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str());
while (isRunning) {
- m_updateLocatorListSema.acquire();
+ update_locators_semaphore_.acquire();
if (isRunning && !m_connManager.isNetDown()) {
(m_locHelper)->updateLocators(getServerGroup());
}
@@ -2079,12 +2079,13 @@ void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
LOGFINE("Starting ping thread for pool %s", m_poolName.c_str());
+ ping_semaphore_.acquire();
while (isRunning) {
- m_pingSema.acquire();
- if (isRunning && !m_connManager.isNetDown()) {
+ if (!m_connManager.isNetDown()) {
pingServerLocal();
- m_pingSema.acquire();
}
+
+ ping_semaphore_.acquire();
}
LOGFINE("Ending ping thread for pool %s", m_poolName.c_str());
@@ -2093,7 +2094,7 @@ void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
void ThinClientPoolDM::cliCallback(std::atomic<bool>& isRunning) {
LOGFINE("Starting cliCallback thread for pool %s", m_poolName.c_str());
while (isRunning) {
- m_cliCallbackSema.acquire();
+ cli_callback_semaphore_.acquire();
if (isRunning) {
LOGFINE("Clearing Pdx Type Registry");
// this call for csharp client
@@ -2101,24 +2102,24 @@ void ThinClientPoolDM::cliCallback(std::atomic<bool>& isRunning) {
*(m_connManager.getCacheImpl()->getCache()));
// this call for cpp client
m_connManager.getCacheImpl()->getPdxTypeRegistry()->clear();
- m_cliCallbackSema.acquire();
+ cli_callback_semaphore_.acquire();
}
}
LOGFINE("Ending cliCallback thread for pool %s", m_poolName.c_str());
}
int ThinClientPoolDM::doPing(const ACE_Time_Value&, const void*) {
- m_pingSema.release();
+ ping_semaphore_.release();
return 0;
}
int ThinClientPoolDM::doUpdateLocatorList(const ACE_Time_Value&, const void*) {
- m_updateLocatorListSema.release();
+ update_locators_semaphore_.release();
return 0;
}
int ThinClientPoolDM::doManageConnections(const ACE_Time_Value&, const void*) {
- m_connSema.release();
+ conn_semaphore_.release();
return 0;
}
@@ -2435,8 +2436,9 @@ bool ThinClientPoolDM::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
std::shared_ptr<TcrEndpoint> ThinClientPoolDM::createEP(
const char* endpointName) {
return std::make_shared<TcrPoolEndPoint>(
- endpointName, m_connManager.getCacheImpl(), m_connManager.m_failoverSema,
- m_connManager.m_cleanupSema, m_connManager.m_redundancySema, this);
+ endpointName, m_connManager.getCacheImpl(),
+ m_connManager.failover_semaphore_, m_connManager.cleanup_semaphore_,
+ m_connManager.redundancy_semaphore_, this);
}
GfErrType FunctionExecution::execute() {
diff --git a/cppcache/src/ThinClientPoolDM.hpp b/cppcache/src/ThinClientPoolDM.hpp
index ce1f8d8..63576e0 100644
--- a/cppcache/src/ThinClientPoolDM.hpp
+++ b/cppcache/src/ThinClientPoolDM.hpp
@@ -47,6 +47,7 @@
#include "ThinClientStickyManager.hpp"
#include "ThreadPool.hpp"
#include "UserAttributes.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
namespace apache {
namespace geode {
@@ -196,9 +197,9 @@ class ThinClientPoolDM
PoolStats* m_stats;
bool m_sticky;
void netDown();
- ACE_Semaphore m_updateLocatorListSema;
- ACE_Semaphore m_pingSema;
- ACE_Semaphore m_cliCallbackSema;
+ binary_semaphore update_locators_semaphore_;
+ binary_semaphore ping_semaphore_;
+ binary_semaphore cli_callback_semaphore_;
volatile bool m_isDestroyed;
volatile bool m_destroyPending;
volatile bool m_destroyPendingHADM;
@@ -289,7 +290,7 @@ class ThinClientPoolDM
unsigned m_server;
// Manage Connection thread
- ACE_Semaphore m_connSema;
+ binary_semaphore conn_semaphore_;
std::unique_ptr<Task<ThinClientPoolDM>> m_connManageTask;
std::unique_ptr<Task<ThinClientPoolDM>> m_pingTask;
std::unique_ptr<Task<ThinClientPoolDM>> m_updateLocatorListTask;
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index b940569..f89db1e 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -33,7 +33,7 @@ ThinClientPoolHADM::ThinClientPoolHADM(const char* name,
TcrConnectionManager& connManager)
: ThinClientPoolDM(name, poolAttr, connManager),
m_theTcrConnManager(connManager),
- m_redundancySema(0),
+ redundancy_semaphore_(0),
m_redundancyTask(nullptr),
m_servermonitorTaskId(-1) {
m_redundancyManager = std::unique_ptr<ThinClientRedundancyManager>(
@@ -144,19 +144,20 @@ bool ThinClientPoolHADM::postFailoverAction(TcrEndpoint*) {
void ThinClientPoolHADM::redundancy(std::atomic<bool>& isRunning) {
LOGFINE("ThinClientPoolHADM: Starting maintain redundancy thread.");
+
+ redundancy_semaphore_.acquire();
while (isRunning) {
- m_redundancySema.acquire();
- if (isRunning && !m_connManager.isNetDown()) {
+ if (!m_connManager.isNetDown()) {
m_redundancyManager->maintainRedundancyLevel();
- while (m_redundancySema.tryacquire() != -1) {
- }
}
+
+ redundancy_semaphore_.acquire();
}
LOGFINE("ThinClientPoolHADM: Ending maintain redundancy thread.");
}
int ThinClientPoolHADM::checkRedundancy(const ACE_Time_Value&, const void*) {
- m_redundancySema.release();
+ redundancy_semaphore_.release();
return 0;
}
@@ -188,7 +189,7 @@ void ThinClientPoolHADM::sendNotificationCloseMsgs() {
m_servermonitorTaskId);
}
m_redundancyTask->stopNoblock();
- m_redundancySema.release();
+ redundancy_semaphore_.release();
m_redundancyTask->wait();
m_redundancyTask = nullptr;
m_redundancyManager->sendNotificationCloseMsgs();
@@ -309,8 +310,9 @@ void ThinClientPoolHADM::sendNotConMesToAllregions() {
std::shared_ptr<TcrEndpoint> ThinClientPoolHADM::createEP(
const char* endpointName) {
return std::make_shared<TcrPoolEndPoint>(
- endpointName, m_connManager.getCacheImpl(), m_connManager.m_failoverSema,
- m_connManager.m_cleanupSema, m_redundancySema, this);
+ endpointName, m_connManager.getCacheImpl(),
+ m_connManager.failover_semaphore_, m_connManager.cleanup_semaphore_,  // failover and cleanup semaphores reached through the connection manager
+ redundancy_semaphore_, this);  // this pool's redundancy semaphore, plus the pool itself
}
} // namespace client
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index 5bd45b6..dfdf86d 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -84,7 +84,7 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
GfErrType sendRequestToPrimary(TcrMessage& request, TcrMessageReply& reply);
- void triggerRedundancyThread() override { m_redundancySema.release(); }
+ void triggerRedundancyThread() override { redundancy_semaphore_.release(); }  // releases the semaphore that redundancy() blocks on
bool isReadyForEvent() const;
@@ -107,7 +107,7 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
std::unique_ptr<ThinClientRedundancyManager> m_redundancyManager;
TcrConnectionManager& m_theTcrConnManager;
- ACE_Semaphore m_redundancySema;
+ binary_semaphore redundancy_semaphore_;
std::unique_ptr<Task<ThinClientPoolHADM>> m_redundancyTask;
void redundancy(std::atomic<bool>& isRunning);
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 432cc4b..5a5cf63 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -56,6 +56,7 @@ ThinClientRedundancyManager::ThinClientRedundancyManager(
m_locators(nullptr),
m_servers(nullptr),
m_periodicAckTask(nullptr),
+ periodic_ack_semaphore_(1),
m_processEventIdMapTaskId(-1),
m_nextAckInc(0),
m_HAenabled(false) {}
@@ -682,7 +683,7 @@ void ThinClientRedundancyManager::close() {
m_processEventIdMapTaskId);
}
m_periodicAckTask->stopNoblock();
- m_periodicAckSema.release();
+ periodic_ack_semaphore_.release();
m_periodicAckTask->wait();
m_periodicAckTask = nullptr;
}
@@ -1116,18 +1117,16 @@ void ThinClientRedundancyManager::readyForEvents() {
int ThinClientRedundancyManager::processEventIdMap(const ACE_Time_Value&,
const void*) {
- m_periodicAckSema.release();
+ periodic_ack_semaphore_.release();  // wakes the thread blocked in periodicAck(); both arguments are unnamed and unused
return 0;
}
void ThinClientRedundancyManager::periodicAck(std::atomic<bool>& isRunning) {
+ periodic_ack_semaphore_.acquire();  // the constructor initialises this semaphore with 1 (released), so the first acquire() does not wait for a release()
+ // Thread body: one doPeriodicAck() per wakeup; isRunning is re-read only after each acquire() returns.
while (isRunning) {
- m_periodicAckSema.acquire();
- if (isRunning) {
- doPeriodicAck();
- while (m_periodicAckSema.tryacquire() != -1) {
- }
- }
+ doPeriodicAck();
+ periodic_ack_semaphore_.acquire();  // block until the next release()
}
}
diff --git a/cppcache/src/ThinClientRedundancyManager.hpp b/cppcache/src/ThinClientRedundancyManager.hpp
index c048e0d..747d40a 100644
--- a/cppcache/src/ThinClientRedundancyManager.hpp
+++ b/cppcache/src/ThinClientRedundancyManager.hpp
@@ -34,6 +34,7 @@
#include "ServerLocation.hpp"
#include "Task.hpp"
#include "TcrMessage.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
#include "util/synchronized_map.hpp"
namespace apache {
@@ -136,7 +137,7 @@ class ThinClientRedundancyManager {
inline bool isDurable();
int processEventIdMap(const ACE_Time_Value&, const void*);
std::unique_ptr<Task<ThinClientRedundancyManager>> m_periodicAckTask;
- ACE_Semaphore m_periodicAckSema;
+ binary_semaphore periodic_ack_semaphore_;
ExpiryTaskManager::id_type
m_processEventIdMapTaskId; // periodic check eventid map for notify ack
// and/or expiry
diff --git a/cppcache/src/util/concurrent/binary_semaphore.cpp b/cppcache/src/util/concurrent/binary_semaphore.cpp
new file mode 100644
index 0000000..f9dd465
--- /dev/null
+++ b/cppcache/src/util/concurrent/binary_semaphore.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "binary_semaphore.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+// Creates the semaphore; `released` is its initial state, so passing true lets the first acquire() return without waiting.
+binary_semaphore::binary_semaphore(bool released) : released_(released) {}
+// Puts the semaphore in the released state and notifies one waiter. The state is a single flag, so repeated calls before an acquire() collapse into one.
+void binary_semaphore::release() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ released_ = true;
+ cv_.notify_one();  // issued while mutex_ is still held
+}
+// Blocks the caller until the semaphore is in the released state, then resets it to not-released before returning.
+void binary_semaphore::acquire() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ cv_.wait(lock, [this]() { return released_; });  // predicate overload: returns only once released_ is true, so spurious wakeups are absorbed
+ released_ = false;  // consume the release while mutex_ is still held
+}
+
+} // namespace client
+} // namespace geode
+} // namespace apache
diff --git a/cppcache/src/util/concurrent/binary_semaphore.hpp b/cppcache/src/util/concurrent/binary_semaphore.hpp
new file mode 100644
index 0000000..5d1e802
--- /dev/null
+++ b/cppcache/src/util/concurrent/binary_semaphore.hpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_UTIL_CONCURRENT_BINARY_SEMAPHORE_H_
+#define GEODE_UTIL_CONCURRENT_BINARY_SEMAPHORE_H_
+
+#include <condition_variable>
+#include <mutex>
+
+namespace apache {
+namespace geode {
+namespace client {
+class binary_semaphore {  // two-state semaphore built from a flag, a mutex and a condition variable
+ public:
+ explicit binary_semaphore(bool released);  // `released` is the initial state of the semaphore
+// release() sets the released state; acquire() waits for that state and then clears it.
+ void release();
+ void acquire();
+// The state below is protected, not private, so derived classes can access it directly.
+ protected:
+ bool released_;  // true while a release() has not yet been consumed by acquire()
+ std::mutex mutex_;  // held by release() and acquire() whenever released_ is read or written
+ std::condition_variable cv_;  // acquire() waits on it; release() notifies it
+};
+
+} // namespace client
+} // namespace geode
+} // namespace apache
+
+#endif /* GEODE_UTIL_CONCURRENT_BINARY_SEMAPHORE_H_ */