You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2022/02/17 21:26:22 UTC

[geode-native] branch develop updated: GEODE-9324: Remove ACE_Task references (#812)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3876082  GEODE-9324: Remove ACE_Task references (#812)
3876082 is described below

commit 3876082529b0c74f516a8a7ee168cd0fa288080e
Author: Mario Salazar de Torres <ma...@est.tech>
AuthorDate: Thu Feb 17 22:24:58 2022 +0100

    GEODE-9324: Remove ACE_Task references (#812)
    
     - Removed all references to ACE_Task
     - Changed variable names to comply with styleguides
---
 cppcache/integration-test/CMakeLists.txt           |   3 +
 cppcache/integration-test/CacheHelper.cpp          |  13 +-
 cppcache/integration-test/ClientCleanup.cpp        |  36 +++
 cppcache/integration-test/ClientCleanup.hpp        |  35 +++
 .../integration-test/ThinClientSecurityHelper.hpp  | 116 +++++----
 .../integration-test/ThinClientTransactions.hpp    | 270 +++++++++++----------
 .../integration-test/ThinClientTransactionsXA.hpp  | 266 ++++++++++----------
 .../integration-test/ThinClientVersionedOps.hpp    |  16 +-
 cppcache/integration-test/TimeBomb.cpp             |  60 +++++
 cppcache/integration-test/TimeBomb.hpp             | 106 ++------
 cppcache/integration-test/fw_dunit.cpp             |  19 +-
 cppcache/integration-test/fw_dunit.hpp             |   3 +
 cppcache/integration-test/testExpiration.cpp       |   1 +
 .../integration-test/testOverflowPutGetSqLite.cpp  |  34 +--
 .../testRegionAccessThreadSafe.cpp                 |  53 ++--
 cppcache/integration-test/testSpinLock.cpp         | 138 ++++++-----
 .../integration-test/testThinClientCqFailover.cpp  |  31 +--
 .../testThinClientCqHAFailover.cpp                 |  30 +--
 .../testThinClientHAQueryFailover.cpp              |  27 +--
 .../testThinClientPoolAttrTest.cpp                 |  39 +--
 .../testThinClientRemoteQueryFailover.cpp          |  25 +-
 .../testThinClientRemoteQueryFailoverPdx.cpp       |  25 +-
 22 files changed, 733 insertions(+), 613 deletions(-)

diff --git a/cppcache/integration-test/CMakeLists.txt b/cppcache/integration-test/CMakeLists.txt
index 8dc9973..66dede7 100644
--- a/cppcache/integration-test/CMakeLists.txt
+++ b/cppcache/integration-test/CMakeLists.txt
@@ -18,9 +18,12 @@ project(nativeclient.tests.cppcache LANGUAGES CXX)
 
 add_library(test-cppcache-utils STATIC
   fw_dunit.cpp
+  ClientCleanup.cpp
   CacheHelper.cpp
   CacheableWrapper.cpp
+  TimeBomb.cpp
 )
+
 target_link_libraries(test-cppcache-utils
   PRIVATE
     integration-framework
diff --git a/cppcache/integration-test/CacheHelper.cpp b/cppcache/integration-test/CacheHelper.cpp
index 1e632a1..e65f15b 100644
--- a/cppcache/integration-test/CacheHelper.cpp
+++ b/cppcache/integration-test/CacheHelper.cpp
@@ -1016,11 +1016,14 @@ void CacheHelper::initServer(int instance, const std::string &xml,
                              const char * /*unused*/, bool ssl,
                              bool enableDelta, bool, bool testServerGC,
                              bool untrustedCert, bool useSecurityManager) {
-  if (!isServerCleanupCallbackRegistered &&
-      gClientCleanup.registerCallback(&CacheHelper::cleanupServerInstances)) {
+  if (!isServerCleanupCallbackRegistered) {
     isServerCleanupCallbackRegistered = true;
+    gClientCleanup.registerCallback(
+        []() { CacheHelper::cleanupServerInstances(); });
+
     std::cout << "TimeBomb registered server cleanupcallback \n";
   }
+
   std::cout << "Inside initServer added\n";
 
   static const auto gfjavaenv = Utils::getEnv("GFJAVA");
@@ -1424,9 +1427,11 @@ void CacheHelper::initLocator(int instance, bool ssl, bool, int dsId,
                               bool useSecurityManager) {
   static const auto gfjavaenv = Utils::getEnv("GFJAVA");
 
-  if (!isLocatorCleanupCallbackRegistered &&
-      gClientCleanup.registerCallback(&CacheHelper::cleanupLocatorInstances)) {
+  if (!isLocatorCleanupCallbackRegistered) {
     isLocatorCleanupCallbackRegistered = true;
+
+    gClientCleanup.registerCallback(
+        []() { CacheHelper::cleanupLocatorInstances(); });
   }
 
   std::string currDir = boost::filesystem::current_path().string();
diff --git a/cppcache/integration-test/ClientCleanup.cpp b/cppcache/integration-test/ClientCleanup.cpp
new file mode 100644
index 0000000..2727174
--- /dev/null
+++ b/cppcache/integration-test/ClientCleanup.cpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ClientCleanup.hpp"
+
+#include <iostream>
+
+void ClientCleanup::trigger() {
+  for (auto& callback : callbacks_) {
+    try {
+      callback();
+    } catch (std::exception& e) {
+      std::clog << "Exception while executing cleanup: " << e.what()
+                << std::endl
+                << std::flush;
+    }
+  }
+}
+
+void ClientCleanup::registerCallback(std::function<void()> callback) {
+  callbacks_.emplace_back(callback);
+}
diff --git a/cppcache/integration-test/ClientCleanup.hpp b/cppcache/integration-test/ClientCleanup.hpp
new file mode 100644
index 0000000..33cb949
--- /dev/null
+++ b/cppcache/integration-test/ClientCleanup.hpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_INTEGRATION_TEST_CLIENT_CLEANUP_H_
+#define GEODE_INTEGRATION_TEST_CLIENT_CLEANUP_H_
+
+#include <functional>
+#include <vector>
+
+class ClientCleanup {
+ public:
+  void trigger();
+  void registerCallback(std::function<void()> callback);
+
+ protected:
+  std::vector<std::function<void()>> callbacks_;
+};
+
+#endif  // GEODE_INTEGRATION_TEST_CLIENT_CLEANUP_H_
diff --git a/cppcache/integration-test/ThinClientSecurityHelper.hpp b/cppcache/integration-test/ThinClientSecurityHelper.hpp
index c8ef291..35f32bc 100644
--- a/cppcache/integration-test/ThinClientSecurityHelper.hpp
+++ b/cppcache/integration-test/ThinClientSecurityHelper.hpp
@@ -170,46 +170,54 @@ void initClientAuth(char UserType) {
   }
 }
 
-// This putThread class is used in
+// This PutThread class is used in
 // testThinClientTracking,testThinClientTicket304, testThinClientTicket317
 
-class putThread : public ACE_Task_Base {
+class PutThread {
  public:
-  explicit putThread(std::shared_ptr<Region> r, bool regInt = false,
-                     int waitTime = 0) {
-    m_reg = r;
-    m_regInt = regInt;
-    m_numthreads = 1;
-    m_numops = 0;
-    m_isCallBack = false;
-    m_sameKey = false;
-    m_waitTime = waitTime;
+  explicit PutThread(std::shared_ptr<Region> r, bool regInt = false,
+                     int waitTime = 0)
+      : region_{r},
+        numOps_{0},
+        numThreads_{1},
+        isCallback_{false},
+        sameKey_{false},
+        regInt_{regInt},
+        waitTime_{waitTime} {
   }
 
   void setParams(int opcode, int numofops, int numthreads,
                  bool isCallBack = false, bool sameKey = false,
                  int waitTime = 0) {  //
-    m_opcode = opcode;
-    m_numops = numofops;
-    m_numthreads = numthreads;
-    m_isCallBack = isCallBack;
-    m_sameKey = sameKey;
-    m_waitTime = waitTime;
+    opcode_ = opcode;
+    numOps_ = numofops;
+    numThreads_ = numthreads;
+    isCallback_ = isCallBack;
+    sameKey_ = sameKey;
+    waitTime_ = waitTime;
   }
 
   void start() {
-    m_run = true;
-    activate(THR_NEW_LWP | THR_JOINABLE, m_numthreads);
+    for(auto i = 0; i < numThreads_; ++i) {
+      threads_.emplace_back([this](){
+        run();
+      });
+    }
+  }
+
+  void wait() {
+    for(auto& thread : threads_) {
+      if(thread.joinable()) {
+        thread.join();
+      }
+    }
   }
 
   void stop() {
-    if (m_run) {
-      m_run = false;
       wait();
-    }
   }
 
-  int svc() override {
+  void run() {
     int ops = 0;
     std::string key_str;
     std::shared_ptr<CacheableKey> key;
@@ -217,49 +225,49 @@ class putThread : public ACE_Task_Base {
     std::vector<std::shared_ptr<CacheableKey>> keys0;
 
     auto pid = boost::this_process::get_id();
-    if (m_regInt) {
-      m_reg->registerAllKeys(false, true);
+    if (regInt_) {
+      region_->registerAllKeys(false, true);
     }
-    if (m_waitTime != 0) {
-      std::this_thread::sleep_for(std::chrono::seconds{m_waitTime});
+    if (waitTime_ != 0) {
+      std::this_thread::sleep_for(std::chrono::seconds{waitTime_});
     }
-    while (ops++ < m_numops) {
-      if (m_sameKey) {
+    while (ops++ < numOps_) {
+      if (sameKey_) {
         key_str = "key-1";
       } else {
         key_str = "key-" + std::to_string(ops);
       }
 
       key = CacheableKey::create(key_str);
-      if (m_opcode == 0) {
+      if (opcode_ == 0) {
         std::string value_str;
 
-        if (m_isCallBack) {
+        if (isCallback_) {
           auto boolptr = CacheableBoolean::create("true");
           value_str = "client1-value" + std::to_string(ops);
           value = CacheableString::create(value_str);
-          m_reg->put(key, value, boolptr);
+          region_->put(key, value, boolptr);
         } else {
           value_str = "client2-value" + std::to_string(ops);
           value = CacheableString::create(value_str);
-          m_reg->put(key, value);
+          region_->put(key, value);
         }
-      } else if (m_opcode == 1) {
-        m_reg->get(key);
-      } else if (m_opcode == 5) {
+      } else if (opcode_ == 1) {
+        region_->get(key);
+      } else if (opcode_ == 5) {
         keys0.push_back(key);
-        if (ops == m_numops) {
-          m_reg->registerKeys(keys0, false, true);
+        if (ops == numOps_) {
+          region_->registerKeys(keys0, false, true);
         }
-      } else if (m_opcode == 6) {
-        m_reg->registerRegex("key-[1-3]", false, true);
+      } else if (opcode_ == 6) {
+        region_->registerRegex("key-[1-3]", false, true);
       } else {
         try {
-          if (m_isCallBack) {
+          if (isCallback_) {
             auto boolptr = CacheableBoolean::create("true");
-            m_reg->destroy(key, boolptr);
+            region_->destroy(key, boolptr);
           } else {
-            m_reg->destroy(key);
+            region_->destroy(key);
           }
         } catch (Exception& ex) {
           auto tid =
@@ -268,18 +276,20 @@ class putThread : public ACE_Task_Base {
         }
       }
     }
-    return 0;
   }
 
-  std::shared_ptr<Region> m_reg;
-  bool m_run;
-  int m_opcode;
-  int m_numops;
-  int m_numthreads;
-  bool m_isCallBack;
-  bool m_sameKey;
-  bool m_regInt;
-  int m_waitTime;
+ protected:
+  std::shared_ptr<Region> region_;
+
+  int opcode_;
+  int numOps_;
+  int numThreads_;
+  bool isCallback_;
+  bool sameKey_;
+  bool regInt_;
+  int waitTime_;
+
+  std::vector<std::thread> threads_;
 };
 
 }  // namespace
diff --git a/cppcache/integration-test/ThinClientTransactions.hpp b/cppcache/integration-test/ThinClientTransactions.hpp
index d1f24a4..5ff3d01 100644
--- a/cppcache/integration-test/ThinClientTransactions.hpp
+++ b/cppcache/integration-test/ThinClientTransactions.hpp
@@ -21,11 +21,9 @@
 #define GEODE_INTEGRATION_TEST_THINCLIENTTRANSACTIONS_H_
 
 #include "fw_dunit.hpp"
-#include <ace/Auto_Event.h>
-#include <ace/OS.h>
-#include <ace/High_Res_Timer.h>
 
 #include <string>
+
 #include <geode/TransactionId.hpp>
 #include <geode/CacheTransactionManager.hpp>
 
@@ -33,9 +31,11 @@
 #define ROOT_SCOPE DISTRIBUTED_ACK
 
 #include "CacheHelper.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
 
 namespace {  // NOLINT(google-build-namespaces)
 
+using apache::geode::client::binary_semaphore;
 using apache::geode::client::CacheableKey;
 using apache::geode::client::CacheableString;
 using apache::geode::client::CacheHelper;
@@ -358,40 +358,34 @@ const bool NO_ACK = false;
 #define THREADERRORCHECK(x, y) \
   do {                         \
     if (!(x)) {                \
-      m_isFailed = true;       \
-      m_error = y;             \
-      return -1;               \
+      failed_ = true;          \
+      error_ = y;              \
+      return;                  \
     }                          \
   } while (0)
 
-class SuspendTransactionThread : public ACE_Task_Base {
- private:
-  TransactionId* m_suspendedTransaction;
-  bool m_sleep;
-  ACE_Auto_Event* m_txEvent;
-
+class SuspendTransactionThread {
  public:
-  SuspendTransactionThread(bool sleep, ACE_Auto_Event* txEvent)
-      : m_suspendedTransaction(nullptr), m_sleep(sleep), m_txEvent(txEvent) {}
+  SuspendTransactionThread(bool sleep, binary_semaphore& event)
+      : sleep_{sleep}, sem_{event} {}
 
-  int svc(void) override {
+  void run() {
     LOG(" In SuspendTransactionThread");
-
     auto txManager = getHelper()->getCache()->getCacheTransactionManager();
 
     txManager->begin();
-
     createEntry(regionNames[0], keys[4], vals[4]);
     createEntry(regionNames[1], keys[5], vals[5]);
 
-    m_suspendedTransaction = &txManager->getTransactionId();
+    tx_ = &txManager->getTransactionId();
+
+    if (sleep_) {
+      sem_.acquire();
 
-    if (m_sleep) {
-      m_txEvent->wait();
-      std::this_thread::sleep_for(std::chrono::seconds(5));
+      std::this_thread::sleep_for(std::chrono::seconds{5});
     }
 
-    m_suspendedTransaction = &txManager->suspend();
+    tx_ = &txManager->suspend();
     LOG(" Out SuspendTransactionThread");
 
     getHelper()
@@ -399,32 +393,35 @@ class SuspendTransactionThread : public ACE_Task_Base {
         ->getPoolManager()
         .find("__TESTPOOL1_")
         ->releaseThreadLocalConnection();
+  }
 
-    return 0;
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
   }
-  void start() { activate(); }
-  void stop() { wait(); }
-  TransactionId& getSuspendedTx() { return *m_suspendedTransaction; }
+
+  void stop() {
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+  }
+
+  TransactionId& getSuspendedTx() { return *tx_; }
+
+ protected:
+  bool sleep_;
+  std::thread thread_;
+  binary_semaphore& sem_;
+  TransactionId* tx_{nullptr};
 };
-class ResumeTransactionThread : public ACE_Task_Base {
- private:
-  TransactionId& m_suspendedTransaction;
-  bool m_commit;
-  bool m_tryResumeWithSleep;
-  bool m_isFailed;
-  std::string m_error;
-  ACE_Auto_Event* m_txEvent;
 
+class ResumeTransactionThread {
  public:
-  ResumeTransactionThread(TransactionId& suspendedTransaction, bool commit,
-                          bool tryResumeWithSleep, ACE_Auto_Event* txEvent)
-      : m_suspendedTransaction(suspendedTransaction),
-        m_commit(commit),
-        m_tryResumeWithSleep(tryResumeWithSleep),
-        m_isFailed(false),
-        m_txEvent(txEvent) {}
-
-  int svc(void) override {
+
+  ResumeTransactionThread(TransactionId& tx, bool commit, bool sleep,
+                          binary_semaphore& event)
+      : tx_(tx), commit_{commit}, sleep_(sleep), failed_{false}, sem_{event} {}
+
+  void run() {
     LOG("In ResumeTransactionThread");
 
     auto regPtr0 = getHelper()->getRegion(regionNames[0]);
@@ -448,25 +445,25 @@ class ResumeTransactionThread : public ACE_Task_Base {
                      "found in region.");
 
     auto txManager = getHelper()->getCache()->getCacheTransactionManager();
-    if (m_tryResumeWithSleep) {
-      THREADERRORCHECK(!txManager->isSuspended(m_suspendedTransaction),
+    if (sleep_) {
+      THREADERRORCHECK(!txManager->isSuspended(tx_),
                        "In ResumeTransactionThread - the transaction should "
                        "NOT be in suspended state");
     } else {
-      THREADERRORCHECK(txManager->isSuspended(m_suspendedTransaction),
+      THREADERRORCHECK(txManager->isSuspended(tx_),
                        "In ResumeTransactionThread - the transaction should be "
                        "in suspended state");
     }
 
     THREADERRORCHECK(
-        txManager->exists(m_suspendedTransaction),
+        txManager->exists(tx_),
         "In ResumeTransactionThread - the transaction should exist");
 
-    if (m_tryResumeWithSleep) {
-      m_txEvent->signal();
-      txManager->tryResume(m_suspendedTransaction, std::chrono::seconds(30));
+    if (sleep_) {
+      sem_.release();
+      txManager->tryResume(tx_, std::chrono::seconds(30));
     } else {
-      txManager->resume(m_suspendedTransaction);
+      txManager->resume(tx_);
     }
 
     THREADERRORCHECK(
@@ -478,7 +475,7 @@ class ResumeTransactionThread : public ACE_Task_Base {
 
     createEntry(regionNames[1], keys[6], vals[6]);
 
-    if (m_commit) {
+    if (commit_) {
       txManager->commit();
       THREADERRORCHECK(
           regPtr0->containsKeyOnServer(keyPtr4),
@@ -503,7 +500,7 @@ class ResumeTransactionThread : public ACE_Task_Base {
                        "found in region.");
     }
 
-    if (m_commit) {
+    if (commit_) {
       regPtr1->destroy(keyPtr6);
       regPtr1->destroy(keyPtr5);
       regPtr0->destroy(keyPtr4);
@@ -537,18 +534,38 @@ class ResumeTransactionThread : public ACE_Task_Base {
         LOG("Got expected EntryNotFoundException for keyPtr4");
       }
     }
+
     getHelper()
         ->getCache()
         ->getPoolManager()
         .find("__TESTPOOL1_")
         ->releaseThreadLocalConnection();
     LOG(" Out ResumeTransactionThread");
-    return 0;
   }
-  void start() { activate(); }
-  void stop() { wait(); }
-  bool isFailed() { return m_isFailed; }
-  std::string getError() { return m_error; }
+
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
+  }
+
+  void stop() {
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+  }
+
+  bool isFailed() { return failed_; }
+
+  std::string getError() { return error_; }
+
+ protected:
+  TransactionId& tx_;
+  bool commit_;
+  bool sleep_;
+  bool failed_;
+  std::string error_;
+
+  std::thread thread_;
+  binary_semaphore& sem_;
 };
 
 DUNIT_TASK_DEFINITION(SERVER1, CreateServer1)
@@ -769,75 +786,78 @@ DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeRollback)
 END_TASK_DEFINITION
 DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeInThread)
   {
-    // start suspend thread  and resume thread and rollback immedidately
-    LOG("start suspend thread  and resume thread and rollback immedidately");
-    ACE_Auto_Event txEvent;
-
-    SuspendTransactionThread* suspendTh =
-        new SuspendTransactionThread(false, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    ResumeTransactionThread* resumeTh = new ResumeTransactionThread(
-        suspendTh->getSuspendedTx(), false, false, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    ASSERT(!resumeTh->isFailed(), resumeTh->getError());
-    delete resumeTh;
-
-    // start suspend thread  and resume thread and commit immedidately
-    LOG("start suspend thread  and resume thread and commit immedidately");
-    suspendTh = new SuspendTransactionThread(false, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
-                                           false, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    ASSERT(!resumeTh->isFailed(), resumeTh->getError());
-    delete resumeTh;
-
-    // start suspend thread  and tryresume thread with rollback. make tryResume
-    // to
-    // sleep
-    LOG("start suspend thread  and tryresume thread with rollback. make "
+    LOG("Start suspend thread and resume thread and rollback immedidately");
+
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{false, event};
+      suspend.start();
+
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), false, false,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+
+      ASSERT(!resume.isFailed(), resume.getError());
+    }
+
+    LOG("Start suspend thread  and resume thread and commit immedidately");
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{false, event};
+
+      suspend.start();
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), true, false,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+      ASSERT(!resume.isFailed(), resume.getError());
+    }
+
+    LOG("Start suspend thread  and tryresume thread with rollback. make "
         "tryResume to sleep");
-    suspendTh = new SuspendTransactionThread(true, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), false,
-                                           true, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    ASSERT(!resumeTh->isFailed(), resumeTh->getError());
-    delete resumeTh;
-
-    // start suspend thread  and tryresume thread with commit. make tryResume to
-    // sleep
-    LOG("start suspend thread  and tryresume thread with commit. make "
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{true, event};
+      suspend.start();
+
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), false, true,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+
+      ASSERT(!resume.isFailed(), resume.getError());
+    }
+
+    LOG("Start suspend thread  and tryresume thread with commit. make "
         "tryResume to sleep");
-    suspendTh = new SuspendTransactionThread(true, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    LOG("suspendTh->activate();");
-
-    resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
-                                           true, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    ASSERT(!resumeTh->isFailed(), resumeTh->getError());
-    delete resumeTh;
+
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{true, event};
+      suspend.start();
+
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+      LOG("suspendTh->activate();");
+
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), true, true,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+
+      ASSERT(!resume.isFailed(), resume.getError());
+    }
   }
 END_TASK_DEFINITION
 
diff --git a/cppcache/integration-test/ThinClientTransactionsXA.hpp b/cppcache/integration-test/ThinClientTransactionsXA.hpp
index faf335e..e3563f7 100644
--- a/cppcache/integration-test/ThinClientTransactionsXA.hpp
+++ b/cppcache/integration-test/ThinClientTransactionsXA.hpp
@@ -21,9 +21,6 @@
 #define GEODE_INTEGRATION_TEST_THINCLIENTTRANSACTIONSXA_H_
 
 #include "fw_dunit.hpp"
-#include <ace/Auto_Event.h>
-#include <ace/OS.h>
-#include <ace/High_Res_Timer.h>
 
 #include <string>
 #include <geode/TransactionId.hpp>
@@ -34,8 +31,11 @@
 
 #include "CacheHelper.hpp"
 
+#include "util/concurrent/binary_semaphore.hpp"
+
 namespace {  // NOLINT(google-build-namespaces)
 
+using apache::geode::client::binary_semaphore;
 using apache::geode::client::CacheableKey;
 using apache::geode::client::CacheableString;
 using apache::geode::client::CacheHelper;
@@ -185,8 +185,8 @@ void createPooledRegion(const std::string& name, bool ackMode,
                         bool cachingEnable = true) {
   LOG("createRegion_Pool() entered.");
   std::cout << "Creating region --  " << name << " ackMode is " << ackMode
-            << "\n"
-            << std::flush;
+            << "\n" << std::flush;
+
   auto regPtr =
       getHelper()->createPooledRegion(name, ackMode, locators, poolname,
                                       cachingEnable, clientNotificationEnabled);
@@ -200,9 +200,10 @@ void createPooledRegionSticky(const std::string& name, bool ackMode,
                               bool clientNotificationEnabled = false,
                               bool cachingEnable = true) {
   LOG("createRegion_Pool() entered.");
+
   std::cout << "Creating region --  " << name << " ackMode is " << ackMode
-            << "\n"
-            << std::flush;
+            << "\n" << std::flush;
+
   auto regPtr = getHelper()->createPooledRegionSticky(
       name, ackMode, locators, poolname, cachingEnable,
       clientNotificationEnabled);
@@ -355,23 +356,18 @@ const bool NO_ACK = false;
 #define THREADERRORCHECK(x, y) \
   do {                         \
     if (!(x)) {                \
-      m_isFailed = true;       \
-      m_error = y;             \
-      return -1;               \
+      failed_ = true;          \
+      error_ = y;              \
+      return;                  \
     }                          \
   } while (0)
 
-class SuspendTransactionThread : public ACE_Task_Base {
- private:
-  TransactionId* m_suspendedTransaction;
-  bool m_sleep;
-  ACE_Auto_Event* m_txEvent;
-
+class SuspendTransactionThread {
  public:
-  SuspendTransactionThread(bool sleep, ACE_Auto_Event* txEvent)
-      : m_suspendedTransaction(nullptr), m_sleep(sleep), m_txEvent(txEvent) {}
+  SuspendTransactionThread(bool sleep, binary_semaphore& event)
+      : sleep_{sleep}, tx_{nullptr}, sem_(event) {}
 
-  int svc(void) override {
+  void run() {
     LOG(" In SuspendTransactionThread");
 
     auto txManager = getHelper()->getCache()->getCacheTransactionManager();
@@ -380,14 +376,14 @@ class SuspendTransactionThread : public ACE_Task_Base {
 
     createEntry(regionNames[0], keys[4], vals[4]);
 
-    m_suspendedTransaction = &txManager->getTransactionId();
+    tx_ = &txManager->getTransactionId();
 
-    if (m_sleep) {
-      m_txEvent->wait();
-      std::this_thread::sleep_for(std::chrono::seconds(5));
+    if (sleep_) {
+      sem_.acquire();
+      std::this_thread::sleep_for(std::chrono::seconds{5});
     }
 
-    m_suspendedTransaction = &txManager->suspend();
+    tx_ = &txManager->suspend();
     LOG(" Out SuspendTransactionThread");
 
     getHelper()
@@ -395,32 +391,38 @@ class SuspendTransactionThread : public ACE_Task_Base {
         ->getPoolManager()
         .find("__TESTPOOL1_")
         ->releaseThreadLocalConnection();
+  }
 
-    return 0;
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
   }
-  void start() { activate(); }
-  void stop() { wait(); }
-  TransactionId& getSuspendedTx() { return *m_suspendedTransaction; }
+
+  void stop() {
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+  }
+
+  TransactionId& getSuspendedTx() { return *tx_; }
+
+ protected:
+  bool sleep_;
+  TransactionId* tx_;
+  std::thread thread_;
+  binary_semaphore& sem_;
 };
-class ResumeTransactionThread : public ACE_Task_Base {
- private:
-  TransactionId& m_suspendedTransaction;
-  bool m_commit;
-  bool m_tryResumeWithSleep;
-  bool m_isFailed;
-  std::string m_error;
-  ACE_Auto_Event* m_txEvent;
 
+class ResumeTransactionThread {
  public:
   ResumeTransactionThread(TransactionId& suspendedTransaction, bool commit,
-                          bool tryResumeWithSleep, ACE_Auto_Event* txEvent)
-      : m_suspendedTransaction(suspendedTransaction),
-        m_commit(commit),
-        m_tryResumeWithSleep(tryResumeWithSleep),
-        m_isFailed(false),
-        m_txEvent(txEvent) {}
-
-  int svc(void) override {
+                          bool tryResumeWithSleep, binary_semaphore& event)
+      : commit_{commit},
+        sleep_{tryResumeWithSleep},
+        failed_{false},
+        tx_{suspendedTransaction},
+        sem_{event} {}
+
+  void run() {
     LOG("In ResumeTransactionThread");
 
     auto regPtr0 = getHelper()->getRegion(regionNames[0]);
@@ -436,32 +438,32 @@ class ResumeTransactionThread : public ACE_Task_Base {
                      "found in region.");
 
     auto txManager = getHelper()->getCache()->getCacheTransactionManager();
-    if (m_tryResumeWithSleep) {
-      THREADERRORCHECK(!txManager->isSuspended(m_suspendedTransaction),
+    if (sleep_) {
+      THREADERRORCHECK(!txManager->isSuspended(tx_),
                        "In ResumeTransactionThread - the transaction should "
                        "NOT be in suspended state");
     } else {
-      THREADERRORCHECK(txManager->isSuspended(m_suspendedTransaction),
+      THREADERRORCHECK(txManager->isSuspended(tx_),
                        "In ResumeTransactionThread - the transaction should be "
                        "in suspended state");
     }
 
     THREADERRORCHECK(
-        txManager->exists(m_suspendedTransaction),
+        txManager->exists(tx_),
         "In ResumeTransactionThread - the transaction should exist");
 
-    if (m_tryResumeWithSleep) {
-      m_txEvent->signal();
-      txManager->tryResume(m_suspendedTransaction, std::chrono::seconds(30));
+    if (sleep_) {
+      sem_.release();
+      txManager->tryResume(tx_, std::chrono::seconds{30});
     } else {
-      txManager->resume(m_suspendedTransaction);
+      txManager->resume(tx_);
     }
 
     THREADERRORCHECK(
         regPtr0->containsKeyOnServer(keyPtr4),
         "In ResumeTransactionThread - Key should have been found in region.");
 
-    if (m_commit) {
+    if (commit_) {
       txManager->prepare();
       txManager->commit();
       THREADERRORCHECK(
@@ -475,7 +477,7 @@ class ResumeTransactionThread : public ACE_Task_Base {
                        "found in region.");
     }
 
-    if (m_commit) {
+    if (commit_) {
       regPtr0->destroy(keyPtr4);
 
       THREADERRORCHECK(!regPtr0->containsKeyOnServer(keyPtr4),
@@ -489,18 +491,36 @@ class ResumeTransactionThread : public ACE_Task_Base {
         LOG("Got expected EntryNotFoundException for keyPtr4");
       }
     }
+
     getHelper()
         ->getCache()
         ->getPoolManager()
         .find("__TESTPOOL1_")
         ->releaseThreadLocalConnection();
-    LOG(" Out ResumeTransactionThread");
-    return 0;
+    LOG("Out ResumeTransactionThread");
+  }
+
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
+  }
+
+  void stop() {
+    if (thread_.joinable()) {
+      thread_.join();
+    }
   }
-  void start() { activate(); }
-  void stop() { wait(); }
-  bool isFailed() { return m_isFailed; }
-  std::string getError() { return m_error; }
+
+  bool isFailed() { return failed_; }
+  const std::string& getError() { return error_; }
+
+ protected:
+  bool commit_;
+  bool sleep_;
+  bool failed_;
+  std::string error_;
+  TransactionId& tx_;
+  std::thread thread_;
+  binary_semaphore& sem_;
 };
 
 DUNIT_TASK_DEFINITION(SERVER1, CreateServer1)
@@ -682,71 +702,71 @@ DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeRollback)
 END_TASK_DEFINITION
 DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeInThread)
   {
-    // start suspend thread  and resume thread and rollback immedidately
-    LOG("start suspend thread  and resume thread and rollback immedidately");
-    ACE_Auto_Event txEvent;
-
-    SuspendTransactionThread* suspendTh =
-        new SuspendTransactionThread(false, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    ResumeTransactionThread* resumeTh = new ResumeTransactionThread(
-        suspendTh->getSuspendedTx(), false, false, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    delete resumeTh;
-
-    // start suspend thread  and resume thread and commit immedidately
-    LOG("start suspend thread  and resume thread and commit immedidately");
-    suspendTh = new SuspendTransactionThread(false, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
-                                           false, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    delete resumeTh;
-
-    // start suspend thread  and tryresume thread with rollback. make tryResume
-    // to
-    // sleep
-    LOG("start suspend thread  and tryresume thread with rollback. make "
+    LOG("Start suspend thread  and resume thread and rollback immediately");
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{false, event};
+
+      suspend.start();
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), false, false,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+    }
+
+    LOG("Start suspend thread  and resume thread and commit immediately");
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{false, event};
+
+      suspend.start();
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), true, false,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+    }
+
+    LOG("Start suspend thread  and tryresume thread with rollback. make "
         "tryResume to sleep");
-    suspendTh = new SuspendTransactionThread(true, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), false,
-                                           true, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    delete resumeTh;
-
-    // start suspend thread  and tryresume thread with commit. make tryResume to
-    // sleep
-    LOG("start suspend thread  and tryresume thread with commit. make "
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{true, event};
+
+      suspend.start();
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), false, true,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+    }
+
+    LOG("Start suspend thread  and tryresume thread with commit. make "
         "tryResume to sleep");
-    suspendTh = new SuspendTransactionThread(true, &txEvent);
-    suspendTh->activate();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
-    LOG("suspendTh->activate();");
-
-    resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
-                                           true, &txEvent);
-    resumeTh->activate();
-
-    suspendTh->wait();
-    delete suspendTh;
-    resumeTh->wait();
-    delete resumeTh;
+    {
+      binary_semaphore event{0};
+      SuspendTransactionThread suspend{true, event};
+
+      suspend.start();
+      std::this_thread::sleep_for(std::chrono::seconds{2});
+
+      ResumeTransactionThread resume{suspend.getSuspendedTx(), true, true,
+                                     event};
+      resume.start();
+
+      suspend.stop();
+      resume.stop();
+    }
   }
 END_TASK_DEFINITION
 
diff --git a/cppcache/integration-test/ThinClientVersionedOps.hpp b/cppcache/integration-test/ThinClientVersionedOps.hpp
index 95e970c..cc37642 100644
--- a/cppcache/integration-test/ThinClientVersionedOps.hpp
+++ b/cppcache/integration-test/ThinClientVersionedOps.hpp
@@ -32,10 +32,10 @@
 
 // This is the test for tracking work. bug#304
 
-putThread *thread1 = nullptr;
-putThread *thread2 = nullptr;
-putThread *thread3 = nullptr;
-putThread *thread4 = nullptr;
+PutThread* thread1 = nullptr;
+PutThread* thread2 = nullptr;
+PutThread* thread3 = nullptr;
+PutThread* thread4 = nullptr;
 
 const char *regNames[] = {"DistRegionAck", "DistRegionNoAck"};
 const char *group1 = "A";
@@ -185,7 +185,7 @@ END_TASK_DEFINITION
 DUNIT_TASK_DEFINITION(CLIENT1, threadPutonClient1)
   {
     auto rptr = getHelper()->getRegion(regNames[0]);
-    thread4 = new putThread(rptr, false);
+    thread4 = new PutThread(rptr, false);
     thread4->setParams(0, 10, 1, true, false, 0);
     thread4->start();
     LOG("Task: threadPutonClient1 Done");
@@ -309,7 +309,7 @@ END_TASK_DEFINITION
 DUNIT_TASK_DEFINITION(CLIENT1, PutOnClient1)
   {
     auto rptr = getHelper()->getRegion(regNames[0]);
-    thread1 = new putThread(rptr, false);
+    thread1 = new PutThread(rptr, false);
     thread1->setParams(0, 5, 1, true, false, 1);
     thread1->start();
     LOG("PUT ops done on client 1");
@@ -319,7 +319,7 @@ END_TASK_DEFINITION
 DUNIT_TASK_DEFINITION(CLIENT2, PutOnClient2)
   {
     auto rptr = getHelper()->getRegion(regNames[0]);
-    thread2 = new putThread(rptr, false);
+    thread2 = new PutThread(rptr, false);
     thread2->setParams(0, 5, 1, false, false, 0);  // 0, 5, 1, false, false, 0
     thread2->start();
     LOG("PUT ops done on client 2");
@@ -329,7 +329,7 @@ END_TASK_DEFINITION
 DUNIT_TASK_DEFINITION(CLIENT1, testServerGC)
   {
     auto rptr = getHelper()->getRegion(regNames[0]);
-    thread3 = new putThread(rptr, false);
+    thread3 = new PutThread(rptr, false);
     thread3->setParams(0, 5000, 1, true, false, 0);  // 0, 5, 1, false, false, 0
     thread3->start();
     LOG("5000 PUT ops done on client 1");
diff --git a/cppcache/integration-test/TimeBomb.cpp b/cppcache/integration-test/TimeBomb.cpp
new file mode 100644
index 0000000..9bb987a
--- /dev/null
+++ b/cppcache/integration-test/TimeBomb.cpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TimeBomb.hpp"
+
+#include <iostream>
+
+TimeBomb::TimeBomb(const std::chrono::milliseconds& sleep,
+                   std::function<void()> cleanup)
+    : enabled_{false}, callback_{cleanup}, sleep_{sleep} {}
+
+TimeBomb::~TimeBomb() noexcept {
+  if (enabled_) {
+    disarm();
+  }
+}
+
+void TimeBomb::arm() {
+  enabled_ = true;
+  thread_ = std::thread{[this] { run(); }};
+}
+
+void TimeBomb::disarm() {
+  enabled_ = false;
+  cv_.notify_all();
+  thread_.join();
+}
+
+void TimeBomb::run() {
+  std::clog << "TimeBomb armed to trigger in " << sleep_.count() << " ms"
+            << std::endl;
+  {
+    std::unique_lock<decltype(mutex_)> lock{mutex_};
+    cv_.wait_for(lock, sleep_);
+  }
+
+  if (enabled_) {
+    std::clog << "####### ERROR: TIMEBOMB WENT OFF, TEST TIMED OUT ########"
+              << std::endl
+              << std::flush;
+    callback_();
+    exit(-1);
+  } else {
+    std::clog << "###### TIMEBOMB Disabled ######" << std::endl << std::flush;
+  }
+}
diff --git a/cppcache/integration-test/TimeBomb.hpp b/cppcache/integration-test/TimeBomb.hpp
index ae466f9..c54dedd 100644
--- a/cppcache/integration-test/TimeBomb.hpp
+++ b/cppcache/integration-test/TimeBomb.hpp
@@ -20,103 +20,31 @@
 #ifndef GEODE_INTEGRATION_TEST_TIMEBOMB_H_
 #define GEODE_INTEGRATION_TEST_TIMEBOMB_H_
 
-#include <iostream>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
 #include <thread>
 
-#include <ace/Task.h>
-#include <assert.h>
-
-
-#include "Utils.hpp"
-
-#define MAX_CLIENT 10
-
-class ClientCleanup {
- private:
-  void (*m_cleanupCallback[MAX_CLIENT])();
-  int m_numberOfClient;
-
+class TimeBomb {
  public:
-  ClientCleanup() : m_numberOfClient(0) {}
-
-  void callClientCleanup() {
-    std::cout << "callClientCleanup ... " << m_numberOfClient << "\n";
-    for (int i = 0; i < m_numberOfClient; i++) {
-      try {
-        m_cleanupCallback[i]();
-      } catch (...) {
-      }
-    }
-  }
-
-  bool registerCallback(void (*cleanupFunc)()) {
-    if (m_numberOfClient < MAX_CLIENT) {
-      m_cleanupCallback[m_numberOfClient++] = cleanupFunc;
-      return true;
-    }
-    return false;
-  }
-};
-
-// Automatic stack variable that exits the process after
-// a time specified in the environment.
-
-class TimeBomb : public ACE_Task_Base {
- private:
-  // UNUSED int m_numberOfClient;
-  void (*m_cleanupCallback)();
-  void callClientCleanup() {
-    if (m_cleanupCallback != nullptr) m_cleanupCallback();
-  }
-
- public:
-  std::chrono::seconds m_sleep;
-
-  explicit TimeBomb(void (*cleanupFunc)() = nullptr)
-      : m_sleep(0) /* UNUSED , m_numberOfClient( -1 )*/
-  {
-    std::string sleepEnv = apache::geode::client::Utils::getEnv("TIMEBOMB");
-    if (!sleepEnv.empty()) {
-      m_sleep = std::chrono::seconds{std::stoi(sleepEnv)};
-    }
-
-    m_cleanupCallback = cleanupFunc;
-    arm();  // starting
-  }
-
-  int arm() {
-    int thrAttrs = THR_NEW_LWP | THR_DETACHED;
-#ifndef WIN32
-    thrAttrs |= THR_INHERIT_SCHED;
-#endif
-    return activate(thrAttrs, 1);
-  }
-
-  int svc() override {
-    if (m_sleep == std::chrono::seconds{}) {
-      std::cout << "###### TIMEBOMB Disabled. ######\n";
-      fflush(stdout);
-      return 0;
-    }
-
-    auto start = std::chrono::steady_clock::now();
-    decltype(start) now;
+  explicit TimeBomb(const std::chrono::milliseconds& sleep,
+                    std::function<void()> cleanup);
 
-    do {
-      std::this_thread::sleep_for(std::chrono::seconds(1));
-      now = std::chrono::steady_clock::now();
-    } while ((now - start) < m_sleep);
+  ~TimeBomb() noexcept;
 
-    std::cout << "####### ERROR: TIMEBOMB WENT OFF, TEST TIMED OUT ########\n";
-    fflush(stdout);
+  void arm();
+  void disarm();
 
-    callClientCleanup();
+  void run();
 
-    exit(-1);
-    return 0;
-  }
+ protected:
+  bool enabled_;
+  std::mutex mutex_;
+  std::condition_variable cv_;
 
-  ~TimeBomb() noexcept override = default;
+  std::thread thread_;
+  std::function<void()> callback_;
+  std::chrono::milliseconds sleep_;
 };
 
 #endif  // GEODE_INTEGRATION_TEST_TIMEBOMB_H_
diff --git a/cppcache/integration-test/fw_dunit.cpp b/cppcache/integration-test/fw_dunit.cpp
index aa2ac4d..c102be7 100644
--- a/cppcache/integration-test/fw_dunit.cpp
+++ b/cppcache/integration-test/fw_dunit.cpp
@@ -44,6 +44,9 @@
 #define __DUNIT_NO_MAIN__
 #include "fw_dunit.hpp"
 
+#include "Utils.hpp"
+
+namespace bp = boost::process;
 namespace bip = boost::interprocess;
 
 static std::string g_programName;
@@ -720,15 +723,17 @@ void log(std::string s, int lineno, const char * /*filename*/) {
             << std::flush;
 }
 
-void cleanup() { gClientCleanup.callClientCleanup(); }
-
 int dmain(int argc, char *argv[]) {
+  using apache::geode::client::Utils;
+
 #ifdef USE_SMARTHEAP
   MemRegisterTask();
 #endif
   setupCRTOutput();
-  TimeBomb tb(&cleanup);
-  // tb->arm(); // leak this on purpose.
+  auto timebomb = std::chrono::seconds{std::stoi(Utils::getEnv("TIMEBOMB"))};
+  TimeBomb tb(timebomb, []() { gClientCleanup.trigger(); });
+  tb.arm();
+
   try {
     g_programName = argv[0];
     const ACE_TCHAR options[] = ACE_TEXT("s:m:");
@@ -774,13 +779,13 @@ int dmain(int argc, char *argv[]) {
 
       fflush(stdout);
     }
+
     std::cout << "final worker id " << workerId << ", result " << result
               << "\n";
     std::cout << "before calling cleanup " << workerId << "\n";
-    gClientCleanup.callClientCleanup();
+    gClientCleanup.trigger();
     std::cout << "after calling cleanup\n";
     return result;
-
   } catch (dunit::TestException &te) {
     te.print();
   } catch (apache::geode::client::testframework::FwkException &fe) {
@@ -794,7 +799,7 @@ int dmain(int argc, char *argv[]) {
               << std::flush;
   }
 
-  gClientCleanup.callClientCleanup();
+  gClientCleanup.trigger();
   return 1;
 }
 
diff --git a/cppcache/integration-test/fw_dunit.hpp b/cppcache/integration-test/fw_dunit.hpp
index eb9e486..13c9fe2 100644
--- a/cppcache/integration-test/fw_dunit.hpp
+++ b/cppcache/integration-test/fw_dunit.hpp
@@ -118,9 +118,12 @@ END_TASK(validate)
 #include <iostream>
 #include <string>
 
+#include <geode/Exception.hpp>
+
 #include <boost/interprocess/managed_shared_memory.hpp>
 
 #include <signal.h>
+#include "ClientCleanup.hpp"
 #include "TimeBomb.hpp"
 
 #define ASSERT(x, y)                                     \
diff --git a/cppcache/integration-test/testExpiration.cpp b/cppcache/integration-test/testExpiration.cpp
index 76834b2..655b361 100644
--- a/cppcache/integration-test/testExpiration.cpp
+++ b/cppcache/integration-test/testExpiration.cpp
@@ -17,6 +17,7 @@
 
 #include <geode/ExpirationAction.hpp>
 #include <geode/Region.hpp>
+#include <geode/SystemProperties.hpp>
 
 #include "fw_helper.hpp"
 #include "CacheRegionHelper.hpp"
diff --git a/cppcache/integration-test/testOverflowPutGetSqLite.cpp b/cppcache/integration-test/testOverflowPutGetSqLite.cpp
index c019a8a..98ea64f 100644
--- a/cppcache/integration-test/testOverflowPutGetSqLite.cpp
+++ b/cppcache/integration-test/testOverflowPutGetSqLite.cpp
@@ -247,28 +247,34 @@ void testEntryInvalidate(std::shared_ptr<Region> &regionPtr, uint32_t num) {
   ASSERT(v.size() == num, "size of key vec not equal");
 }
 
-class PutThread : public ACE_Task_Base {
- private:
-  std::shared_ptr<Region> m_regPtr;
-  int m_min;
-  int m_max;
-
+class PutThread {
  public:
-  PutThread(std::shared_ptr<Region> &regPtr, int min, int max)
-      : m_regPtr(regPtr), m_min(min), m_max(max) {}
+  PutThread(std::shared_ptr<Region> region, int min, int max)
+      : min_{min}, max_{max}, region_{region} {}
 
-  int svc(void) override {
+  void run() {
     /** put some values into the cache. */
-    doNput(m_regPtr, m_max, m_min);
+    doNput(region_, max_, min_);
     /** do some gets... printing what we find in the cache. */
-    doNget(m_regPtr, m_max, m_min);
+    doNget(region_, max_, min_);
     LOG("Completed doNget");
-    return 0;
   }
 
-  void start() { activate(); }
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
+  }
+
+  void stop() {
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+  }
 
-  void stop() { wait(); }
+ protected:
+  int min_;
+  int max_;
+  std::thread thread_;
+  std::shared_ptr<Region> region_;
 };
 
 void verifyGetAll(std::shared_ptr<Region> region, int startIndex) {
diff --git a/cppcache/integration-test/testRegionAccessThreadSafe.cpp b/cppcache/integration-test/testRegionAccessThreadSafe.cpp
index 52fbbe4..6943c95 100644
--- a/cppcache/integration-test/testRegionAccessThreadSafe.cpp
+++ b/cppcache/integration-test/testRegionAccessThreadSafe.cpp
@@ -16,32 +16,27 @@
  */
 #include "fw_dunit.hpp"
 #include "ThinClientHelper.hpp"
-#include <ace/Task.h>
+
+#include <atomic>
 
 using apache::geode::client::Exception;
 
-class GetRegionThread : public ACE_Task_Base {
+class GetRegionThread {
  public:
-  bool m_running;
-  std::string m_path;
-  std::string m_subPath;
-  bool m_regionCreateDone;
-  bool m_subRegionCreateDone;
-  std::recursive_mutex mutex_;
   GetRegionThread(const char *path, const char *subPath)
       : m_running(false),
         m_path(path),
         m_subPath(subPath),
         m_regionCreateDone(false),
         m_subRegionCreateDone(false) {}
-  int svc(void) override {
-    while (m_running == true) {
+
+  void run() {
+    while (m_running) {
       SLEEP(40);
       try {
         auto rptr = getHelper()->getRegion(m_path.c_str());
         if (rptr != nullptr) {
-          std::lock_guard<decltype(mutex_)> guard{mutex_};
-          ASSERT(m_regionCreateDone == true, "regionCreate Not Done");
+          ASSERT(m_regionCreateDone, "regionCreate Not Done");
         }
       } catch (Exception &ex) {
         LOG(ex.what());
@@ -56,9 +51,8 @@ class GetRegionThread : public ACE_Task_Base {
       try {
         auto rptr = getHelper()->getRegion(m_subPath.c_str());
         if (rptr != nullptr) {
-          std::lock_guard<decltype(mutex_)> guard{mutex_};
-          ASSERT(m_subRegionCreateDone == true, "subRegionCreate Not Done");
-          return 0;
+          ASSERT(m_subRegionCreateDone, "subRegionCreate Not Done");
+          return;
         }
       } catch (Exception &ex) {
         LOG(ex.what());
@@ -68,24 +62,31 @@ class GetRegionThread : public ACE_Task_Base {
         LOG("getRegion: unknown exception");
       }
     }
-    return 0;
-  }
-  void setRegionFlag() {
-    std::lock_guard<decltype(mutex_)> guard{mutex_};
-    m_regionCreateDone = true;
-  }
-  void setSubRegionFlag() {
-    std::lock_guard<decltype(mutex_)> guard{mutex_};
-    m_subRegionCreateDone = true;
   }
+
+  void setRegionFlag() { m_regionCreateDone = true; }
+
+  void setSubRegionFlag() { m_subRegionCreateDone = true; }
+
   void start() {
     m_running = true;
-    activate();
+    thread_ = std::thread{[this]() { run(); }};
   }
+
   void stop() {
     m_running = false;
-    wait();
+    if (thread_.joinable()) {
+      thread_.join();
+    }
   }
+
+ protected:
+  bool m_running;
+  std::string m_path;
+  std::thread thread_;
+  std::string m_subPath;
+  std::atomic<bool> m_regionCreateDone;
+  std::atomic<bool> m_subRegionCreateDone;
 };
 
 static int numberOfLocators = 1;
diff --git a/cppcache/integration-test/testSpinLock.cpp b/cppcache/integration-test/testSpinLock.cpp
index 14c4efe..b2f2242 100644
--- a/cppcache/integration-test/testSpinLock.cpp
+++ b/cppcache/integration-test/testSpinLock.cpp
@@ -18,42 +18,13 @@
 #include "fw_dunit.hpp"
 
 #include <mutex>
-#include <condition_variable>
-#include <util/concurrent/spinlock_mutex.hpp>
 
-#include <ace/Task.h>
-#include <ace/Time_Value.h>
-#include <ace/Guard_T.h>
+#include "util/concurrent/binary_semaphore.hpp"
+#include "util/concurrent/spinlock_mutex.hpp"
 
 namespace {  // NOLINT(google-build-namespaces)
 
-class semaphore {
- public:
-  explicit semaphore(bool released) : released_(released) {}
-
-  void release() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    released_ = true;
-    cv_.notify_one();
-  }
-
-  void acquire() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    cv_.wait(lock, [this]() { return released_; });
-    released_ = false;
-  }
-
-  semaphore& operator=(const semaphore& other) {
-    released_ = other.released_;
-    return *this;
-  }
-
- protected:
-  bool released_;
-  std::mutex mutex_;
-  std::condition_variable cv_;
-};
-
+using apache::geode::client::binary_semaphore;
 using apache::geode::util::concurrent::spinlock_mutex;
 
 DUNIT_TASK(s1p1, Basic)
@@ -63,56 +34,86 @@ DUNIT_TASK(s1p1, Basic)
   }
 END_TASK(Basic)
 
-semaphore triggerA{0};
-semaphore triggerB{0};
-semaphore triggerM{0};
-
 spinlock_mutex lock;
 std::chrono::steady_clock::time_point btime;
 
-class ThreadA : public ACE_Task_Base {
+class ThreadA {
  public:
-  ThreadA() : ACE_Task_Base() {}
-
-  int svc() override {
-    {
-      std::lock_guard<spinlock_mutex> lk(lock);
-      LOG("ThreadA: Acquired lock x.");
-      triggerM.release();
-      triggerA.acquire();
+  ThreadA(binary_semaphore& triggerA, binary_semaphore& triggerM)
+      : triggerA_{triggerA}, triggerM_{triggerM} {}
+
+  ~ThreadA() { stop(); }
+
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
+  }
+
+  void stop() {
+    if (thread_.joinable()) {
+      thread_.join();
     }
+  }
+
+ protected:
+  void run() {
+    std::lock_guard<spinlock_mutex> lk(lock);
+    LOG("ThreadA: Acquired lock x.");
+    triggerM_.release();
+    triggerA_.acquire();
+
     LOG("ThreadA: Released lock.");
-    return 0;
   }
+
+ protected:
+  std::thread thread_;
+  binary_semaphore& triggerA_;
+  binary_semaphore& triggerM_;
 };
 
-class ThreadB : public ACE_Task_Base {
+class ThreadB {
  public:
-  ThreadB() : ACE_Task_Base() {}
-
-  int svc() override {
-    triggerB.acquire();
-    {
-      std::lock_guard<spinlock_mutex> lk(lock);
-      btime = std::chrono::steady_clock::now();
-      LOG("ThreadB: Acquired lock.");
-      triggerM.release();
+  ThreadB(binary_semaphore& triggerB, binary_semaphore& triggerM)
+      : triggerB_{triggerB}, triggerM_{triggerM} {}
+
+  ~ThreadB() { stop(); }
+
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
+  }
+
+  void stop() {
+    if (thread_.joinable()) {
+      thread_.join();
     }
-    return 0;
   }
+
+ protected:
+  void run() {
+    triggerB_.acquire();
+
+    std::lock_guard<spinlock_mutex> lk(lock);
+    btime = std::chrono::steady_clock::now();
+    LOG("ThreadB: Acquired lock.");
+    triggerM_.release();
+  }
+
+ protected:
+  std::thread thread_;
+  binary_semaphore& triggerB_;
+  binary_semaphore& triggerM_;
 };
 
 DUNIT_TASK(s1p1, TwoThreads)
   {
-    triggerA = semaphore{0};
-    triggerB = semaphore{0};
-    triggerM = semaphore{0};
+    binary_semaphore triggerA{0};
+    binary_semaphore triggerB{0};
+    binary_semaphore triggerM{0};
 
-    ThreadA* threadA = new ThreadA();
-    ThreadB* threadB = new ThreadB();
+    ThreadA threadA{triggerA, triggerM};
+    ThreadB threadB{triggerB, triggerM};
 
-    threadA->activate();
-    threadB->activate();
+    threadA.start();
+    threadB.start();
 
     // A runs, locks the spinlock, and triggers me. B is idle.
     triggerM.acquire();
@@ -130,18 +131,15 @@ DUNIT_TASK(s1p1, TwoThreads)
     auto delta =
         std::chrono::duration_cast<std::chrono::milliseconds>(btime - stime)
             .count();
-    std::string msg = "acquire delay was " + std::to_string(delta);
 
-    LOG(msg);
+    LOG("acquire delay was " + std::to_string(delta));
     ASSERT(delta >= 4900, "Expected 5 second or more spinlock delay");
     // Note the test is against 4900 instead of 5000 as there are some
     // measurement
     // issues. Often delta comes back as 4999 on linux.
 
-    threadA->wait();
-    delete threadA;
-    threadB->wait();
-    delete threadB;
+    threadA.stop();
+    threadB.stop();
   }
 END_TASK(TwoThreads)
 
diff --git a/cppcache/integration-test/testThinClientCqFailover.cpp b/cppcache/integration-test/testThinClientCqFailover.cpp
index 460ef2d..2ae52d6 100644
--- a/cppcache/integration-test/testThinClientCqFailover.cpp
+++ b/cppcache/integration-test/testThinClientCqFailover.cpp
@@ -88,29 +88,23 @@ class MyCqListener : public CqListener {
   void close() override { LOG("MyCqListener::close called"); }
 };
 
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
  public:
-  bool m_running;
-  MyCqListener *m_listener;
-  explicit KillServerThread(MyCqListener *listener)
-      : m_running(false), m_listener(listener) {}
-  int svc(void) override {
-    while (m_running == true) {
+  void start() {
+    thread_ = std::thread{[]() {
       CacheHelper::closeServer(1);
       LOG("THREAD CLOSED SERVER 1");
-      // m_listener->setFailedOver();
-      m_running = false;
-    }
-    return 0;
-  }
-  void start() {
-    m_running = true;
-    activate();
+    }};
   }
+
   void stop() {
-    m_running = false;
-    wait();
+    if (thread_.joinable()) {
+      thread_.join();
+    }
   }
+
+ protected:
+  std::thread thread_;
 };
 
 void initClientCq(const bool isthinClient) {
@@ -295,7 +289,8 @@ DUNIT_TASK_DEFINITION(CLIENT1, StepThree3)
     ASSERT(cqLstner != nullptr, "listener is nullptr");
     auto myListener = dynamic_cast<MyCqListener *>(cqLstner.get());
     ASSERT(myListener != nullptr, "my listener is nullptr<cast failed>");
-    kst = new KillServerThread(myListener);
+
+    kst = new KillServerThread();
     LOG(std::string("before kill server 1, before=") +
         std::to_string(myListener->getCountBefore()) +
         ", after=" + std::to_string(myListener->getCountAfter()));
diff --git a/cppcache/integration-test/testThinClientCqHAFailover.cpp b/cppcache/integration-test/testThinClientCqHAFailover.cpp
index 7016a7e..31a1874 100644
--- a/cppcache/integration-test/testThinClientCqHAFailover.cpp
+++ b/cppcache/integration-test/testThinClientCqHAFailover.cpp
@@ -87,29 +87,23 @@ class MyCqListener : public CqListener {
   void close() override { LOG("MyCqListener::close called"); }
 };
 
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
  public:
-  bool m_running;
-  MyCqListener *m_listener;
-  explicit KillServerThread(MyCqListener *listener)
-      : m_running(false), m_listener(listener) {}
-  int svc(void) override {
-    while (m_running == true) {
+  void start() {
+    thread_ = std::thread{[]() {
       CacheHelper::closeServer(1);
       LOG("THREAD CLOSED SERVER 1");
-      // m_listener->setFailedOver();
-      m_running = false;
-    }
-    return 0;
-  }
-  void start() {
-    m_running = true;
-    activate();
+    }};
   }
+
   void stop() {
-    m_running = false;
-    wait();
+    if (thread_.joinable()) {
+      thread_.join();
+    }
   }
+
+ protected:
+  std::thread thread_;
 };
 
 void initClientCq() {
@@ -315,7 +309,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, StepThree3)
     ASSERT(cqLstner != nullptr, "listener is nullptr");
     MyCqListener *myListener = dynamic_cast<MyCqListener *>(cqLstner.get());
     ASSERT(myListener != nullptr, "my listener is nullptr<cast failed>");
-    kst = new KillServerThread(myListener);
+    kst = new KillServerThread();
     char buf[1024];
     LOG(std::string("before kill server 1, before=") +
         std::to_string(myListener->getCountBefore()) +
diff --git a/cppcache/integration-test/testThinClientHAQueryFailover.cpp b/cppcache/integration-test/testThinClientHAQueryFailover.cpp
index d31d57c..afd2178 100644
--- a/cppcache/integration-test/testThinClientHAQueryFailover.cpp
+++ b/cppcache/integration-test/testThinClientHAQueryFailover.cpp
@@ -58,28 +58,23 @@ static int numberOfLocators = 1;
 const std::string locatorsG =
     CacheHelper::getLocatorHostPort(isLocator, isLocalServer, numberOfLocators);
 
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
  public:
-  bool m_running;
-  KillServerThread() : m_running(false) {}
-  int svc(void) override {
-    while (m_running == true) {
-      // CacheHelper::initServer( 2, "cacheserver_remoteoql2.xml");
-      // LOG("THREAD STARTED SERVER 2");
+  void start() {
+    thread_ = std::thread{[]() {
       CacheHelper::closeServer(1);
       LOG("THREAD CLOSED SERVER 1");
-      m_running = false;
-    }
-    return 0;
-  }
-  void start() {
-    m_running = true;
-    activate();
+    }};
   }
+
   void stop() {
-    m_running = false;
-    wait();
+    if (thread_.joinable()) {
+      thread_.join();
+    }
   }
+
+ protected:
+  std::thread thread_;
 };
 
 void initClient() {
diff --git a/cppcache/integration-test/testThinClientPoolAttrTest.cpp b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
index 1e9fd2e..65aa48c 100644
--- a/cppcache/integration-test/testThinClientPoolAttrTest.cpp
+++ b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
@@ -44,17 +44,16 @@ const char *poolName1 = "clientPool";
 const char *serverGroup = "ServerGroup1";
 std::shared_ptr<Cache> cachePtr;
 
-class putThread : public ACE_Task_Base {
- private:
-  std::shared_ptr<Region> regPtr;
-
+class PutThread {
  public:
-  explicit putThread(const char *name) : regPtr(getHelper()->getRegion(name)) {}
+  explicit PutThread() : regPtr{nullptr} {}
+  explicit PutThread(const std::string &name)
+      : regPtr{getHelper()->getRegion(name)} {}
 
-  int svc(void) override {
+  void run() {
     // TODO: No. of connection should be = minConnection
 
-    for (int i = 0; i < 10000; i++) {
+    for (auto i = 0; i < 10000; ++i) {
       try {
         regPtr->put(keys[i % 5], vals[i % 6]);
       } catch (const Exception &) {
@@ -64,11 +63,23 @@ class putThread : public ACE_Task_Base {
       }
       // TODO: Check no. of connection > minConnetion
     }
-    // LOG(" Incremented 100 times by thread.");
-    return 0;
   }
-  void start() { activate(); }
+
+  void start() {
+    thread_ = std::thread{[this]() { run(); }};
+  }
+
+  void wait() {
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+  }
+
   void stop() { wait(); }
+
+ protected:
+  std::shared_ptr<Region> regPtr;
+  std::thread thread_;
 };
 
 void doAttrTestingAndCreatePool(const char *poolNameToUse) {
@@ -270,10 +281,10 @@ DUNIT_TASK(CLIENT1, ClientOp)
            std::string("Pool level not equal to min level. Expected ") +
                std::to_string(min) + ", actual " + std::to_string(level));
 
-    putThread *threads[25];
+    PutThread threads[25];
     for (int thdIdx = 0; thdIdx < 10; thdIdx++) {
-      threads[thdIdx] = new putThread(poolRegNames[0]);
-      threads[thdIdx]->start();
+      threads[thdIdx] = PutThread(poolRegNames[0]);
+      threads[thdIdx].start();
     }
 
     SLEEP(5000);  // wait for threads to become active
@@ -291,7 +302,7 @@ DUNIT_TASK(CLIENT1, ClientOp)
                std::to_string(max) + ", actual " + std::to_string(level));
 
     for (int thdIdx = 0; thdIdx < 10; thdIdx++) {
-      threads[thdIdx]->stop();
+      threads[thdIdx].stop();
     }
 
     // Milli second sleep: IdleTimeout is 5 sec, load conditioning
diff --git a/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp b/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp
index daa08fb..80255bd 100644
--- a/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp
+++ b/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp
@@ -53,26 +53,23 @@ using testobject::PortfolioPdx;
 using testobject::Position;
 using testobject::PositionPdx;
 
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
  public:
-  bool m_running;
-  KillServerThread() : m_running(false) {}
-  int svc(void) override {
-    while (m_running == true) {
+  void start() {
+    thread_ = std::thread{[]() {
       CacheHelper::closeServer(1);
       LOG("THREAD CLOSED SERVER 1");
-      m_running = false;
-    }
-    return 0;
-  }
-  void start() {
-    m_running = true;
-    activate();
+    }};
   }
+
   void stop() {
-    m_running = false;
-    wait();
+    if (thread_.joinable()) {
+      thread_.join();
+    }
   }
+
+ protected:
+  std::thread thread_;
 };
 
 bool isLocator = false;
diff --git a/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp b/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp
index dbc456a..88ca233 100644
--- a/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp
+++ b/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp
@@ -50,26 +50,23 @@ using apache::geode::client::IllegalStateException;
 using apache::geode::client::QueryService;
 using apache::geode::client::SelectResults;
 
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
  public:
-  bool m_running;
-  KillServerThread() : m_running(false) {}
-  int svc(void) override {
-    while (m_running == true) {
+  void start() {
+    thread_ = std::thread{[]() {
       CacheHelper::closeServer(1);
       LOG("THREAD CLOSED SERVER 1");
-      m_running = false;
-    }
-    return 0;
-  }
-  void start() {
-    m_running = true;
-    activate();
+    }};
   }
+
   void stop() {
-    m_running = false;
-    wait();
+    if (thread_.joinable()) {
+      thread_.join();
+    }
   }
+
+ protected:
+  std::thread thread_;
 };
 
 bool isLocator = false;