You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2022/02/17 21:26:22 UTC
[geode-native] branch develop updated: GEODE-9324: Remove ACE_Task references (#812)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 3876082 GEODE-9324: Remove ACE_Task references (#812)
3876082 is described below
commit 3876082529b0c74f516a8a7ee168cd0fa288080e
Author: Mario Salazar de Torres <ma...@est.tech>
AuthorDate: Thu Feb 17 22:24:58 2022 +0100
GEODE-9324: Remove ACE_Task references (#812)
- Removed all references to ACE_Task
- Changed variable names to comply with styleguides
---
cppcache/integration-test/CMakeLists.txt | 3 +
cppcache/integration-test/CacheHelper.cpp | 13 +-
cppcache/integration-test/ClientCleanup.cpp | 36 +++
cppcache/integration-test/ClientCleanup.hpp | 35 +++
.../integration-test/ThinClientSecurityHelper.hpp | 116 +++++----
.../integration-test/ThinClientTransactions.hpp | 270 +++++++++++----------
.../integration-test/ThinClientTransactionsXA.hpp | 266 ++++++++++----------
.../integration-test/ThinClientVersionedOps.hpp | 16 +-
cppcache/integration-test/TimeBomb.cpp | 60 +++++
cppcache/integration-test/TimeBomb.hpp | 106 ++------
cppcache/integration-test/fw_dunit.cpp | 19 +-
cppcache/integration-test/fw_dunit.hpp | 3 +
cppcache/integration-test/testExpiration.cpp | 1 +
.../integration-test/testOverflowPutGetSqLite.cpp | 34 +--
.../testRegionAccessThreadSafe.cpp | 53 ++--
cppcache/integration-test/testSpinLock.cpp | 138 ++++++-----
.../integration-test/testThinClientCqFailover.cpp | 31 +--
.../testThinClientCqHAFailover.cpp | 30 +--
.../testThinClientHAQueryFailover.cpp | 27 +--
.../testThinClientPoolAttrTest.cpp | 39 +--
.../testThinClientRemoteQueryFailover.cpp | 25 +-
.../testThinClientRemoteQueryFailoverPdx.cpp | 25 +-
22 files changed, 733 insertions(+), 613 deletions(-)
diff --git a/cppcache/integration-test/CMakeLists.txt b/cppcache/integration-test/CMakeLists.txt
index 8dc9973..66dede7 100644
--- a/cppcache/integration-test/CMakeLists.txt
+++ b/cppcache/integration-test/CMakeLists.txt
@@ -18,9 +18,12 @@ project(nativeclient.tests.cppcache LANGUAGES CXX)
add_library(test-cppcache-utils STATIC
fw_dunit.cpp
+ ClientCleanup.cpp
CacheHelper.cpp
CacheableWrapper.cpp
+ TimeBomb.cpp
)
+
target_link_libraries(test-cppcache-utils
PRIVATE
integration-framework
diff --git a/cppcache/integration-test/CacheHelper.cpp b/cppcache/integration-test/CacheHelper.cpp
index 1e632a1..e65f15b 100644
--- a/cppcache/integration-test/CacheHelper.cpp
+++ b/cppcache/integration-test/CacheHelper.cpp
@@ -1016,11 +1016,14 @@ void CacheHelper::initServer(int instance, const std::string &xml,
const char * /*unused*/, bool ssl,
bool enableDelta, bool, bool testServerGC,
bool untrustedCert, bool useSecurityManager) {
- if (!isServerCleanupCallbackRegistered &&
- gClientCleanup.registerCallback(&CacheHelper::cleanupServerInstances)) {
+ if (!isServerCleanupCallbackRegistered) {
isServerCleanupCallbackRegistered = true;
+ gClientCleanup.registerCallback(
+ []() { CacheHelper::cleanupServerInstances(); });
+
std::cout << "TimeBomb registered server cleanupcallback \n";
}
+
std::cout << "Inside initServer added\n";
static const auto gfjavaenv = Utils::getEnv("GFJAVA");
@@ -1424,9 +1427,11 @@ void CacheHelper::initLocator(int instance, bool ssl, bool, int dsId,
bool useSecurityManager) {
static const auto gfjavaenv = Utils::getEnv("GFJAVA");
- if (!isLocatorCleanupCallbackRegistered &&
- gClientCleanup.registerCallback(&CacheHelper::cleanupLocatorInstances)) {
+ if (!isLocatorCleanupCallbackRegistered) {
isLocatorCleanupCallbackRegistered = true;
+
+ gClientCleanup.registerCallback(
+ []() { CacheHelper::cleanupLocatorInstances(); });
}
std::string currDir = boost::filesystem::current_path().string();
diff --git a/cppcache/integration-test/ClientCleanup.cpp b/cppcache/integration-test/ClientCleanup.cpp
new file mode 100644
index 0000000..2727174
--- /dev/null
+++ b/cppcache/integration-test/ClientCleanup.cpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ClientCleanup.hpp"
+
+#include <iostream>
+
+void ClientCleanup::trigger() {
+ for (auto& callback : callbacks_) {
+ try {
+ callback();
+ } catch (std::exception& e) {
+ std::clog << "Exception while executing cleanup: " << e.what()
+ << std::endl
+ << std::flush;
+ }
+ }
+}
+
+void ClientCleanup::registerCallback(std::function<void()> callback) {
+ callbacks_.emplace_back(callback);
+}
diff --git a/cppcache/integration-test/ClientCleanup.hpp b/cppcache/integration-test/ClientCleanup.hpp
new file mode 100644
index 0000000..33cb949
--- /dev/null
+++ b/cppcache/integration-test/ClientCleanup.hpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_INTEGRATION_TEST_CLIENT_CLEANUP_H_
+#define GEODE_INTEGRATION_TEST_CLIENT_CLEANUP_H_
+
+#include <functional>
+#include <vector>
+
+class ClientCleanup {
+ public:
+ void trigger();
+ void registerCallback(std::function<void()> callback);
+
+ protected:
+ std::vector<std::function<void()>> callbacks_;
+};
+
+#endif // GEODE_INTEGRATION_TEST_CLIENT_CLEANUP_H_
diff --git a/cppcache/integration-test/ThinClientSecurityHelper.hpp b/cppcache/integration-test/ThinClientSecurityHelper.hpp
index c8ef291..35f32bc 100644
--- a/cppcache/integration-test/ThinClientSecurityHelper.hpp
+++ b/cppcache/integration-test/ThinClientSecurityHelper.hpp
@@ -170,46 +170,54 @@ void initClientAuth(char UserType) {
}
}
-// This putThread class is used in
+// This PutThread class is used in
// testThinClientTracking,testThinClientTicket304, testThinClientTicket317
-class putThread : public ACE_Task_Base {
+class PutThread {
public:
- explicit putThread(std::shared_ptr<Region> r, bool regInt = false,
- int waitTime = 0) {
- m_reg = r;
- m_regInt = regInt;
- m_numthreads = 1;
- m_numops = 0;
- m_isCallBack = false;
- m_sameKey = false;
- m_waitTime = waitTime;
+ explicit PutThread(std::shared_ptr<Region> r, bool regInt = false,
+ int waitTime = 0)
+ : region_{r},
+ numOps_{0},
+ numThreads_{1},
+ isCallback_{false},
+ sameKey_{false},
+ regInt_{regInt},
+ waitTime_{waitTime} {
}
void setParams(int opcode, int numofops, int numthreads,
bool isCallBack = false, bool sameKey = false,
int waitTime = 0) { //
- m_opcode = opcode;
- m_numops = numofops;
- m_numthreads = numthreads;
- m_isCallBack = isCallBack;
- m_sameKey = sameKey;
- m_waitTime = waitTime;
+ opcode_ = opcode;
+ numOps_ = numofops;
+ numThreads_ = numthreads;
+ isCallback_ = isCallBack;
+ sameKey_ = sameKey;
+ waitTime_ = waitTime;
}
void start() {
- m_run = true;
- activate(THR_NEW_LWP | THR_JOINABLE, m_numthreads);
+ for(auto i = 0; i < numThreads_; ++i) {
+ threads_.emplace_back([this](){
+ run();
+ });
+ }
+ }
+
+ void wait() {
+ for(auto& thread : threads_) {
+ if(thread.joinable()) {
+ thread.join();
+ }
+ }
}
void stop() {
- if (m_run) {
- m_run = false;
wait();
- }
}
- int svc() override {
+ void run() {
int ops = 0;
std::string key_str;
std::shared_ptr<CacheableKey> key;
@@ -217,49 +225,49 @@ class putThread : public ACE_Task_Base {
std::vector<std::shared_ptr<CacheableKey>> keys0;
auto pid = boost::this_process::get_id();
- if (m_regInt) {
- m_reg->registerAllKeys(false, true);
+ if (regInt_) {
+ region_->registerAllKeys(false, true);
}
- if (m_waitTime != 0) {
- std::this_thread::sleep_for(std::chrono::seconds{m_waitTime});
+ if (waitTime_ != 0) {
+ std::this_thread::sleep_for(std::chrono::seconds{waitTime_});
}
- while (ops++ < m_numops) {
- if (m_sameKey) {
+ while (ops++ < numOps_) {
+ if (sameKey_) {
key_str = "key-1";
} else {
key_str = "key-" + std::to_string(ops);
}
key = CacheableKey::create(key_str);
- if (m_opcode == 0) {
+ if (opcode_ == 0) {
std::string value_str;
- if (m_isCallBack) {
+ if (isCallback_) {
auto boolptr = CacheableBoolean::create("true");
value_str = "client1-value" + std::to_string(ops);
value = CacheableString::create(value_str);
- m_reg->put(key, value, boolptr);
+ region_->put(key, value, boolptr);
} else {
value_str = "client2-value" + std::to_string(ops);
value = CacheableString::create(value_str);
- m_reg->put(key, value);
+ region_->put(key, value);
}
- } else if (m_opcode == 1) {
- m_reg->get(key);
- } else if (m_opcode == 5) {
+ } else if (opcode_ == 1) {
+ region_->get(key);
+ } else if (opcode_ == 5) {
keys0.push_back(key);
- if (ops == m_numops) {
- m_reg->registerKeys(keys0, false, true);
+ if (ops == numOps_) {
+ region_->registerKeys(keys0, false, true);
}
- } else if (m_opcode == 6) {
- m_reg->registerRegex("key-[1-3]", false, true);
+ } else if (opcode_ == 6) {
+ region_->registerRegex("key-[1-3]", false, true);
} else {
try {
- if (m_isCallBack) {
+ if (isCallback_) {
auto boolptr = CacheableBoolean::create("true");
- m_reg->destroy(key, boolptr);
+ region_->destroy(key, boolptr);
} else {
- m_reg->destroy(key);
+ region_->destroy(key);
}
} catch (Exception& ex) {
auto tid =
@@ -268,18 +276,20 @@ class putThread : public ACE_Task_Base {
}
}
}
- return 0;
}
- std::shared_ptr<Region> m_reg;
- bool m_run;
- int m_opcode;
- int m_numops;
- int m_numthreads;
- bool m_isCallBack;
- bool m_sameKey;
- bool m_regInt;
- int m_waitTime;
+ protected:
+ std::shared_ptr<Region> region_;
+
+ int opcode_;
+ int numOps_;
+ int numThreads_;
+ bool isCallback_;
+ bool sameKey_;
+ bool regInt_;
+ int waitTime_;
+
+ std::vector<std::thread> threads_;
};
} // namespace
diff --git a/cppcache/integration-test/ThinClientTransactions.hpp b/cppcache/integration-test/ThinClientTransactions.hpp
index d1f24a4..5ff3d01 100644
--- a/cppcache/integration-test/ThinClientTransactions.hpp
+++ b/cppcache/integration-test/ThinClientTransactions.hpp
@@ -21,11 +21,9 @@
#define GEODE_INTEGRATION_TEST_THINCLIENTTRANSACTIONS_H_
#include "fw_dunit.hpp"
-#include <ace/Auto_Event.h>
-#include <ace/OS.h>
-#include <ace/High_Res_Timer.h>
#include <string>
+
#include <geode/TransactionId.hpp>
#include <geode/CacheTransactionManager.hpp>
@@ -33,9 +31,11 @@
#define ROOT_SCOPE DISTRIBUTED_ACK
#include "CacheHelper.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
namespace { // NOLINT(google-build-namespaces)
+using apache::geode::client::binary_semaphore;
using apache::geode::client::CacheableKey;
using apache::geode::client::CacheableString;
using apache::geode::client::CacheHelper;
@@ -358,40 +358,34 @@ const bool NO_ACK = false;
#define THREADERRORCHECK(x, y) \
do { \
if (!(x)) { \
- m_isFailed = true; \
- m_error = y; \
- return -1; \
+ failed_ = true; \
+ error_ = y; \
+ return; \
} \
} while (0)
-class SuspendTransactionThread : public ACE_Task_Base {
- private:
- TransactionId* m_suspendedTransaction;
- bool m_sleep;
- ACE_Auto_Event* m_txEvent;
-
+class SuspendTransactionThread {
public:
- SuspendTransactionThread(bool sleep, ACE_Auto_Event* txEvent)
- : m_suspendedTransaction(nullptr), m_sleep(sleep), m_txEvent(txEvent) {}
+ SuspendTransactionThread(bool sleep, binary_semaphore& event)
+ : sleep_{sleep}, sem_{event} {}
- int svc(void) override {
+ void run() {
LOG(" In SuspendTransactionThread");
-
auto txManager = getHelper()->getCache()->getCacheTransactionManager();
txManager->begin();
-
createEntry(regionNames[0], keys[4], vals[4]);
createEntry(regionNames[1], keys[5], vals[5]);
- m_suspendedTransaction = &txManager->getTransactionId();
+ tx_ = &txManager->getTransactionId();
+
+ if (sleep_) {
+ sem_.acquire();
- if (m_sleep) {
- m_txEvent->wait();
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ std::this_thread::sleep_for(std::chrono::seconds{5});
}
- m_suspendedTransaction = &txManager->suspend();
+ tx_ = &txManager->suspend();
LOG(" Out SuspendTransactionThread");
getHelper()
@@ -399,32 +393,35 @@ class SuspendTransactionThread : public ACE_Task_Base {
->getPoolManager()
.find("__TESTPOOL1_")
->releaseThreadLocalConnection();
+ }
- return 0;
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
}
- void start() { activate(); }
- void stop() { wait(); }
- TransactionId& getSuspendedTx() { return *m_suspendedTransaction; }
+
+ void stop() {
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
+
+ TransactionId& getSuspendedTx() { return *tx_; }
+
+ protected:
+ bool sleep_;
+ std::thread thread_;
+ binary_semaphore& sem_;
+ TransactionId* tx_{nullptr};
};
-class ResumeTransactionThread : public ACE_Task_Base {
- private:
- TransactionId& m_suspendedTransaction;
- bool m_commit;
- bool m_tryResumeWithSleep;
- bool m_isFailed;
- std::string m_error;
- ACE_Auto_Event* m_txEvent;
+class ResumeTransactionThread {
public:
- ResumeTransactionThread(TransactionId& suspendedTransaction, bool commit,
- bool tryResumeWithSleep, ACE_Auto_Event* txEvent)
- : m_suspendedTransaction(suspendedTransaction),
- m_commit(commit),
- m_tryResumeWithSleep(tryResumeWithSleep),
- m_isFailed(false),
- m_txEvent(txEvent) {}
-
- int svc(void) override {
+
+ ResumeTransactionThread(TransactionId& tx, bool commit, bool sleep,
+ binary_semaphore& event)
+ : tx_(tx), commit_{commit}, sleep_(sleep), failed_{false}, sem_{event} {}
+
+ void run() {
LOG("In ResumeTransactionThread");
auto regPtr0 = getHelper()->getRegion(regionNames[0]);
@@ -448,25 +445,25 @@ class ResumeTransactionThread : public ACE_Task_Base {
"found in region.");
auto txManager = getHelper()->getCache()->getCacheTransactionManager();
- if (m_tryResumeWithSleep) {
- THREADERRORCHECK(!txManager->isSuspended(m_suspendedTransaction),
+ if (sleep_) {
+ THREADERRORCHECK(!txManager->isSuspended(tx_),
"In ResumeTransactionThread - the transaction should "
"NOT be in suspended state");
} else {
- THREADERRORCHECK(txManager->isSuspended(m_suspendedTransaction),
+ THREADERRORCHECK(txManager->isSuspended(tx_),
"In ResumeTransactionThread - the transaction should be "
"in suspended state");
}
THREADERRORCHECK(
- txManager->exists(m_suspendedTransaction),
+ txManager->exists(tx_),
"In ResumeTransactionThread - the transaction should exist");
- if (m_tryResumeWithSleep) {
- m_txEvent->signal();
- txManager->tryResume(m_suspendedTransaction, std::chrono::seconds(30));
+ if (sleep_) {
+ sem_.release();
+ txManager->tryResume(tx_, std::chrono::seconds(30));
} else {
- txManager->resume(m_suspendedTransaction);
+ txManager->resume(tx_);
}
THREADERRORCHECK(
@@ -478,7 +475,7 @@ class ResumeTransactionThread : public ACE_Task_Base {
createEntry(regionNames[1], keys[6], vals[6]);
- if (m_commit) {
+ if (commit_) {
txManager->commit();
THREADERRORCHECK(
regPtr0->containsKeyOnServer(keyPtr4),
@@ -503,7 +500,7 @@ class ResumeTransactionThread : public ACE_Task_Base {
"found in region.");
}
- if (m_commit) {
+ if (commit_) {
regPtr1->destroy(keyPtr6);
regPtr1->destroy(keyPtr5);
regPtr0->destroy(keyPtr4);
@@ -537,18 +534,38 @@ class ResumeTransactionThread : public ACE_Task_Base {
LOG("Got expected EntryNotFoundException for keyPtr4");
}
}
+
getHelper()
->getCache()
->getPoolManager()
.find("__TESTPOOL1_")
->releaseThreadLocalConnection();
LOG(" Out ResumeTransactionThread");
- return 0;
}
- void start() { activate(); }
- void stop() { wait(); }
- bool isFailed() { return m_isFailed; }
- std::string getError() { return m_error; }
+
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
+ }
+
+ void stop() {
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
+
+ bool isFailed() { return failed_; }
+
+ std::string getError() { return error_; }
+
+ protected:
+ TransactionId& tx_;
+ bool commit_;
+ bool sleep_;
+ bool failed_;
+ std::string error_;
+
+ std::thread thread_;
+ binary_semaphore& sem_;
};
DUNIT_TASK_DEFINITION(SERVER1, CreateServer1)
@@ -769,75 +786,78 @@ DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeRollback)
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeInThread)
{
- // start suspend thread and resume thread and rollback immedidately
- LOG("start suspend thread and resume thread and rollback immedidately");
- ACE_Auto_Event txEvent;
-
- SuspendTransactionThread* suspendTh =
- new SuspendTransactionThread(false, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- ResumeTransactionThread* resumeTh = new ResumeTransactionThread(
- suspendTh->getSuspendedTx(), false, false, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- ASSERT(!resumeTh->isFailed(), resumeTh->getError());
- delete resumeTh;
-
- // start suspend thread and resume thread and commit immedidately
- LOG("start suspend thread and resume thread and commit immedidately");
- suspendTh = new SuspendTransactionThread(false, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
- false, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- ASSERT(!resumeTh->isFailed(), resumeTh->getError());
- delete resumeTh;
-
- // start suspend thread and tryresume thread with rollback. make tryResume
- // to
- // sleep
- LOG("start suspend thread and tryresume thread with rollback. make "
+ LOG("Start suspend thread and resume thread and rollback immedidately");
+
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{false, event};
+ suspend.start();
+
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), false, false,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+
+ ASSERT(!resume.isFailed(), resume.getError());
+ }
+
+ LOG("Start suspend thread and resume thread and commit immedidately");
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{false, event};
+
+ suspend.start();
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), true, false,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+ ASSERT(!resume.isFailed(), resume.getError());
+ }
+
+ LOG("Start suspend thread and tryresume thread with rollback. make "
"tryResume to sleep");
- suspendTh = new SuspendTransactionThread(true, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), false,
- true, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- ASSERT(!resumeTh->isFailed(), resumeTh->getError());
- delete resumeTh;
-
- // start suspend thread and tryresume thread with commit. make tryResume to
- // sleep
- LOG("start suspend thread and tryresume thread with commit. make "
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{true, event};
+ suspend.start();
+
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), false, true,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+
+ ASSERT(!resume.isFailed(), resume.getError());
+ }
+
+ LOG("Start suspend thread and tryresume thread with commit. make "
"tryResume to sleep");
- suspendTh = new SuspendTransactionThread(true, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- LOG("suspendTh->activate();");
-
- resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
- true, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- ASSERT(!resumeTh->isFailed(), resumeTh->getError());
- delete resumeTh;
+
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{true, event};
+ suspend.start();
+
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+ LOG("suspendTh->activate();");
+
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), true, true,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+
+ ASSERT(!resume.isFailed(), resume.getError());
+ }
}
END_TASK_DEFINITION
diff --git a/cppcache/integration-test/ThinClientTransactionsXA.hpp b/cppcache/integration-test/ThinClientTransactionsXA.hpp
index faf335e..e3563f7 100644
--- a/cppcache/integration-test/ThinClientTransactionsXA.hpp
+++ b/cppcache/integration-test/ThinClientTransactionsXA.hpp
@@ -21,9 +21,6 @@
#define GEODE_INTEGRATION_TEST_THINCLIENTTRANSACTIONSXA_H_
#include "fw_dunit.hpp"
-#include <ace/Auto_Event.h>
-#include <ace/OS.h>
-#include <ace/High_Res_Timer.h>
#include <string>
#include <geode/TransactionId.hpp>
@@ -34,8 +31,11 @@
#include "CacheHelper.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
+
namespace { // NOLINT(google-build-namespaces)
+using apache::geode::client::binary_semaphore;
using apache::geode::client::CacheableKey;
using apache::geode::client::CacheableString;
using apache::geode::client::CacheHelper;
@@ -185,8 +185,8 @@ void createPooledRegion(const std::string& name, bool ackMode,
bool cachingEnable = true) {
LOG("createRegion_Pool() entered.");
std::cout << "Creating region -- " << name << " ackMode is " << ackMode
- << "\n"
- << std::flush;
+ << "\n" << std::flush;
+
auto regPtr =
getHelper()->createPooledRegion(name, ackMode, locators, poolname,
cachingEnable, clientNotificationEnabled);
@@ -200,9 +200,10 @@ void createPooledRegionSticky(const std::string& name, bool ackMode,
bool clientNotificationEnabled = false,
bool cachingEnable = true) {
LOG("createRegion_Pool() entered.");
+
std::cout << "Creating region -- " << name << " ackMode is " << ackMode
- << "\n"
- << std::flush;
+ << "\n" << std::flush;
+
auto regPtr = getHelper()->createPooledRegionSticky(
name, ackMode, locators, poolname, cachingEnable,
clientNotificationEnabled);
@@ -355,23 +356,18 @@ const bool NO_ACK = false;
#define THREADERRORCHECK(x, y) \
do { \
if (!(x)) { \
- m_isFailed = true; \
- m_error = y; \
- return -1; \
+ failed_ = true; \
+ error_ = y; \
+ return; \
} \
} while (0)
-class SuspendTransactionThread : public ACE_Task_Base {
- private:
- TransactionId* m_suspendedTransaction;
- bool m_sleep;
- ACE_Auto_Event* m_txEvent;
-
+class SuspendTransactionThread {
public:
- SuspendTransactionThread(bool sleep, ACE_Auto_Event* txEvent)
- : m_suspendedTransaction(nullptr), m_sleep(sleep), m_txEvent(txEvent) {}
+ SuspendTransactionThread(bool sleep, binary_semaphore& event)
+ : sleep_{sleep}, tx_{nullptr}, sem_(event) {}
- int svc(void) override {
+ void run() {
LOG(" In SuspendTransactionThread");
auto txManager = getHelper()->getCache()->getCacheTransactionManager();
@@ -380,14 +376,14 @@ class SuspendTransactionThread : public ACE_Task_Base {
createEntry(regionNames[0], keys[4], vals[4]);
- m_suspendedTransaction = &txManager->getTransactionId();
+ tx_ = &txManager->getTransactionId();
- if (m_sleep) {
- m_txEvent->wait();
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ if (sleep_) {
+ sem_.acquire();
+ std::this_thread::sleep_for(std::chrono::seconds{5});
}
- m_suspendedTransaction = &txManager->suspend();
+ tx_ = &txManager->suspend();
LOG(" Out SuspendTransactionThread");
getHelper()
@@ -395,32 +391,38 @@ class SuspendTransactionThread : public ACE_Task_Base {
->getPoolManager()
.find("__TESTPOOL1_")
->releaseThreadLocalConnection();
+ }
- return 0;
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
}
- void start() { activate(); }
- void stop() { wait(); }
- TransactionId& getSuspendedTx() { return *m_suspendedTransaction; }
+
+ void stop() {
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
+
+ TransactionId& getSuspendedTx() { return *tx_; }
+
+ protected:
+ bool sleep_;
+ TransactionId* tx_;
+ std::thread thread_;
+ binary_semaphore& sem_;
};
-class ResumeTransactionThread : public ACE_Task_Base {
- private:
- TransactionId& m_suspendedTransaction;
- bool m_commit;
- bool m_tryResumeWithSleep;
- bool m_isFailed;
- std::string m_error;
- ACE_Auto_Event* m_txEvent;
+class ResumeTransactionThread {
public:
ResumeTransactionThread(TransactionId& suspendedTransaction, bool commit,
- bool tryResumeWithSleep, ACE_Auto_Event* txEvent)
- : m_suspendedTransaction(suspendedTransaction),
- m_commit(commit),
- m_tryResumeWithSleep(tryResumeWithSleep),
- m_isFailed(false),
- m_txEvent(txEvent) {}
-
- int svc(void) override {
+ bool tryResumeWithSleep, binary_semaphore& event)
+ : commit_{commit},
+ sleep_{tryResumeWithSleep},
+ failed_{false},
+ tx_{suspendedTransaction},
+ sem_{event} {}
+
+ void run() {
LOG("In ResumeTransactionThread");
auto regPtr0 = getHelper()->getRegion(regionNames[0]);
@@ -436,32 +438,32 @@ class ResumeTransactionThread : public ACE_Task_Base {
"found in region.");
auto txManager = getHelper()->getCache()->getCacheTransactionManager();
- if (m_tryResumeWithSleep) {
- THREADERRORCHECK(!txManager->isSuspended(m_suspendedTransaction),
+ if (sleep_) {
+ THREADERRORCHECK(!txManager->isSuspended(tx_),
"In ResumeTransactionThread - the transaction should "
"NOT be in suspended state");
} else {
- THREADERRORCHECK(txManager->isSuspended(m_suspendedTransaction),
+ THREADERRORCHECK(txManager->isSuspended(tx_),
"In ResumeTransactionThread - the transaction should be "
"in suspended state");
}
THREADERRORCHECK(
- txManager->exists(m_suspendedTransaction),
+ txManager->exists(tx_),
"In ResumeTransactionThread - the transaction should exist");
- if (m_tryResumeWithSleep) {
- m_txEvent->signal();
- txManager->tryResume(m_suspendedTransaction, std::chrono::seconds(30));
+ if (sleep_) {
+ sem_.release();
+ txManager->tryResume(tx_, std::chrono::seconds{30});
} else {
- txManager->resume(m_suspendedTransaction);
+ txManager->resume(tx_);
}
THREADERRORCHECK(
regPtr0->containsKeyOnServer(keyPtr4),
"In ResumeTransactionThread - Key should have been found in region.");
- if (m_commit) {
+ if (commit_) {
txManager->prepare();
txManager->commit();
THREADERRORCHECK(
@@ -475,7 +477,7 @@ class ResumeTransactionThread : public ACE_Task_Base {
"found in region.");
}
- if (m_commit) {
+ if (commit_) {
regPtr0->destroy(keyPtr4);
THREADERRORCHECK(!regPtr0->containsKeyOnServer(keyPtr4),
@@ -489,18 +491,36 @@ class ResumeTransactionThread : public ACE_Task_Base {
LOG("Got expected EntryNotFoundException for keyPtr4");
}
}
+
getHelper()
->getCache()
->getPoolManager()
.find("__TESTPOOL1_")
->releaseThreadLocalConnection();
- LOG(" Out ResumeTransactionThread");
- return 0;
+ LOG("Out ResumeTransactionThread");
+ }
+
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
+ }
+
+ void stop() {
+ if (thread_.joinable()) {
+ thread_.join();
+ }
}
- void start() { activate(); }
- void stop() { wait(); }
- bool isFailed() { return m_isFailed; }
- std::string getError() { return m_error; }
+
+ bool isFailed() { return failed_; }
+ const std::string& getError() { return error_; }
+
+ protected:
+ bool commit_;
+ bool sleep_;
+ bool failed_;
+ std::string error_;
+ TransactionId& tx_;
+ std::thread thread_;
+ binary_semaphore& sem_;
};
DUNIT_TASK_DEFINITION(SERVER1, CreateServer1)
@@ -682,71 +702,71 @@ DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeRollback)
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, SuspendResumeInThread)
{
- // start suspend thread and resume thread and rollback immedidately
- LOG("start suspend thread and resume thread and rollback immedidately");
- ACE_Auto_Event txEvent;
-
- SuspendTransactionThread* suspendTh =
- new SuspendTransactionThread(false, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- ResumeTransactionThread* resumeTh = new ResumeTransactionThread(
- suspendTh->getSuspendedTx(), false, false, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- delete resumeTh;
-
- // start suspend thread and resume thread and commit immedidately
- LOG("start suspend thread and resume thread and commit immedidately");
- suspendTh = new SuspendTransactionThread(false, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
- false, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- delete resumeTh;
-
- // start suspend thread and tryresume thread with rollback. make tryResume
- // to
- // sleep
- LOG("start suspend thread and tryresume thread with rollback. make "
+ LOG("Start suspend thread and resume thread and rollback immediately");
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{false, event};
+
+ suspend.start();
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), false, false,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+ }
+
+ LOG("Start suspend thread and resume thread and commit immediately");
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{false, event};
+
+ suspend.start();
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), true, false,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+ }
+
+ LOG("Start suspend thread and tryresume thread with rollback. make "
"tryResume to sleep");
- suspendTh = new SuspendTransactionThread(true, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), false,
- true, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- delete resumeTh;
-
- // start suspend thread and tryresume thread with commit. make tryResume to
- // sleep
- LOG("start suspend thread and tryresume thread with commit. make "
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{true, event};
+
+ suspend.start();
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), false, true,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+ }
+
+ LOG("Start suspend thread and tryresume thread with commit. make "
"tryResume to sleep");
- suspendTh = new SuspendTransactionThread(true, &txEvent);
- suspendTh->activate();
- std::this_thread::sleep_for(std::chrono::seconds(2));
- LOG("suspendTh->activate();");
-
- resumeTh = new ResumeTransactionThread(suspendTh->getSuspendedTx(), true,
- true, &txEvent);
- resumeTh->activate();
-
- suspendTh->wait();
- delete suspendTh;
- resumeTh->wait();
- delete resumeTh;
+ {
+ binary_semaphore event{0};
+ SuspendTransactionThread suspend{true, event};
+
+ suspend.start();
+ std::this_thread::sleep_for(std::chrono::seconds{2});
+
+ ResumeTransactionThread resume{suspend.getSuspendedTx(), true, true,
+ event};
+ resume.start();
+
+ suspend.stop();
+ resume.stop();
+ }
}
END_TASK_DEFINITION
diff --git a/cppcache/integration-test/ThinClientVersionedOps.hpp b/cppcache/integration-test/ThinClientVersionedOps.hpp
index 95e970c..cc37642 100644
--- a/cppcache/integration-test/ThinClientVersionedOps.hpp
+++ b/cppcache/integration-test/ThinClientVersionedOps.hpp
@@ -32,10 +32,10 @@
// This is the test for tracking work. bug#304
-putThread *thread1 = nullptr;
-putThread *thread2 = nullptr;
-putThread *thread3 = nullptr;
-putThread *thread4 = nullptr;
+PutThread* thread1 = nullptr;
+PutThread* thread2 = nullptr;
+PutThread* thread3 = nullptr;
+PutThread* thread4 = nullptr;
const char *regNames[] = {"DistRegionAck", "DistRegionNoAck"};
const char *group1 = "A";
@@ -185,7 +185,7 @@ END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, threadPutonClient1)
{
auto rptr = getHelper()->getRegion(regNames[0]);
- thread4 = new putThread(rptr, false);
+ thread4 = new PutThread(rptr, false);
thread4->setParams(0, 10, 1, true, false, 0);
thread4->start();
LOG("Task: threadPutonClient1 Done");
@@ -309,7 +309,7 @@ END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, PutOnClient1)
{
auto rptr = getHelper()->getRegion(regNames[0]);
- thread1 = new putThread(rptr, false);
+ thread1 = new PutThread(rptr, false);
thread1->setParams(0, 5, 1, true, false, 1);
thread1->start();
LOG("PUT ops done on client 1");
@@ -319,7 +319,7 @@ END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, PutOnClient2)
{
auto rptr = getHelper()->getRegion(regNames[0]);
- thread2 = new putThread(rptr, false);
+ thread2 = new PutThread(rptr, false);
thread2->setParams(0, 5, 1, false, false, 0); // 0, 5, 1, false, false, 0
thread2->start();
LOG("PUT ops done on client 2");
@@ -329,7 +329,7 @@ END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, testServerGC)
{
auto rptr = getHelper()->getRegion(regNames[0]);
- thread3 = new putThread(rptr, false);
+ thread3 = new PutThread(rptr, false);
thread3->setParams(0, 5000, 1, true, false, 0); // 0, 5, 1, false, false, 0
thread3->start();
LOG("5000 PUT ops done on client 1");
diff --git a/cppcache/integration-test/TimeBomb.cpp b/cppcache/integration-test/TimeBomb.cpp
new file mode 100644
index 0000000..9bb987a
--- /dev/null
+++ b/cppcache/integration-test/TimeBomb.cpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TimeBomb.hpp"
+
+#include <cstdlib>
+#include <iostream>
+
+/**
+ * Creates a disarmed time bomb; nothing runs until arm() is called.
+ *
+ * @param sleep time to wait after arm() before the bomb goes off.
+ * @param cleanup callback run right before the process is terminated.
+ */
+TimeBomb::TimeBomb(const std::chrono::milliseconds& sleep,
+                   std::function<void()> cleanup)
+    : enabled_{false}, callback_{cleanup}, sleep_{sleep} {}
+
+/** Disarms a still-armed bomb so its timer thread gets joined. */
+TimeBomb::~TimeBomb() noexcept {
+  if (enabled_) {
+    disarm();
+  }
+}
+
+/**
+ * Starts the timer thread. Calling it while a timer thread is still running
+ * is a no-op, since reassigning a joinable std::thread calls std::terminate.
+ */
+void TimeBomb::arm() {
+  if (thread_.joinable()) {
+    return;
+  }
+
+  enabled_ = true;
+  thread_ = std::thread{[this] { run(); }};
+}
+
+/**
+ * Cancels the bomb and waits for the timer thread to finish. Safe to call
+ * when the bomb was never armed or has already been disarmed.
+ */
+void TimeBomb::disarm() {
+  {
+    // Flip the flag under the mutex: the timer thread then either sees it
+    // before it starts waiting or is woken up by the notification below, so
+    // the wake-up cannot be lost.
+    std::lock_guard<decltype(mutex_)> lock{mutex_};
+    enabled_ = false;
+  }
+  cv_.notify_all();
+
+  if (thread_.joinable()) {
+    thread_.join();
+  }
+}
+
+/**
+ * Timer thread body: waits until the bomb is disarmed or `sleep_` elapses. On
+ * timeout it runs the cleanup callback and terminates the process.
+ */
+void TimeBomb::run() {
+  std::clog << "TimeBomb armed to trigger in " << sleep_.count() << " ms"
+            << std::endl;
+
+  bool disarmed = false;
+  {
+    std::unique_lock<decltype(mutex_)> lock{mutex_};
+    // The predicate guards against spurious wake-ups and against a disarm()
+    // that happened before this thread started waiting.
+    disarmed = cv_.wait_for(lock, sleep_, [this] { return !enabled_; });
+  }
+
+  if (!disarmed) {
+    std::clog << "####### ERROR: TIMEBOMB WENT OFF, TEST TIMED OUT ########"
+              << std::endl
+              << std::flush;
+    callback_();
+    std::exit(-1);
+  } else {
+    std::clog << "###### TIMEBOMB Disabled ######" << std::endl << std::flush;
+  }
+}
diff --git a/cppcache/integration-test/TimeBomb.hpp b/cppcache/integration-test/TimeBomb.hpp
index ae466f9..c54dedd 100644
--- a/cppcache/integration-test/TimeBomb.hpp
+++ b/cppcache/integration-test/TimeBomb.hpp
@@ -20,103 +20,31 @@
#ifndef GEODE_INTEGRATION_TEST_TIMEBOMB_H_
#define GEODE_INTEGRATION_TEST_TIMEBOMB_H_
-#include <iostream>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
#include <thread>
-#include <ace/Task.h>
-#include <assert.h>
-
-
-#include "Utils.hpp"
-
-#define MAX_CLIENT 10
-
-class ClientCleanup {
- private:
- void (*m_cleanupCallback[MAX_CLIENT])();
- int m_numberOfClient;
-
+class TimeBomb {
public:
- ClientCleanup() : m_numberOfClient(0) {}
-
- void callClientCleanup() {
- std::cout << "callClientCleanup ... " << m_numberOfClient << "\n";
- for (int i = 0; i < m_numberOfClient; i++) {
- try {
- m_cleanupCallback[i]();
- } catch (...) {
- }
- }
- }
-
- bool registerCallback(void (*cleanupFunc)()) {
- if (m_numberOfClient < MAX_CLIENT) {
- m_cleanupCallback[m_numberOfClient++] = cleanupFunc;
- return true;
- }
- return false;
- }
-};
-
-// Automatic stack variable that exits the process after
-// a time specified in the environment.
-
-class TimeBomb : public ACE_Task_Base {
- private:
- // UNUSED int m_numberOfClient;
- void (*m_cleanupCallback)();
- void callClientCleanup() {
- if (m_cleanupCallback != nullptr) m_cleanupCallback();
- }
-
- public:
- std::chrono::seconds m_sleep;
-
- explicit TimeBomb(void (*cleanupFunc)() = nullptr)
- : m_sleep(0) /* UNUSED , m_numberOfClient( -1 )*/
- {
- std::string sleepEnv = apache::geode::client::Utils::getEnv("TIMEBOMB");
- if (!sleepEnv.empty()) {
- m_sleep = std::chrono::seconds{std::stoi(sleepEnv)};
- }
-
- m_cleanupCallback = cleanupFunc;
- arm(); // starting
- }
-
- int arm() {
- int thrAttrs = THR_NEW_LWP | THR_DETACHED;
-#ifndef WIN32
- thrAttrs |= THR_INHERIT_SCHED;
-#endif
- return activate(thrAttrs, 1);
- }
-
- int svc() override {
- if (m_sleep == std::chrono::seconds{}) {
- std::cout << "###### TIMEBOMB Disabled. ######\n";
- fflush(stdout);
- return 0;
- }
-
- auto start = std::chrono::steady_clock::now();
- decltype(start) now;
+ explicit TimeBomb(const std::chrono::milliseconds& sleep,
+ std::function<void()> cleanup);
- do {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- now = std::chrono::steady_clock::now();
- } while ((now - start) < m_sleep);
+ ~TimeBomb() noexcept;
- std::cout << "####### ERROR: TIMEBOMB WENT OFF, TEST TIMED OUT ########\n";
- fflush(stdout);
+ void arm();
+ void disarm();
- callClientCleanup();
+ void run();
- exit(-1);
- return 0;
- }
+ protected:
+ bool enabled_;
+ std::mutex mutex_;
+ std::condition_variable cv_;
- ~TimeBomb() noexcept override = default;
+ std::thread thread_;
+ std::function<void()> callback_;
+ std::chrono::milliseconds sleep_;
};
#endif // GEODE_INTEGRATION_TEST_TIMEBOMB_H_
diff --git a/cppcache/integration-test/fw_dunit.cpp b/cppcache/integration-test/fw_dunit.cpp
index aa2ac4d..c102be7 100644
--- a/cppcache/integration-test/fw_dunit.cpp
+++ b/cppcache/integration-test/fw_dunit.cpp
@@ -44,6 +44,9 @@
#define __DUNIT_NO_MAIN__
#include "fw_dunit.hpp"
+#include "Utils.hpp"
+
+namespace bp = boost::process;
namespace bip = boost::interprocess;
static std::string g_programName;
@@ -720,15 +723,17 @@ void log(std::string s, int lineno, const char * /*filename*/) {
<< std::flush;
}
-void cleanup() { gClientCleanup.callClientCleanup(); }
-
int dmain(int argc, char *argv[]) {
+ using apache::geode::client::Utils;
+
#ifdef USE_SMARTHEAP
MemRegisterTask();
#endif
setupCRTOutput();
- TimeBomb tb(&cleanup);
- // tb->arm(); // leak this on purpose.
+ auto timebomb = std::chrono::seconds{std::stoi(Utils::getEnv("TIMEBOMB"))};
+ TimeBomb tb(timebomb, []() { gClientCleanup.trigger(); });
+ tb.arm();
+
try {
g_programName = argv[0];
const ACE_TCHAR options[] = ACE_TEXT("s:m:");
@@ -774,13 +779,13 @@ int dmain(int argc, char *argv[]) {
fflush(stdout);
}
+
std::cout << "final worker id " << workerId << ", result " << result
<< "\n";
std::cout << "before calling cleanup " << workerId << "\n";
- gClientCleanup.callClientCleanup();
+ gClientCleanup.trigger();
std::cout << "after calling cleanup\n";
return result;
-
} catch (dunit::TestException &te) {
te.print();
} catch (apache::geode::client::testframework::FwkException &fe) {
@@ -794,7 +799,7 @@ int dmain(int argc, char *argv[]) {
<< std::flush;
}
- gClientCleanup.callClientCleanup();
+ gClientCleanup.trigger();
return 1;
}
diff --git a/cppcache/integration-test/fw_dunit.hpp b/cppcache/integration-test/fw_dunit.hpp
index eb9e486..13c9fe2 100644
--- a/cppcache/integration-test/fw_dunit.hpp
+++ b/cppcache/integration-test/fw_dunit.hpp
@@ -118,9 +118,12 @@ END_TASK(validate)
#include <iostream>
#include <string>
+#include <geode/Exception.hpp>
+
#include <boost/interprocess/managed_shared_memory.hpp>
#include <signal.h>
+#include "ClientCleanup.hpp"
#include "TimeBomb.hpp"
#define ASSERT(x, y) \
diff --git a/cppcache/integration-test/testExpiration.cpp b/cppcache/integration-test/testExpiration.cpp
index 76834b2..655b361 100644
--- a/cppcache/integration-test/testExpiration.cpp
+++ b/cppcache/integration-test/testExpiration.cpp
@@ -17,6 +17,7 @@
#include <geode/ExpirationAction.hpp>
#include <geode/Region.hpp>
+#include <geode/SystemProperties.hpp>
#include "fw_helper.hpp"
#include "CacheRegionHelper.hpp"
diff --git a/cppcache/integration-test/testOverflowPutGetSqLite.cpp b/cppcache/integration-test/testOverflowPutGetSqLite.cpp
index c019a8a..98ea64f 100644
--- a/cppcache/integration-test/testOverflowPutGetSqLite.cpp
+++ b/cppcache/integration-test/testOverflowPutGetSqLite.cpp
@@ -247,28 +247,34 @@ void testEntryInvalidate(std::shared_ptr<Region> ®ionPtr, uint32_t num) {
ASSERT(v.size() == num, "size of key vec not equal");
}
-class PutThread : public ACE_Task_Base {
- private:
- std::shared_ptr<Region> m_regPtr;
- int m_min;
- int m_max;
-
+class PutThread {
public:
- PutThread(std::shared_ptr<Region> ®Ptr, int min, int max)
- : m_regPtr(regPtr), m_min(min), m_max(max) {}
+ PutThread(std::shared_ptr<Region> region, int min, int max)
+ : min_{min}, max_{max}, region_{region} {}
- int svc(void) override {
+ void run() {
/** put some values into the cache. */
- doNput(m_regPtr, m_max, m_min);
+ doNput(region_, max_, min_);
/** do some gets... printing what we find in the cache. */
- doNget(m_regPtr, m_max, m_min);
+ doNget(region_, max_, min_);
LOG("Completed doNget");
- return 0;
}
- void start() { activate(); }
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
+ }
+
+ void stop() {
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
- void stop() { wait(); }
+ protected:
+ int min_;
+ int max_;
+ std::thread thread_;
+ std::shared_ptr<Region> region_;
};
void verifyGetAll(std::shared_ptr<Region> region, int startIndex) {
diff --git a/cppcache/integration-test/testRegionAccessThreadSafe.cpp b/cppcache/integration-test/testRegionAccessThreadSafe.cpp
index 52fbbe4..6943c95 100644
--- a/cppcache/integration-test/testRegionAccessThreadSafe.cpp
+++ b/cppcache/integration-test/testRegionAccessThreadSafe.cpp
@@ -16,32 +16,27 @@
*/
#include "fw_dunit.hpp"
#include "ThinClientHelper.hpp"
-#include <ace/Task.h>
+
+#include <atomic>
using apache::geode::client::Exception;
-class GetRegionThread : public ACE_Task_Base {
+class GetRegionThread {
public:
- bool m_running;
- std::string m_path;
- std::string m_subPath;
- bool m_regionCreateDone;
- bool m_subRegionCreateDone;
- std::recursive_mutex mutex_;
GetRegionThread(const char *path, const char *subPath)
: m_running(false),
m_path(path),
m_subPath(subPath),
m_regionCreateDone(false),
m_subRegionCreateDone(false) {}
- int svc(void) override {
- while (m_running == true) {
+
+ void run() {
+ while (m_running) {
SLEEP(40);
try {
auto rptr = getHelper()->getRegion(m_path.c_str());
if (rptr != nullptr) {
- std::lock_guard<decltype(mutex_)> guard{mutex_};
- ASSERT(m_regionCreateDone == true, "regionCreate Not Done");
+ ASSERT(m_regionCreateDone, "regionCreate Not Done");
}
} catch (Exception &ex) {
LOG(ex.what());
@@ -56,9 +51,8 @@ class GetRegionThread : public ACE_Task_Base {
try {
auto rptr = getHelper()->getRegion(m_subPath.c_str());
if (rptr != nullptr) {
- std::lock_guard<decltype(mutex_)> guard{mutex_};
- ASSERT(m_subRegionCreateDone == true, "subRegionCreate Not Done");
- return 0;
+ ASSERT(m_subRegionCreateDone, "subRegionCreate Not Done");
+ return;
}
} catch (Exception &ex) {
LOG(ex.what());
@@ -68,24 +62,31 @@ class GetRegionThread : public ACE_Task_Base {
LOG("getRegion: unknown exception");
}
}
- return 0;
- }
- void setRegionFlag() {
- std::lock_guard<decltype(mutex_)> guard{mutex_};
- m_regionCreateDone = true;
- }
- void setSubRegionFlag() {
- std::lock_guard<decltype(mutex_)> guard{mutex_};
- m_subRegionCreateDone = true;
}
+
+ void setRegionFlag() { m_regionCreateDone = true; }
+
+ void setSubRegionFlag() { m_subRegionCreateDone = true; }
+
void start() {
m_running = true;
- activate();
+ thread_ = std::thread{[this]() { run(); }};
}
+
void stop() {
m_running = false;
- wait();
+ if (thread_.joinable()) {
+ thread_.join();
+ }
}
+
+ protected:
+ bool m_running;
+ std::string m_path;
+ std::thread thread_;
+ std::string m_subPath;
+ std::atomic<bool> m_regionCreateDone;
+ std::atomic<bool> m_subRegionCreateDone;
};
static int numberOfLocators = 1;
diff --git a/cppcache/integration-test/testSpinLock.cpp b/cppcache/integration-test/testSpinLock.cpp
index 14c4efe..b2f2242 100644
--- a/cppcache/integration-test/testSpinLock.cpp
+++ b/cppcache/integration-test/testSpinLock.cpp
@@ -18,42 +18,13 @@
#include "fw_dunit.hpp"
#include <mutex>
-#include <condition_variable>
-#include <util/concurrent/spinlock_mutex.hpp>
-#include <ace/Task.h>
-#include <ace/Time_Value.h>
-#include <ace/Guard_T.h>
+#include "util/concurrent/binary_semaphore.hpp"
+#include "util/concurrent/spinlock_mutex.hpp"
namespace { // NOLINT(google-build-namespaces)
-class semaphore {
- public:
- explicit semaphore(bool released) : released_(released) {}
-
- void release() {
- std::lock_guard<std::mutex> lock(mutex_);
- released_ = true;
- cv_.notify_one();
- }
-
- void acquire() {
- std::unique_lock<std::mutex> lock(mutex_);
- cv_.wait(lock, [this]() { return released_; });
- released_ = false;
- }
-
- semaphore& operator=(const semaphore& other) {
- released_ = other.released_;
- return *this;
- }
-
- protected:
- bool released_;
- std::mutex mutex_;
- std::condition_variable cv_;
-};
-
+using apache::geode::client::binary_semaphore;
using apache::geode::util::concurrent::spinlock_mutex;
DUNIT_TASK(s1p1, Basic)
@@ -63,56 +34,86 @@ DUNIT_TASK(s1p1, Basic)
}
END_TASK(Basic)
-semaphore triggerA{0};
-semaphore triggerB{0};
-semaphore triggerM{0};
-
spinlock_mutex lock;
std::chrono::steady_clock::time_point btime;
-class ThreadA : public ACE_Task_Base {
+class ThreadA {
public:
- ThreadA() : ACE_Task_Base() {}
-
- int svc() override {
- {
- std::lock_guard<spinlock_mutex> lk(lock);
- LOG("ThreadA: Acquired lock x.");
- triggerM.release();
- triggerA.acquire();
+ ThreadA(binary_semaphore& triggerA, binary_semaphore& triggerM)
+ : triggerA_{triggerA}, triggerM_{triggerM} {}
+
+ ~ThreadA() { stop(); }
+
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
+ }
+
+ void stop() {
+ if (thread_.joinable()) {
+ thread_.join();
}
+ }
+
+ protected:
+ void run() {
+ std::lock_guard<spinlock_mutex> lk(lock);
+ LOG("ThreadA: Acquired lock x.");
+ triggerM_.release();
+ triggerA_.acquire();
+
LOG("ThreadA: Released lock.");
- return 0;
}
+
+ protected:
+ std::thread thread_;
+ binary_semaphore& triggerA_;
+ binary_semaphore& triggerM_;
};
-class ThreadB : public ACE_Task_Base {
+class ThreadB {
public:
- ThreadB() : ACE_Task_Base() {}
-
- int svc() override {
- triggerB.acquire();
- {
- std::lock_guard<spinlock_mutex> lk(lock);
- btime = std::chrono::steady_clock::now();
- LOG("ThreadB: Acquired lock.");
- triggerM.release();
+ ThreadB(binary_semaphore& triggerB, binary_semaphore& triggerM)
+ : triggerB_{triggerB}, triggerM_{triggerM} {}
+
+ ~ThreadB() { stop(); }
+
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
+ }
+
+ void stop() {
+ if (thread_.joinable()) {
+ thread_.join();
}
- return 0;
}
+
+ protected:
+ void run() {
+ triggerB_.acquire();
+
+ std::lock_guard<spinlock_mutex> lk(lock);
+ btime = std::chrono::steady_clock::now();
+ LOG("ThreadB: Acquired lock.");
+ triggerM_.release();
+ }
+
+ protected:
+ std::thread thread_;
+ binary_semaphore& triggerB_;
+ binary_semaphore& triggerM_;
};
DUNIT_TASK(s1p1, TwoThreads)
{
- triggerA = semaphore{0};
- triggerB = semaphore{0};
- triggerM = semaphore{0};
+ binary_semaphore triggerA{0};
+ binary_semaphore triggerB{0};
+ binary_semaphore triggerM{0};
- ThreadA* threadA = new ThreadA();
- ThreadB* threadB = new ThreadB();
+ ThreadA threadA{triggerA, triggerM};
+ ThreadB threadB{triggerB, triggerM};
- threadA->activate();
- threadB->activate();
+ threadA.start();
+ threadB.start();
// A runs, locks the spinlock, and triggers me. B is idle.
triggerM.acquire();
@@ -130,18 +131,15 @@ DUNIT_TASK(s1p1, TwoThreads)
auto delta =
std::chrono::duration_cast<std::chrono::milliseconds>(btime - stime)
.count();
- std::string msg = "acquire delay was " + std::to_string(delta);
- LOG(msg);
+ LOG("acquire delay was " + std::to_string(delta));
ASSERT(delta >= 4900, "Expected 5 second or more spinlock delay");
// Note the test is against 4900 instead of 5000 as there are some
// measurement
// issues. Often delta comes back as 4999 on linux.
- threadA->wait();
- delete threadA;
- threadB->wait();
- delete threadB;
+ threadA.stop();
+ threadB.stop();
}
END_TASK(TwoThreads)
diff --git a/cppcache/integration-test/testThinClientCqFailover.cpp b/cppcache/integration-test/testThinClientCqFailover.cpp
index 460ef2d..2ae52d6 100644
--- a/cppcache/integration-test/testThinClientCqFailover.cpp
+++ b/cppcache/integration-test/testThinClientCqFailover.cpp
@@ -88,29 +88,23 @@ class MyCqListener : public CqListener {
void close() override { LOG("MyCqListener::close called"); }
};
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
public:
- bool m_running;
- MyCqListener *m_listener;
- explicit KillServerThread(MyCqListener *listener)
- : m_running(false), m_listener(listener) {}
- int svc(void) override {
- while (m_running == true) {
+ void start() {
+ thread_ = std::thread{[]() {
CacheHelper::closeServer(1);
LOG("THREAD CLOSED SERVER 1");
- // m_listener->setFailedOver();
- m_running = false;
- }
- return 0;
- }
- void start() {
- m_running = true;
- activate();
+ }};
}
+
void stop() {
- m_running = false;
- wait();
+ if (thread_.joinable()) {
+ thread_.join();
+ }
}
+
+ protected:
+ std::thread thread_;
};
void initClientCq(const bool isthinClient) {
@@ -295,7 +289,8 @@ DUNIT_TASK_DEFINITION(CLIENT1, StepThree3)
ASSERT(cqLstner != nullptr, "listener is nullptr");
auto myListener = dynamic_cast<MyCqListener *>(cqLstner.get());
ASSERT(myListener != nullptr, "my listener is nullptr<cast failed>");
- kst = new KillServerThread(myListener);
+
+ kst = new KillServerThread();
LOG(std::string("before kill server 1, before=") +
std::to_string(myListener->getCountBefore()) +
", after=" + std::to_string(myListener->getCountAfter()));
diff --git a/cppcache/integration-test/testThinClientCqHAFailover.cpp b/cppcache/integration-test/testThinClientCqHAFailover.cpp
index 7016a7e..31a1874 100644
--- a/cppcache/integration-test/testThinClientCqHAFailover.cpp
+++ b/cppcache/integration-test/testThinClientCqHAFailover.cpp
@@ -87,29 +87,23 @@ class MyCqListener : public CqListener {
void close() override { LOG("MyCqListener::close called"); }
};
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
public:
- bool m_running;
- MyCqListener *m_listener;
- explicit KillServerThread(MyCqListener *listener)
- : m_running(false), m_listener(listener) {}
- int svc(void) override {
- while (m_running == true) {
+ void start() {
+ thread_ = std::thread{[]() {
CacheHelper::closeServer(1);
LOG("THREAD CLOSED SERVER 1");
- // m_listener->setFailedOver();
- m_running = false;
- }
- return 0;
- }
- void start() {
- m_running = true;
- activate();
+ }};
}
+
void stop() {
- m_running = false;
- wait();
+ if (thread_.joinable()) {
+ thread_.join();
+ }
}
+
+ protected:
+ std::thread thread_;
};
void initClientCq() {
@@ -315,7 +309,7 @@ DUNIT_TASK_DEFINITION(CLIENT1, StepThree3)
ASSERT(cqLstner != nullptr, "listener is nullptr");
MyCqListener *myListener = dynamic_cast<MyCqListener *>(cqLstner.get());
ASSERT(myListener != nullptr, "my listener is nullptr<cast failed>");
- kst = new KillServerThread(myListener);
+ kst = new KillServerThread();
char buf[1024];
LOG(std::string("before kill server 1, before=") +
std::to_string(myListener->getCountBefore()) +
diff --git a/cppcache/integration-test/testThinClientHAQueryFailover.cpp b/cppcache/integration-test/testThinClientHAQueryFailover.cpp
index d31d57c..afd2178 100644
--- a/cppcache/integration-test/testThinClientHAQueryFailover.cpp
+++ b/cppcache/integration-test/testThinClientHAQueryFailover.cpp
@@ -58,28 +58,23 @@ static int numberOfLocators = 1;
const std::string locatorsG =
CacheHelper::getLocatorHostPort(isLocator, isLocalServer, numberOfLocators);
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
public:
- bool m_running;
- KillServerThread() : m_running(false) {}
- int svc(void) override {
- while (m_running == true) {
- // CacheHelper::initServer( 2, "cacheserver_remoteoql2.xml");
- // LOG("THREAD STARTED SERVER 2");
+ void start() {
+ thread_ = std::thread{[]() {
CacheHelper::closeServer(1);
LOG("THREAD CLOSED SERVER 1");
- m_running = false;
- }
- return 0;
- }
- void start() {
- m_running = true;
- activate();
+ }};
}
+
void stop() {
- m_running = false;
- wait();
+ if (thread_.joinable()) {
+ thread_.join();
+ }
}
+
+ protected:
+ std::thread thread_;
};
void initClient() {
diff --git a/cppcache/integration-test/testThinClientPoolAttrTest.cpp b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
index 1e9fd2e..65aa48c 100644
--- a/cppcache/integration-test/testThinClientPoolAttrTest.cpp
+++ b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
@@ -44,17 +44,16 @@ const char *poolName1 = "clientPool";
const char *serverGroup = "ServerGroup1";
std::shared_ptr<Cache> cachePtr;
-class putThread : public ACE_Task_Base {
- private:
- std::shared_ptr<Region> regPtr;
-
+class PutThread {
public:
- explicit putThread(const char *name) : regPtr(getHelper()->getRegion(name)) {}
+ explicit PutThread() : regPtr{nullptr} {}
+ explicit PutThread(const std::string &name)
+ : regPtr{getHelper()->getRegion(name)} {}
- int svc(void) override {
+ void run() {
// TODO: No. of connection should be = minConnection
- for (int i = 0; i < 10000; i++) {
+ for (auto i = 0; i < 10000; ++i) {
try {
regPtr->put(keys[i % 5], vals[i % 6]);
} catch (const Exception &) {
@@ -64,11 +63,23 @@ class putThread : public ACE_Task_Base {
}
// TODO: Check no. of connection > minConnetion
}
- // LOG(" Incremented 100 times by thread.");
- return 0;
}
- void start() { activate(); }
+
+ void start() {
+ thread_ = std::thread{[this]() { run(); }};
+ }
+
+ void wait() {
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
+
void stop() { wait(); }
+
+ protected:
+ std::shared_ptr<Region> regPtr;
+ std::thread thread_;
};
void doAttrTestingAndCreatePool(const char *poolNameToUse) {
@@ -270,10 +281,10 @@ DUNIT_TASK(CLIENT1, ClientOp)
std::string("Pool level not equal to min level. Expected ") +
std::to_string(min) + ", actual " + std::to_string(level));
- putThread *threads[25];
+ PutThread threads[25];
for (int thdIdx = 0; thdIdx < 10; thdIdx++) {
- threads[thdIdx] = new putThread(poolRegNames[0]);
- threads[thdIdx]->start();
+ threads[thdIdx] = PutThread(poolRegNames[0]);
+ threads[thdIdx].start();
}
SLEEP(5000); // wait for threads to become active
@@ -291,7 +302,7 @@ DUNIT_TASK(CLIENT1, ClientOp)
std::to_string(max) + ", actual " + std::to_string(level));
for (int thdIdx = 0; thdIdx < 10; thdIdx++) {
- threads[thdIdx]->stop();
+ threads[thdIdx].stop();
}
// Milli second sleep: IdleTimeout is 5 sec, load conditioning
diff --git a/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp b/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp
index daa08fb..80255bd 100644
--- a/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp
+++ b/cppcache/integration-test/testThinClientRemoteQueryFailover.cpp
@@ -53,26 +53,23 @@ using testobject::PortfolioPdx;
using testobject::Position;
using testobject::PositionPdx;
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
public:
- bool m_running;
- KillServerThread() : m_running(false) {}
- int svc(void) override {
- while (m_running == true) {
+ void start() {
+ thread_ = std::thread{[]() {
CacheHelper::closeServer(1);
LOG("THREAD CLOSED SERVER 1");
- m_running = false;
- }
- return 0;
- }
- void start() {
- m_running = true;
- activate();
+ }};
}
+
void stop() {
- m_running = false;
- wait();
+ if (thread_.joinable()) {
+ thread_.join();
+ }
}
+
+ protected:
+ std::thread thread_;
};
bool isLocator = false;
diff --git a/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp b/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp
index dbc456a..88ca233 100644
--- a/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp
+++ b/cppcache/integration-test/testThinClientRemoteQueryFailoverPdx.cpp
@@ -50,26 +50,23 @@ using apache::geode::client::IllegalStateException;
using apache::geode::client::QueryService;
using apache::geode::client::SelectResults;
-class KillServerThread : public ACE_Task_Base {
+class KillServerThread {
public:
- bool m_running;
- KillServerThread() : m_running(false) {}
- int svc(void) override {
- while (m_running == true) {
+ void start() {
+ thread_ = std::thread{[]() {
CacheHelper::closeServer(1);
LOG("THREAD CLOSED SERVER 1");
- m_running = false;
- }
- return 0;
- }
- void start() {
- m_running = true;
- activate();
+ }};
}
+
void stop() {
- m_running = false;
- wait();
+ if (thread_.joinable()) {
+ thread_.join();
+ }
}
+
+ protected:
+ std::thread thread_;
};
bool isLocator = false;