Posted to commits@geode.apache.org by bb...@apache.org on 2021/03/24 18:07:21 UTC

[geode-native] branch develop updated: GEODE-9056: Replace ACE_Semaphore (#772)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 332c578  GEODE-9056: Replace ACE_Semaphore (#772)
332c578 is described below

commit 332c578c720c96ecf898eb9fa6e2d420a921f0de
Author: Mario Salazar de Torres <ma...@est.tech>
AuthorDate: Wed Mar 24 19:04:32 2021 +0100

    GEODE-9056: Replace ACE_Semaphore (#772)
    
    - To make progress on removing the ACE library, a replacement is
      needed for ACE_Semaphore.
    - There is currently no suitable Boost alternative, and the standard
      library does not provide a semaphore until C++20, so an STL-like
      implementation was added as binary_semaphore.
    - Replaced all instances of ACE_Semaphore with binary_semaphore.
    - Fixed faulty semaphore logic in several places where the running
      token was checked twice for no particular reason (see the loop
      sketch below).
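    
    For illustration, the reworked worker threads all share one
    acquire-first loop shape: block once before the loop, do the work,
    then block again at the bottom, so the running flag is checked
    exactly once per wake-up. Below is a minimal standalone sketch of
    that pattern using the binary_semaphore added by this commit; the
    printed "unit of work" and the short sleep are illustrative
    stand-ins, not code from the diff.
    
        #include <atomic>
        #include <chrono>
        #include <iostream>
        #include <thread>
        
        #include "util/concurrent/binary_semaphore.hpp"
        
        using apache::geode::client::binary_semaphore;
        
        int main() {
          binary_semaphore semaphore{false};  // starts blocked, like the members initialized with 0
          std::atomic<bool> is_running{true};
        
          std::thread worker([&] {
            semaphore.acquire();  // wait for the first wake-up
            while (is_running) {
              std::cout << "doing one unit of work\n";  // stand-in for e.g. pingServerLocal()
              semaphore.acquire();  // block until the next release(); no second flag check
            }
          });
        
          semaphore.release();  // wake the worker once, as the expiry handlers do
          std::this_thread::sleep_for(
              std::chrono::milliseconds(100));  // crude, just lets the demo run once
        
          // Shutdown mirrors the stopNoblock()/release()/wait() sequence in the diff:
          is_running = false;
          semaphore.release();
          worker.join();
          return 0;
        }
    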
---
 cppcache/src/CqService.cpp                        | 12 ++--
 cppcache/src/CqService.hpp                        |  3 +-
 cppcache/src/TcrChunkedContext.hpp                | 17 +++---
 cppcache/src/TcrConnection.cpp                    |  4 +-
 cppcache/src/TcrConnection.hpp                    |  3 +-
 cppcache/src/TcrConnectionManager.cpp             | 68 +++++++++++------------
 cppcache/src/TcrConnectionManager.hpp             | 12 ++--
 cppcache/src/TcrEndpoint.cpp                      | 24 ++++----
 cppcache/src/TcrEndpoint.hpp                      | 12 ++--
 cppcache/src/TcrMessage.cpp                       |  2 +-
 cppcache/src/TcrMessage.hpp                       |  2 +-
 cppcache/src/TcrPoolEndPoint.cpp                  |  6 +-
 cppcache/src/TcrPoolEndPoint.hpp                  |  4 +-
 cppcache/src/ThinClientBaseDM.cpp                 | 12 +++-
 cppcache/src/ThinClientPoolDM.cpp                 | 52 ++++++++---------
 cppcache/src/ThinClientPoolDM.hpp                 |  9 +--
 cppcache/src/ThinClientPoolHADM.cpp               | 20 ++++---
 cppcache/src/ThinClientPoolHADM.hpp               |  4 +-
 cppcache/src/ThinClientRedundancyManager.cpp      | 15 +++--
 cppcache/src/ThinClientRedundancyManager.hpp      |  3 +-
 cppcache/src/util/concurrent/binary_semaphore.cpp | 40 +++++++++++++
 cppcache/src/util/concurrent/binary_semaphore.hpp | 46 +++++++++++++++
 22 files changed, 235 insertions(+), 135 deletions(-)

diff --git a/cppcache/src/CqService.cpp b/cppcache/src/CqService.cpp
index b105e7d..faad396 100644
--- a/cppcache/src/CqService.cpp
+++ b/cppcache/src/CqService.cpp
@@ -38,7 +38,7 @@ CqService::CqService(ThinClientBaseDM* tccdm,
                      StatisticsFactory* statisticsFactory)
     : m_tccdm(tccdm),
       m_statisticsFactory(statisticsFactory),
-      m_notificationSema(1),
+      notification_semaphore_{1},
       m_stats(std::make_shared<CqServiceVsdStats>(m_statisticsFactory)) {
   assert(nullptr != m_tccdm);
 
@@ -78,9 +78,9 @@ void CqService::updateStats() {
 
 bool CqService::checkAndAcquireLock() {
   if (m_running) {
-    m_notificationSema.acquire();
+    notification_semaphore_.acquire();
     if (m_running == false) {
-      m_notificationSema.release();
+      notification_semaphore_.release();
       return false;
     }
     return true;
@@ -343,9 +343,9 @@ std::shared_ptr<CqServiceStatistics> CqService::getCqServiceStatistics() {
 void CqService::closeCqService() {
   if (m_running) {
     m_running = false;
-    m_notificationSema.acquire();
+    notification_semaphore_.acquire();
     cleanup();
-    m_notificationSema.release();
+    notification_semaphore_.release();
   }
 }
 void CqService::closeAllCqs() {
@@ -385,7 +385,7 @@ void CqService::receiveNotification(TcrMessage* msg) {
   invokeCqListeners(msg->getCqs(), msg->getMessageTypeForCq(), msg->getKey(),
                     msg->getValue(), msg->getDeltaBytes(), msg->getEventId());
   _GEODE_SAFE_DELETE(msg);
-  m_notificationSema.release();
+  notification_semaphore_.release();
 }
 
 /**
diff --git a/cppcache/src/CqService.hpp b/cppcache/src/CqService.hpp
index 76e34ca..7a6df07 100644
--- a/cppcache/src/CqService.hpp
+++ b/cppcache/src/CqService.hpp
@@ -34,6 +34,7 @@
 #include "ErrType.hpp"
 #include "Queue.hpp"
 #include "TcrMessage.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
 #include "util/synchronized_map.hpp"
 
 namespace apache {
@@ -52,7 +53,7 @@ class TcrEndpoint;
 class CqService : public std::enable_shared_from_this<CqService> {
   ThinClientBaseDM* m_tccdm;
   statistics::StatisticsFactory* m_statisticsFactory;
-  ACE_Semaphore m_notificationSema;
+  binary_semaphore notification_semaphore_;
 
   bool m_running;
   synchronized_map<std::unordered_map<std::string, std::shared_ptr<CqQuery>>,
diff --git a/cppcache/src/TcrChunkedContext.hpp b/cppcache/src/TcrChunkedContext.hpp
index 00e22d1..0b9eec8 100644
--- a/cppcache/src/TcrChunkedContext.hpp
+++ b/cppcache/src/TcrChunkedContext.hpp
@@ -27,6 +27,7 @@
 
 #include "AppDomainContext.hpp"
 #include "Utils.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
 
 namespace apache {
 namespace geode {
@@ -38,7 +39,7 @@ namespace client {
  */
 class TcrChunkedResult {
  private:
-  ACE_Semaphore* m_finalizeSema;
+  binary_semaphore* finalize_semaphore_;
   std::shared_ptr<Exception> m_ex;
   bool m_inSameThread;
 
@@ -52,13 +53,13 @@ class TcrChunkedResult {
 
  public:
   inline TcrChunkedResult()
-      : m_finalizeSema(nullptr),
+      : finalize_semaphore_(nullptr),
         m_ex(nullptr),
         m_inSameThread(false),
         m_dsmemId(0) {}
   virtual ~TcrChunkedResult() noexcept {}
-  void setFinalizeSemaphore(ACE_Semaphore* finalizeSema) {
-    m_finalizeSema = finalizeSema;
+  void setFinalizeSemaphore(binary_semaphore* finalizeSema) {
+    finalize_semaphore_ = finalizeSema;
   }
   virtual void setEndpointMemId(uint16_t dsmemId) { m_dsmemId = dsmemId; }
   uint16_t getEndpointMemId() { return m_dsmemId; }
@@ -83,8 +84,8 @@ class TcrChunkedResult {
       m_inSameThread = true;
       return;
     }
-    if (m_finalizeSema != nullptr) {
-      m_finalizeSema->release();
+    if (finalize_semaphore_ != nullptr) {
+      finalize_semaphore_->release();
     } else {
       throw NullPointerException("TcrChunkedResult::finalize: null semaphore");
     }
@@ -96,8 +97,8 @@ class TcrChunkedResult {
    */
   virtual void waitFinalize() const {
     if (m_inSameThread) return;
-    if (m_finalizeSema != nullptr) {
-      m_finalizeSema->acquire();
+    if (finalize_semaphore_ != nullptr) {
+      finalize_semaphore_->acquire();
     } else {
       throw NullPointerException(
           "TcrChunkedResult::waitFinalize: null semaphore");
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 932073e..8dd173e 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -110,7 +110,7 @@ TcrConnection::TcrConnection(const TcrConnectionManager& connectionManager)
       m_hasServerQueue(NON_REDUNDANT_SERVER),
       m_queueSize(0),
       m_port(0),
-      m_chunksProcessSema(0),
+      chunks_process_semaphore_(0),
       m_isBeingUsed(false),
       m_isUsed(0),
       m_poolDM(nullptr) {}
@@ -755,7 +755,7 @@ void TcrConnection::readMessageChunked(TcrMessageReply& reply,
   reply.setTransId(responseHeader.transactionId);
 
   // Initialize the chunk processing
-  reply.startProcessChunk(m_chunksProcessSema);
+  reply.startProcessChunk(chunks_process_semaphore_);
 
   // indicate an end to chunk processing and wait for processing
   // to end even if reading the chunks fails in middle
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index acb974e..4889cda 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -29,6 +29,7 @@
 
 #include "Connector.hpp"
 #include "TcrMessage.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
 #include "util/synchronized_set.hpp"
 
 #define DEFAULT_TIMEOUT_RETRIES 12
@@ -355,7 +356,7 @@ class TcrConnection {
   uint16_t m_port;
 
   // semaphore to synchronize with the chunked response processing thread
-  ACE_Semaphore m_chunksProcessSema;
+  binary_semaphore chunks_process_semaphore_;
 
   std::chrono::steady_clock::time_point m_creationTime;
   std::chrono::steady_clock::time_point m_lastAccessed;
diff --git a/cppcache/src/TcrConnectionManager.cpp b/cppcache/src/TcrConnectionManager.cpp
index 7e7adeb..7449e2c 100644
--- a/cppcache/src/TcrConnectionManager.cpp
+++ b/cppcache/src/TcrConnectionManager.cpp
@@ -48,15 +48,15 @@ const char *TcrConnectionManager::NC_CleanUp = "NC CleanUp";
 TcrConnectionManager::TcrConnectionManager(CacheImpl *cache)
     : m_cache(cache),
       m_initGuard(false),
-      m_failoverSema(0),
+      failover_semaphore_(0),
       m_failoverTask(nullptr),
-      m_cleanupSema(0),
+      cleanup_semaphore_(0),
       m_cleanupTask(nullptr),
       m_pingTaskId(-1),
       m_servermonitorTaskId(-1),
       // Create the queues with flag to not delete the objects
-      m_notifyCleanupSemaList(false),
-      m_redundancySema(0),
+      notify_cleanup_semaphore_list_(false),
+      redundancy_semaphore_(0),
       m_redundancyTask(nullptr),
       m_isDurable(false),
       m_isNetDown(false) {
@@ -72,7 +72,7 @@ void TcrConnectionManager::init(bool isPool) {
   }
   auto &props = m_cache->getDistributedSystem().getSystemProperties();
   m_isDurable = !props.durableClientId().empty();
-  auto pingInterval = (props.pingInterval() / 2);
+  auto pingInterval = props.pingInterval();
   if (!isPool) {
     ACE_Event_Handler *connectionChecker =
         new ExpiryHandler_T<TcrConnectionManager>(
@@ -120,7 +120,7 @@ void TcrConnectionManager::close() {
 
   if (m_failoverTask != nullptr) {
     m_failoverTask->stopNoblock();
-    m_failoverSema.release();
+    failover_semaphore_.release();
     m_failoverTask->wait();
     m_failoverTask = nullptr;
   }
@@ -135,7 +135,7 @@ void TcrConnectionManager::readyForEvents() {
 TcrConnectionManager::~TcrConnectionManager() {
   if (m_cleanupTask != nullptr) {
     m_cleanupTask->stopNoblock();
-    m_cleanupSema.release();
+    cleanup_semaphore_.release();
     m_cleanupTask->wait();
     // Clean notification lists if something remains in there; see bug #250
     cleanNotificationLists();
@@ -207,9 +207,9 @@ TcrEndpoint *TcrConnectionManager::addRefToTcrEndpoint(std::string endpointName,
   const auto &find = m_endpoints.find(endpointName);
   if (find == m_endpoints.end()) {
     // this endpoint does not exist
-    ep = std::make_shared<TcrEndpoint>(endpointName, m_cache, m_failoverSema,
-                                       m_cleanupSema, m_redundancySema, dm,
-                                       false);
+    ep = std::make_shared<TcrEndpoint>(endpointName, m_cache,
+                                       failover_semaphore_, cleanup_semaphore_,
+                                       redundancy_semaphore_, dm, false);
     m_endpoints.emplace(endpointName, ep);
   } else {
     ep = find->second;
@@ -276,22 +276,21 @@ int TcrConnectionManager::checkConnection(const ACE_Time_Value &,
 
 int TcrConnectionManager::checkRedundancy(const ACE_Time_Value &,
                                           const void *) {
-  m_redundancySema.release();
+  redundancy_semaphore_.release();
   return 0;
 }
 
 void TcrConnectionManager::failover(std::atomic<bool> &isRunning) {
   LOGFINE("TcrConnectionManager: starting failover thread");
+
+  failover_semaphore_.acquire();
   while (isRunning) {
-    m_failoverSema.acquire();
-    if (isRunning && !m_isNetDown) {
+    if (!m_isNetDown) {
       try {
         std::lock_guard<decltype(m_distMngrsLock)> guard(m_distMngrsLock);
         for (const auto &it : m_distMngrs) {
           it->failover();
         }
-        while (m_failoverSema.tryacquire() != -1) {
-        }
       } catch (const Exception &e) {
         LOGERROR(e.what());
       } catch (const std::exception &e) {
@@ -302,7 +301,10 @@ void TcrConnectionManager::failover(std::atomic<bool> &isRunning) {
             "different endpoint");
       }
     }
+
+    failover_semaphore_.acquire();
   }
+
   LOGFINE("TcrConnectionManager: ending failover thread");
 }
 
@@ -409,42 +411,36 @@ void TcrConnectionManager::revive() {
 
 void TcrConnectionManager::redundancy(std::atomic<bool> &isRunning) {
   LOGFINE("Starting subscription maintain redundancy thread.");
+  redundancy_semaphore_.acquire();
+
   while (isRunning) {
-    m_redundancySema.acquire();
-    if (isRunning && !m_isNetDown) {
+    if (!m_isNetDown) {
       m_redundancyManager->maintainRedundancyLevel();
-      while (m_redundancySema.tryacquire() != -1) {
-      }
     }
+
+    redundancy_semaphore_.acquire();
   }
   LOGFINE("Ending subscription maintain redundancy thread.");
 }
 
 void TcrConnectionManager::addNotificationForDeletion(
     Task<TcrEndpoint> *notifyReceiver, TcrConnection *notifyConnection,
-    ACE_Semaphore &notifyCleanupSema) {
+    binary_semaphore &notifyCleanupSema) {
   std::lock_guard<decltype(m_notificationLock)> guard(m_notificationLock);
   m_connectionReleaseList.put(notifyConnection);
   m_receiverReleaseList.put(notifyReceiver);
-  m_notifyCleanupSemaList.put(&notifyCleanupSema);
+  notify_cleanup_semaphore_list_.put(&notifyCleanupSema);
 }
 
 void TcrConnectionManager::cleanup(std::atomic<bool> &isRunning) {
   LOGFINE("TcrConnectionManager: starting cleanup thread");
-  do {
-    //  If we block on acquire, the queue must be empty (precondition).
-    if (m_receiverReleaseList.size() == 0) {
-      LOGDEBUG(
-          "TcrConnectionManager::cleanup(): waiting to acquire cleanup "
-          "semaphore.");
-      m_cleanupSema.acquire();
-    }
-    cleanNotificationLists();
 
-    while (m_cleanupSema.tryacquire() != -1) {
-    }
+  cleanup_semaphore_.acquire();
 
-  } while (isRunning);
+  while (isRunning) {
+    cleanNotificationLists();
+    cleanup_semaphore_.acquire();
+  }
 
   LOGFINE("TcrConnectionManager: ending cleanup thread");
   //  Postcondition - all notification channels should be cleaned up by the end
@@ -454,7 +450,7 @@ void TcrConnectionManager::cleanup(std::atomic<bool> &isRunning) {
 void TcrConnectionManager::cleanNotificationLists() {
   Task<TcrEndpoint> *notifyReceiver;
   TcrConnection *notifyConnection;
-  ACE_Semaphore *notifyCleanupSema;
+  binary_semaphore *semaphore;
 
   while (true) {
     {
@@ -462,12 +458,12 @@ void TcrConnectionManager::cleanNotificationLists() {
       notifyReceiver = m_receiverReleaseList.get();
       if (!notifyReceiver) break;
       notifyConnection = m_connectionReleaseList.get();
-      notifyCleanupSema = m_notifyCleanupSemaList.get();
+      semaphore = notify_cleanup_semaphore_list_.get();
     }
     notifyReceiver->wait();
     //_GEODE_SAFE_DELETE(notifyReceiver);
     _GEODE_SAFE_DELETE(notifyConnection);
-    notifyCleanupSema->release();
+    semaphore->release();
   }
 }
 
diff --git a/cppcache/src/TcrConnectionManager.hpp b/cppcache/src/TcrConnectionManager.hpp
index 393d472..ae509d7 100644
--- a/cppcache/src/TcrConnectionManager.hpp
+++ b/cppcache/src/TcrConnectionManager.hpp
@@ -89,7 +89,7 @@ class TcrConnectionManager {
 
   void addNotificationForDeletion(Task<TcrEndpoint>* notifyReceiver,
                                   TcrConnection* notifyConnection,
-                                  ACE_Semaphore& notifyCleanupSema);
+                                  binary_semaphore& notifyCleanupSema);
 
   void processMarker();
 
@@ -107,7 +107,7 @@ class TcrConnectionManager {
       TcrHADistributionManager* theHADM = nullptr,
       ThinClientRegion* region = nullptr);
 
-  inline void triggerRedundancyThread() { m_redundancySema.release(); }
+  inline void triggerRedundancyThread() { redundancy_semaphore_.release(); }
 
   inline void acquireRedundancyLock() {
     m_redundancyManager->acquireRedundancyLock();
@@ -142,7 +142,7 @@ class TcrConnectionManager {
   std::list<ThinClientBaseDM*> m_distMngrs;
   std::recursive_mutex m_distMngrsLock;
 
-  ACE_Semaphore m_failoverSema;
+  binary_semaphore failover_semaphore_;
   std::unique_ptr<Task<TcrConnectionManager>> m_failoverTask;
 
   bool removeRefToEndpoint(TcrEndpoint* ep, bool keepEndpoint = false);
@@ -152,16 +152,16 @@ class TcrConnectionManager {
   void initializeHAEndpoints(const char* endpointsStr);
   void removeHAEndpoints();
 
-  ACE_Semaphore m_cleanupSema;
+  binary_semaphore cleanup_semaphore_;
   std::unique_ptr<Task<TcrConnectionManager>> m_cleanupTask;
 
   ExpiryTaskManager::id_type m_pingTaskId;
   ExpiryTaskManager::id_type m_servermonitorTaskId;
   Queue<Task<TcrEndpoint>*> m_receiverReleaseList;
   Queue<TcrConnection*> m_connectionReleaseList;
-  Queue<ACE_Semaphore*> m_notifyCleanupSemaList;
+  Queue<binary_semaphore*> notify_cleanup_semaphore_list_;
 
-  ACE_Semaphore m_redundancySema;
+  binary_semaphore redundancy_semaphore_;
   std::unique_ptr<Task<TcrConnectionManager>> m_redundancyTask;
   std::recursive_mutex m_notificationLock;
   bool m_isDurable;
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index f4f8924..6b3f75d 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -39,9 +39,9 @@ namespace client {
 const char* TcrEndpoint::NC_Notification = "NC Notification";
 
 TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
-                         ACE_Semaphore& failoverSema,
-                         ACE_Semaphore& cleanupSema,
-                         ACE_Semaphore& redundancySema, ThinClientBaseDM* DM,
+                         binary_semaphore& failoverSema,
+                         binary_semaphore& cleanupSema,
+                         binary_semaphore& redundancySema, ThinClientBaseDM* DM,
                          bool isMultiUserMode)
     : m_notifyConnection(nullptr),
       m_notifyReceiver(nullptr),
@@ -53,12 +53,12 @@ TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
       m_needToConnectInLock(false),
       m_isQueueHosted(false),
       m_uniqueId(0),
-      m_failoverSema(failoverSema),
-      m_cleanupSema(cleanupSema),
-      m_redundancySema(redundancySema),
+      failover_semaphore_(failoverSema),
+      cleanup_semaphore_(cleanupSema),
+      redundancy_semaphore_(redundancySema),
       m_baseDM(DM),
       m_name(name),
-      m_notificationCleanupSema(0),
+      notification_cleanup_semaphore_(0),
       m_numberOfTimesFailed(0),
       m_numRegions(0),
       m_pingTimeouts(0),
@@ -102,7 +102,7 @@ TcrEndpoint::~TcrEndpoint() {
   while (m_notifyCount > 0) {
     LOGDEBUG("TcrEndpoint::~TcrEndpoint(): reducing notify count at %d",
              m_notifyCount);
-    m_notificationCleanupSema.acquire();
+    notification_cleanup_semaphore_.acquire();
     m_notifyCount--;
   }
   LOGFINE("Connection to %s deleted", m_name.c_str());
@@ -1162,8 +1162,8 @@ void TcrEndpoint::setConnectionStatus(bool status) {
 }
 
 void TcrEndpoint::triggerRedundancyThread() {
-  m_failoverSema.release();
-  m_redundancySema.release();
+  failover_semaphore_.release();
+  redundancy_semaphore_.release();
 }
 
 void TcrEndpoint::closeConnection(TcrConnection*& conn) {
@@ -1197,9 +1197,9 @@ void TcrEndpoint::closeNotification() {
   m_notifyReceiver->stopNoblock();
   TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager();
   tccm.addNotificationForDeletion(m_notifyReceiver.get(), m_notifyConnection,
-                                  m_notificationCleanupSema);
+                                  notification_cleanup_semaphore_);
   m_notifyCount++;
-  m_cleanupSema.release();
+  cleanup_semaphore_.release();
   m_isQueueHosted = false;
   LOGFINEST(
       "Added susbcription channel for deletion and "
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index 43b062a..bfbda3c 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -52,8 +52,8 @@ class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
  public:
   TcrEndpoint(
       const std::string& name, CacheImpl* cacheImpl,
-      ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema,
-      ACE_Semaphore& redundancySema, ThinClientBaseDM* dm = nullptr,
+      binary_semaphore& failoverSema, binary_semaphore& cleanupSema,
+      binary_semaphore& redundancySema, ThinClientBaseDM* dm = nullptr,
       bool isMultiUserMode = false);  // TODO: need to look for endpoint case
 
   virtual ~TcrEndpoint();
@@ -205,16 +205,16 @@ class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
 
  private:
   int64_t m_uniqueId;
-  ACE_Semaphore& m_failoverSema;
-  ACE_Semaphore& m_cleanupSema;
-  ACE_Semaphore& m_redundancySema;
+  binary_semaphore& failover_semaphore_;
+  binary_semaphore& cleanup_semaphore_;
+  binary_semaphore& redundancy_semaphore_;
   ThinClientBaseDM* m_baseDM;
   std::string m_name;
   std::list<ThinClientBaseDM*> m_distMgrs;
   std::recursive_mutex m_endpointAuthenticationLock;
   std::recursive_mutex m_connectionLock;
   std::recursive_mutex m_distMgrsLock;
-  ACE_Semaphore m_notificationCleanupSema;
+  binary_semaphore notification_cleanup_semaphore_;
   synchronized_set<std::unordered_set<uint16_t>> m_ports;
   int32_t m_numberOfTimesFailed;
   int m_numRegions;
diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp
index fcf621a..e8002bd 100644
--- a/cppcache/src/TcrMessage.cpp
+++ b/cppcache/src/TcrMessage.cpp
@@ -906,7 +906,7 @@ void TcrMessage::writeMessageLength() {
                                            // the beginning.
 }
 
-void TcrMessage::startProcessChunk(ACE_Semaphore& finalizeSema) {
+void TcrMessage::startProcessChunk(binary_semaphore& finalizeSema) {
   if (m_msgTypeRequest == TcrMessage::EXECUTECQ_MSG_TYPE ||
       m_msgTypeRequest == TcrMessage::STOPCQ_MSG_TYPE ||
       m_msgTypeRequest == TcrMessage::CLOSECQ_MSG_TYPE ||
diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp
index 9230600..82d5d2c 100644
--- a/cppcache/src/TcrMessage.hpp
+++ b/cppcache/src/TcrMessage.hpp
@@ -182,7 +182,7 @@ class TcrMessage {
                const SerializationRegistry& serializationRegistry,
                MemberListForVersionStamp& memberListForVersionStamp);
 
-  void startProcessChunk(ACE_Semaphore& finalizeSema);
+  void startProcessChunk(binary_semaphore& finalizeSema);
   // nullptr chunk means that this is the last chunk
   void processChunk(const std::vector<uint8_t>& chunk, int32_t chunkLen,
                     uint16_t endpointmemId,
diff --git a/cppcache/src/TcrPoolEndPoint.cpp b/cppcache/src/TcrPoolEndPoint.cpp
index 14154ad..c270929 100644
--- a/cppcache/src/TcrPoolEndPoint.cpp
+++ b/cppcache/src/TcrPoolEndPoint.cpp
@@ -26,9 +26,9 @@ namespace geode {
 namespace client {
 
 TcrPoolEndPoint::TcrPoolEndPoint(const std::string& name, CacheImpl* cache,
-                                 ACE_Semaphore& failoverSema,
-                                 ACE_Semaphore& cleanupSema,
-                                 ACE_Semaphore& redundancySema,
+                                 binary_semaphore& failoverSema,
+                                 binary_semaphore& cleanupSema,
+                                 binary_semaphore& redundancySema,
                                  ThinClientPoolDM* dm)
     : TcrEndpoint(name, cache, failoverSema, cleanupSema, redundancySema, dm),
       m_dm(dm) {}
diff --git a/cppcache/src/TcrPoolEndPoint.hpp b/cppcache/src/TcrPoolEndPoint.hpp
index 61b20e2..3fce7d8 100644
--- a/cppcache/src/TcrPoolEndPoint.hpp
+++ b/cppcache/src/TcrPoolEndPoint.hpp
@@ -32,8 +32,8 @@ class ThinClientPoolDM;
 class TcrPoolEndPoint : public TcrEndpoint {
  public:
   TcrPoolEndPoint(const std::string& name, CacheImpl* cache,
-                  ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema,
-                  ACE_Semaphore& redundancySema, ThinClientPoolDM* dm);
+                  binary_semaphore& failoverSema, binary_semaphore& cleanupSema,
+                  binary_semaphore& redundancySema, ThinClientPoolDM* dm);
   ThinClientPoolDM* getPoolHADM() const override;
 
   bool checkDupAndAdd(std::shared_ptr<EventId> eventid) override;
diff --git a/cppcache/src/ThinClientBaseDM.cpp b/cppcache/src/ThinClientBaseDM.cpp
index a47be6d..ebf9641 100644
--- a/cppcache/src/ThinClientBaseDM.cpp
+++ b/cppcache/src/ThinClientBaseDM.cpp
@@ -217,13 +217,23 @@ void ThinClientBaseDM::processChunks(std::atomic<bool>& isRunning) {
   TcrChunkedContext* chunk;
   LOGFINE("Starting chunk process thread for region %s",
           (m_region ? m_region->getFullPath().c_str() : "(null)"));
+
+  std::chrono::milliseconds wait_for_chunk{100};
+  chunk = m_chunks.getFor(wait_for_chunk);
+
   while (isRunning) {
-    chunk = m_chunks.getFor(std::chrono::microseconds(100000));
     if (chunk) {
       chunk->handleChunk(false);
       _GEODE_SAFE_DELETE(chunk);
     }
+
+    chunk = m_chunks.getFor(wait_for_chunk);
   }
+
+  if (chunk) {
+    _GEODE_SAFE_DELETE(chunk);
+  }
+
   LOGFINE("Ending chunk process thread for region %s",
           (m_region ? m_region->getFullPath().c_str() : "(null)"));
 }
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index aaccde2..80b464d 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -140,9 +140,9 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
       m_poolName(name),
       m_stats(nullptr),
       m_sticky(false),
-      m_updateLocatorListSema(0),
-      m_pingSema(0),
-      m_cliCallbackSema(0),
+      update_locators_semaphore_(0),
+      ping_semaphore_(0),
+      cli_callback_semaphore_(0),
       m_isDestroyed(false),
       m_destroyPending(false),
       m_destroyPendingHADM(false),
@@ -151,7 +151,7 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
       m_poolSize(0),
       m_numRegions(0),
       m_server(0),
-      m_connSema(0),
+      conn_semaphore_(0),
       m_connManageTask(nullptr),
       m_pingTask(nullptr),
       m_updateLocatorListTask(nullptr),
@@ -282,7 +282,7 @@ void ThinClientPoolDM::startBackgroundThreads() {
     m_cliCallbackTask->start();
   }
 
-  const auto& pingInterval = getPingInterval() / 2;
+  const auto& pingInterval = getPingInterval();
   if (pingInterval > std::chrono::seconds::zero()) {
     LOGDEBUG(
         "ThinClientPoolDM::startBackgroundThreads: Scheduling ping task at %ld",
@@ -386,7 +386,7 @@ void ThinClientPoolDM::manageConnections(std::atomic<bool>& isRunning) {
   LOGFINE("ThinClientPoolDM: starting manageConnections thread");
 
   while (isRunning) {
-    m_connSema.acquire();
+    conn_semaphore_.acquire();
     if (isRunning) {
       try {
         LOGFINE(
@@ -768,7 +768,7 @@ void ThinClientPoolDM::stopPingThread() {
   if (m_pingTask) {
     LOGFINE("ThinClientPoolDM::destroy(): Closing ping thread.");
     m_pingTask->stopNoblock();
-    m_pingSema.release();
+    ping_semaphore_.release();
     m_pingTask->wait();
     m_pingTask = nullptr;
     if (m_pingTaskId >= 0) {
@@ -782,7 +782,7 @@ void ThinClientPoolDM::stopUpdateLocatorListThread() {
   if (m_updateLocatorListTask) {
     LOGFINE("ThinClientPoolDM::destroy(): Closing updateLocatorList thread.");
     m_updateLocatorListTask->stopNoblock();
-    m_updateLocatorListSema.release();
+    update_locators_semaphore_.release();
     m_updateLocatorListTask->wait();
     m_updateLocatorListTask = nullptr;
     if (m_updateLocatorListTaskId >= 0) {
@@ -796,7 +796,7 @@ void ThinClientPoolDM::stopCliCallbackThread() {
   if (m_cliCallbackTask) {
     LOGFINE("ThinClientPoolDM::destroy(): Closing cliCallback thread.");
     m_cliCallbackTask->stopNoblock();
-    m_cliCallbackSema.release();
+    cli_callback_semaphore_.release();
     m_cliCallbackTask->wait();
     m_cliCallbackTask = nullptr;
   }
@@ -825,7 +825,7 @@ void ThinClientPoolDM::destroy(bool keepAlive) {
     auto cacheImpl = m_connManager.getCacheImpl();
     if (m_connManageTask) {
       m_connManageTask->stopNoblock();
-      m_connSema.release();
+      conn_semaphore_.release();
       m_connManageTask->wait();
       m_connManageTask = nullptr;
       if (m_connManageTaskId >= 0) {
@@ -1594,7 +1594,7 @@ void ThinClientPoolDM::removeEPConnections(int numConn,
 
   // Raise Semaphore for manage thread
   if (triggerManageConn) {
-    m_connSema.release();
+    conn_semaphore_.release();
   }
 }
 
@@ -1734,7 +1734,7 @@ GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint(
     getStats().incPoolConnects();
     getStats().setCurPoolConnections(m_poolSize);
   }
-  m_connSema.release();
+  conn_semaphore_.release();
 
   return error;
 }
@@ -1743,7 +1743,7 @@ void ThinClientPoolDM::reducePoolSize(int num) {
   LOGFINE("removing connection %d ,  pool-size =%d", num, m_poolSize.load());
   m_poolSize -= num;
   if (m_poolSize <= 0) {
-    if (m_cliCallbackTask != nullptr) m_cliCallbackSema.release();
+    if (m_cliCallbackTask != nullptr) cli_callback_semaphore_.release();
   }
 }
 
@@ -1825,7 +1825,7 @@ GfErrType ThinClientPoolDM::createPoolConnection(
       break;
     }
   }
-  m_connSema.release();
+  conn_semaphore_.release();
   // if a fatal error occurred earlier and we don't have
   // a connection then return this saved error
   if (fatal && !conn && error != GF_NOERR) {
@@ -2067,7 +2067,7 @@ void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
   LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str());
 
   while (isRunning) {
-    m_updateLocatorListSema.acquire();
+    update_locators_semaphore_.acquire();
     if (isRunning && !m_connManager.isNetDown()) {
       (m_locHelper)->updateLocators(getServerGroup());
     }
@@ -2079,12 +2079,13 @@ void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
 void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
   LOGFINE("Starting ping thread for pool %s", m_poolName.c_str());
 
+  ping_semaphore_.acquire();
   while (isRunning) {
-    m_pingSema.acquire();
-    if (isRunning && !m_connManager.isNetDown()) {
+    if (!m_connManager.isNetDown()) {
       pingServerLocal();
-      m_pingSema.acquire();
     }
+
+    ping_semaphore_.acquire();
   }
 
   LOGFINE("Ending ping thread for pool %s", m_poolName.c_str());
@@ -2093,7 +2094,7 @@ void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
 void ThinClientPoolDM::cliCallback(std::atomic<bool>& isRunning) {
   LOGFINE("Starting cliCallback thread for pool %s", m_poolName.c_str());
   while (isRunning) {
-    m_cliCallbackSema.acquire();
+    cli_callback_semaphore_.acquire();
     if (isRunning) {
       LOGFINE("Clearing Pdx Type Registry");
       // this call for csharp client
@@ -2101,24 +2102,24 @@ void ThinClientPoolDM::cliCallback(std::atomic<bool>& isRunning) {
           *(m_connManager.getCacheImpl()->getCache()));
       // this call for cpp client
       m_connManager.getCacheImpl()->getPdxTypeRegistry()->clear();
-      m_cliCallbackSema.acquire();
+      cli_callback_semaphore_.acquire();
     }
   }
   LOGFINE("Ending cliCallback thread for pool %s", m_poolName.c_str());
 }
 
 int ThinClientPoolDM::doPing(const ACE_Time_Value&, const void*) {
-  m_pingSema.release();
+  ping_semaphore_.release();
   return 0;
 }
 
 int ThinClientPoolDM::doUpdateLocatorList(const ACE_Time_Value&, const void*) {
-  m_updateLocatorListSema.release();
+  update_locators_semaphore_.release();
   return 0;
 }
 
 int ThinClientPoolDM::doManageConnections(const ACE_Time_Value&, const void*) {
-  m_connSema.release();
+  conn_semaphore_.release();
   return 0;
 }
 
@@ -2435,8 +2436,9 @@ bool ThinClientPoolDM::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
 std::shared_ptr<TcrEndpoint> ThinClientPoolDM::createEP(
     const char* endpointName) {
   return std::make_shared<TcrPoolEndPoint>(
-      endpointName, m_connManager.getCacheImpl(), m_connManager.m_failoverSema,
-      m_connManager.m_cleanupSema, m_connManager.m_redundancySema, this);
+      endpointName, m_connManager.getCacheImpl(),
+      m_connManager.failover_semaphore_, m_connManager.cleanup_semaphore_,
+      m_connManager.redundancy_semaphore_, this);
 }
 
 GfErrType FunctionExecution::execute() {
diff --git a/cppcache/src/ThinClientPoolDM.hpp b/cppcache/src/ThinClientPoolDM.hpp
index ce1f8d8..63576e0 100644
--- a/cppcache/src/ThinClientPoolDM.hpp
+++ b/cppcache/src/ThinClientPoolDM.hpp
@@ -47,6 +47,7 @@
 #include "ThinClientStickyManager.hpp"
 #include "ThreadPool.hpp"
 #include "UserAttributes.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
 
 namespace apache {
 namespace geode {
@@ -196,9 +197,9 @@ class ThinClientPoolDM
   PoolStats* m_stats;
   bool m_sticky;
   void netDown();
-  ACE_Semaphore m_updateLocatorListSema;
-  ACE_Semaphore m_pingSema;
-  ACE_Semaphore m_cliCallbackSema;
+  binary_semaphore update_locators_semaphore_;
+  binary_semaphore ping_semaphore_;
+  binary_semaphore cli_callback_semaphore_;
   volatile bool m_isDestroyed;
   volatile bool m_destroyPending;
   volatile bool m_destroyPendingHADM;
@@ -289,7 +290,7 @@ class ThinClientPoolDM
   unsigned m_server;
 
   // Manage Connection thread
-  ACE_Semaphore m_connSema;
+  binary_semaphore conn_semaphore_;
   std::unique_ptr<Task<ThinClientPoolDM>> m_connManageTask;
   std::unique_ptr<Task<ThinClientPoolDM>> m_pingTask;
   std::unique_ptr<Task<ThinClientPoolDM>> m_updateLocatorListTask;
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index b940569..f89db1e 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -33,7 +33,7 @@ ThinClientPoolHADM::ThinClientPoolHADM(const char* name,
                                        TcrConnectionManager& connManager)
     : ThinClientPoolDM(name, poolAttr, connManager),
       m_theTcrConnManager(connManager),
-      m_redundancySema(0),
+      redundancy_semaphore_(0),
       m_redundancyTask(nullptr),
       m_servermonitorTaskId(-1) {
   m_redundancyManager = std::unique_ptr<ThinClientRedundancyManager>(
@@ -144,19 +144,20 @@ bool ThinClientPoolHADM::postFailoverAction(TcrEndpoint*) {
 
 void ThinClientPoolHADM::redundancy(std::atomic<bool>& isRunning) {
   LOGFINE("ThinClientPoolHADM: Starting maintain redundancy thread.");
+
+  redundancy_semaphore_.acquire();
   while (isRunning) {
-    m_redundancySema.acquire();
-    if (isRunning && !m_connManager.isNetDown()) {
+    if (!m_connManager.isNetDown()) {
       m_redundancyManager->maintainRedundancyLevel();
-      while (m_redundancySema.tryacquire() != -1) {
-      }
     }
+
+    redundancy_semaphore_.acquire();
   }
   LOGFINE("ThinClientPoolHADM: Ending maintain redundancy thread.");
 }
 
 int ThinClientPoolHADM::checkRedundancy(const ACE_Time_Value&, const void*) {
-  m_redundancySema.release();
+  redundancy_semaphore_.release();
   return 0;
 }
 
@@ -188,7 +189,7 @@ void ThinClientPoolHADM::sendNotificationCloseMsgs() {
           m_servermonitorTaskId);
     }
     m_redundancyTask->stopNoblock();
-    m_redundancySema.release();
+    redundancy_semaphore_.release();
     m_redundancyTask->wait();
     m_redundancyTask = nullptr;
     m_redundancyManager->sendNotificationCloseMsgs();
@@ -309,8 +310,9 @@ void ThinClientPoolHADM::sendNotConMesToAllregions() {
 std::shared_ptr<TcrEndpoint> ThinClientPoolHADM::createEP(
     const char* endpointName) {
   return std::make_shared<TcrPoolEndPoint>(
-      endpointName, m_connManager.getCacheImpl(), m_connManager.m_failoverSema,
-      m_connManager.m_cleanupSema, m_redundancySema, this);
+      endpointName, m_connManager.getCacheImpl(),
+      m_connManager.failover_semaphore_, m_connManager.cleanup_semaphore_,
+      redundancy_semaphore_, this);
 }
 
 }  // namespace client
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index 5bd45b6..dfdf86d 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -84,7 +84,7 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
 
   GfErrType sendRequestToPrimary(TcrMessage& request, TcrMessageReply& reply);
 
-  void triggerRedundancyThread() override { m_redundancySema.release(); }
+  void triggerRedundancyThread() override { redundancy_semaphore_.release(); }
 
   bool isReadyForEvent() const;
 
@@ -107,7 +107,7 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
   std::unique_ptr<ThinClientRedundancyManager> m_redundancyManager;
 
   TcrConnectionManager& m_theTcrConnManager;
-  ACE_Semaphore m_redundancySema;
+  binary_semaphore redundancy_semaphore_;
   std::unique_ptr<Task<ThinClientPoolHADM>> m_redundancyTask;
 
   void redundancy(std::atomic<bool>& isRunning);
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 432cc4b..5a5cf63 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -56,6 +56,7 @@ ThinClientRedundancyManager::ThinClientRedundancyManager(
       m_locators(nullptr),
       m_servers(nullptr),
       m_periodicAckTask(nullptr),
+      periodic_ack_semaphore_(1),
       m_processEventIdMapTaskId(-1),
       m_nextAckInc(0),
       m_HAenabled(false) {}
@@ -682,7 +683,7 @@ void ThinClientRedundancyManager::close() {
           m_processEventIdMapTaskId);
     }
     m_periodicAckTask->stopNoblock();
-    m_periodicAckSema.release();
+    periodic_ack_semaphore_.release();
     m_periodicAckTask->wait();
     m_periodicAckTask = nullptr;
   }
@@ -1116,18 +1117,16 @@ void ThinClientRedundancyManager::readyForEvents() {
 
 int ThinClientRedundancyManager::processEventIdMap(const ACE_Time_Value&,
                                                    const void*) {
-  m_periodicAckSema.release();
+  periodic_ack_semaphore_.release();
   return 0;
 }
 
 void ThinClientRedundancyManager::periodicAck(std::atomic<bool>& isRunning) {
+  periodic_ack_semaphore_.acquire();
+
   while (isRunning) {
-    m_periodicAckSema.acquire();
-    if (isRunning) {
-      doPeriodicAck();
-      while (m_periodicAckSema.tryacquire() != -1) {
-      }
-    }
+    doPeriodicAck();
+    periodic_ack_semaphore_.acquire();
   }
 }
 
diff --git a/cppcache/src/ThinClientRedundancyManager.hpp b/cppcache/src/ThinClientRedundancyManager.hpp
index c048e0d..747d40a 100644
--- a/cppcache/src/ThinClientRedundancyManager.hpp
+++ b/cppcache/src/ThinClientRedundancyManager.hpp
@@ -34,6 +34,7 @@
 #include "ServerLocation.hpp"
 #include "Task.hpp"
 #include "TcrMessage.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
 #include "util/synchronized_map.hpp"
 
 namespace apache {
@@ -136,7 +137,7 @@ class ThinClientRedundancyManager {
   inline bool isDurable();
   int processEventIdMap(const ACE_Time_Value&, const void*);
   std::unique_ptr<Task<ThinClientRedundancyManager>> m_periodicAckTask;
-  ACE_Semaphore m_periodicAckSema;
+  binary_semaphore periodic_ack_semaphore_;
   ExpiryTaskManager::id_type
       m_processEventIdMapTaskId;  // periodic check eventid map for notify ack
                                   // and/or expiry
diff --git a/cppcache/src/util/concurrent/binary_semaphore.cpp b/cppcache/src/util/concurrent/binary_semaphore.cpp
new file mode 100644
index 0000000..f9dd465
--- /dev/null
+++ b/cppcache/src/util/concurrent/binary_semaphore.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "binary_semaphore.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+binary_semaphore::binary_semaphore(bool released) : released_(released) {}
+
+void binary_semaphore::release() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  released_ = true;
+  cv_.notify_one();
+}
+
+void binary_semaphore::acquire() {
+  std::unique_lock<std::mutex> lock(mutex_);
+  cv_.wait(lock, [this]() { return released_; });
+  released_ = false;
+}
+
+}  // namespace client
+}  // namespace geode
+}  // namespace apache
diff --git a/cppcache/src/util/concurrent/binary_semaphore.hpp b/cppcache/src/util/concurrent/binary_semaphore.hpp
new file mode 100644
index 0000000..5d1e802
--- /dev/null
+++ b/cppcache/src/util/concurrent/binary_semaphore.hpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_UTIL_CONCURRENT_BINARY_SEMAPHORE_H_
+#define GEODE_UTIL_CONCURRENT_BINARY_SEMAPHORE_H_
+
+#include <condition_variable>
+#include <mutex>
+
+namespace apache {
+namespace geode {
+namespace client {
+class binary_semaphore {
+ public:
+  explicit binary_semaphore(bool released);
+
+  void release();
+  void acquire();
+
+ protected:
+  bool released_;
+  std::mutex mutex_;
+  std::condition_variable cv_;
+};
+
+}  // namespace client
+}  // namespace geode
+}  // namespace apache
+
+#endif /* GEODE_UTIL_CONCURRENT_BINARY_SEMAPHORE_H_ */
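
For reference, the C++20 remark in the commit message means this class can
eventually be replaced by std::binary_semaphore. A rough mapping follows (a
sketch assuming a C++20 toolchain; not part of this commit). Note one
behavioral difference: the class above tolerates redundant release() calls,
while std::binary_semaphore::release() must not push the count above one.

    #include <semaphore>

    // Initial count goes in the constructor; 0 corresponds to
    // binary_semaphore{false} above.
    std::binary_semaphore semaphore{0};

    void wake() {
      // Like binary_semaphore::release(), except that releasing an
      // already-released std::binary_semaphore is undefined behavior
      // rather than a no-op, so callers must not double-release.
      semaphore.release();
    }

    void wait_for_wake() {
      semaphore.acquire();  // same blocking semantics as binary_semaphore::acquire()
    }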