You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2019/08/13 15:01:34 UTC
[geode-native] branch develop updated: GEODE-7019: Fix closing of
idle connections in C++ Native client (#504)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 7453395 GEODE-7019: Fix closing of idle connections in C++ Native client (#504)
7453395 is described below
commit 7453395b7f36a58cfa5a0c70610ea91d32435001
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Tue Aug 13 17:01:29 2019 +0200
GEODE-7019: Fix closing of idle connections in C++ Native client (#504)
- Also clean up a couple of char* variables in internal APIs
---
CONTRIBUTING.md | 12 +-
.../testThinClientPoolAttrTest.cpp | 16 +--
cppcache/integration/test/CMakeLists.txt | 1 +
cppcache/integration/test/CleanIdleConnections.cpp | 136 +++++++++++++++++++++
cppcache/src/CacheImpl.cpp | 2 +-
cppcache/src/CacheImpl.hpp | 2 +-
cppcache/src/PoolFactory.cpp | 17 +--
cppcache/src/ThinClientPoolDM.cpp | 133 ++++++++++----------
8 files changed, 232 insertions(+), 87 deletions(-)
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 1757e54..9510ca9 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -20,7 +20,7 @@ $ cd cppcache/test/<Debug|Release|if needed>
$ ./apache-geode_unittests
```
-### Running integration tests
+### Running old integration tests
```bash
$ cd <clone>
$ cd build
@@ -41,6 +41,16 @@ $ ctest -R <test_name> -C <Debug|Release>
```
.NET integration tests can be executed similarly from `build/clicache/integration-test`.
+### Running new integration tests
+```bash
+$ cd <clone>
+$ cd build
+$ cd cppcache/integration/test
+$ ./cpp-integration-test [<options>]
+```
+Note that `<options>` are gtest options passed to the test executable, for example a filter selecting the test cases to run. Use `--help` to list all available options.
+
+
## Style
### Formatting C++
diff --git a/cppcache/integration-test/testThinClientPoolAttrTest.cpp b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
index 40ab5b6..d3cb2a4 100644
--- a/cppcache/integration-test/testThinClientPoolAttrTest.cpp
+++ b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
@@ -260,8 +260,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
// Check current # connections they should be == min
std::string poolName =
getHelper()->getRegion(poolRegNames[0])->getAttributes().getPoolName();
- int level = TestUtils::getCacheImpl(getHelper()->cachePtr)
- ->getPoolSize(poolName.c_str());
+ int level =
+ TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
int min = getHelper()
->getCache()
->getPoolManager()
@@ -281,8 +281,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
SLEEP(5000); // wait for threads to become active
// Check current # connections they should be == max
- level = TestUtils::getCacheImpl(getHelper()->cachePtr)
- ->getPoolSize(poolName.c_str());
+ level =
+ TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
int max = getHelper()
->getCache()
->getPoolManager()
@@ -301,8 +301,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
LOG("Waiting 25 sec for idle timeout to kick in");
SLEEP(25000);
- level = TestUtils::getCacheImpl(getHelper()->cachePtr)
- ->getPoolSize(poolName.c_str());
+ level =
+ TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
min = getHelper()
->getCache()
->getPoolManager()
@@ -317,8 +317,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
LOG("Waiting 1 minute for load conditioning to kick in");
SLEEP(60000);
- level = TestUtils::getCacheImpl(getHelper()->cachePtr)
- ->getPoolSize(poolName.c_str());
+ level =
+ TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
sprintf(logmsg,
"Pool level not equal to min level after load "
"conditioning. Expected %d, actual %d",
diff --git a/cppcache/integration/test/CMakeLists.txt b/cppcache/integration/test/CMakeLists.txt
index 607fb08..a1d1134 100644
--- a/cppcache/integration/test/CMakeLists.txt
+++ b/cppcache/integration/test/CMakeLists.txt
@@ -35,6 +35,7 @@ add_executable(cpp-integration-test
SimpleCqListener.hpp
StructTest.cpp
TransactionCleaningTest.cpp
+ CleanIdleConnections.cpp
)
target_compile_definitions(cpp-integration-test
diff --git a/cppcache/integration/test/CleanIdleConnections.cpp b/cppcache/integration/test/CleanIdleConnections.cpp
new file mode 100644
index 0000000..de2ecbe
--- /dev/null
+++ b/cppcache/integration/test/CleanIdleConnections.cpp
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include <geode/Cache.hpp>
+#include <geode/PoolManager.hpp>
+#include <geode/RegionFactory.hpp>
+#include <geode/RegionShortcut.hpp>
+
+#include "CacheImpl.hpp"
+#include "CacheRegionHelper.hpp"
+#include "framework/Cluster.h"
+#include "framework/Framework.h"
+#include "framework/Gfsh.h"
+
+namespace {
+
+using apache::geode::client::Cache;
+using apache::geode::client::CacheImpl;
+using apache::geode::client::CacheRegionHelper;
+using apache::geode::client::Pool;
+using apache::geode::client::Region;
+using apache::geode::client::RegionShortcut;
+
+/**
+ * Returns the CacheImpl associated with the given public Cache handle,
+ * as resolved by CacheRegionHelper::getCacheImpl.
+ */
+CacheImpl* getCacheImpl(Cache* cptr) {
+  CacheImpl* const impl = CacheRegionHelper::getCacheImpl(cptr);
+  return impl;
+}
+
+/**
+ * Creates a client Cache with "log-level" set to "none" and
+ * "statistic-sampling-enabled" set to "false".
+ */
+Cache createCache() {
+  return apache::geode::client::CacheFactory()
+      .set("log-level", "none")
+      .set("statistic-sampling-enabled", "false")
+      .create();
+}
+
+/**
+ * Creates a pool named poolName on the given cache.
+ *
+ * The pool factory is handed to the cluster so it can apply its locators,
+ * PR single-hop is enabled, and the minimum / maximum connection counts
+ * are set to minConns / maxConns before the pool is created.
+ */
+std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache,
+                                 const int& minConns, const int& maxConns,
+                                 const std::string& poolName) {
+  auto factory = cache.getPoolManager().createFactory();
+
+  // Let the cluster fixture register its locators on the factory.
+  cluster.applyLocators(factory);
+
+  // Pool attributes exercised by the test.
+  factory.setPRSingleHopEnabled(true);
+  factory.setMinConnections(minConns);
+  factory.setMaxConnections(maxConns);
+
+  return factory.create(poolName);
+}
+
+/**
+ * Creates a PROXY region named "region" on the given cache, bound to the
+ * supplied pool by name.
+ */
+std::shared_ptr<Region> setupRegion(Cache& cache,
+                                    const std::shared_ptr<Pool>& pool) {
+  const auto& owningPoolName = pool->getName();
+
+  return cache.createRegionFactory(RegionShortcut::PROXY)
+      .setPoolName(owningPoolName)
+      .create("region");
+}
+
+/**
+ * Issues one get on the region for each integer key in [0, entries).
+ * Used as the body of the worker threads in the test below.
+ */
+void doGets(std::shared_ptr<Region> region, int entries) {
+  int key = 0;
+  while (key < entries) {
+    region->get(key);
+    ++key;
+  }
+}
+
+/**
+ * Checks that the pool shrinks back to its minimum size once operations
+ * stop:
+ *  1. right after creation the pool reports 0 connections;
+ *  2. after a wait it holds at least minConns connections;
+ *  3. concurrent gets push it above minConns;
+ *  4. after another wait it is back to exactly minConns.
+ */
+TEST(CleanIdleConnectionsTest, cleanIdleConnectionsAfterOpsPaused) {
+  Cluster cluster{LocatorCount{1}, ServerCount{2}};
+  cluster.getGfsh()
+      .create()
+      .region()
+      .withName("region")
+      .withType("PARTITION")
+      .execute();
+
+  auto cache = createCache();
+  auto minConns = 1;
+  auto maxConns = -1;
+  std::string poolName = "default";
+  auto pool = createPool(cluster, cache, minConns, maxConns, poolName);
+  auto region = setupRegion(cache, pool);
+
+  int poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_EQ(poolSize, 0);
+
+  // Portable replacement for POSIX sleep(): wait before checking that the
+  // pool has reached its minimum number of connections.
+  std::this_thread::sleep_for(std::chrono::seconds(10));
+
+  poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_GE(poolSize, minConns);
+
+  int entries = 10;
+  for (auto i = 0; i < entries; i++) {
+    region->put(i, "value");
+  }
+
+  // Run the gets from several threads at once so the pool has to grow
+  // beyond minConns.
+  const int threads = 10;
+  std::vector<std::thread> tasks;
+  tasks.reserve(threads);
+
+  for (int i = 0; i < threads; i++) {
+    tasks.emplace_back(doGets, region, entries);
+  }
+
+  for (auto& task : tasks) {
+    task.join();
+  }
+
+  poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_GT(poolSize, minConns);
+
+  // As the default clientIdleTimeout is 5 secs, after 10 seconds
+  // all idle connections must have been closed.
+  std::this_thread::sleep_for(std::chrono::seconds(10));
+
+  poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_EQ(poolSize, minConns);
+}
+
+} // namespace
diff --git a/cppcache/src/CacheImpl.cpp b/cppcache/src/CacheImpl.cpp
index 7853fdc..a2cce13 100644
--- a/cppcache/src/CacheImpl.cpp
+++ b/cppcache/src/CacheImpl.cpp
@@ -699,7 +699,7 @@ void CacheImpl::processMarker() {
}
}
-int CacheImpl::getPoolSize(const char* poolName) {
+int CacheImpl::getPoolSize(const std::string& poolName) {
if (const auto pool = getPoolManager().find(poolName)) {
if (const auto dm = std::dynamic_pointer_cast<ThinClientPoolDM>(pool)) {
return dm->m_poolSize;
diff --git a/cppcache/src/CacheImpl.hpp b/cppcache/src/CacheImpl.hpp
index f9ca86f..25b01ae 100644
--- a/cppcache/src/CacheImpl.hpp
+++ b/cppcache/src/CacheImpl.hpp
@@ -253,7 +253,7 @@ class APACHE_GEODE_EXPORT CacheImpl : private NonCopyable,
void processMarker();
// Pool helpers for unit tests
- int getPoolSize(const char* poolName);
+ int getPoolSize(const std::string& poolName);
bool getPdxIgnoreUnreadFields() {
this->throwIfClosed();
diff --git a/cppcache/src/PoolFactory.cpp b/cppcache/src/PoolFactory.cpp
index c9f82d7..5b25227 100644
--- a/cppcache/src/PoolFactory.cpp
+++ b/cppcache/src/PoolFactory.cpp
@@ -74,7 +74,7 @@ PoolFactory::PoolFactory(const Cache& cache)
PoolFactory& PoolFactory::setFreeConnectionTimeout(
std::chrono::milliseconds connectionTimeout) {
if (connectionTimeout <= std::chrono::milliseconds::zero()) {
- throw IllegalArgumentException("connectionTimeout must greater than 0.");
+ throw IllegalArgumentException("connectionTimeout must be greater than 0.");
}
m_attrs->setFreeConnectionTimeout(connectionTimeout);
@@ -85,7 +85,7 @@ PoolFactory& PoolFactory::setLoadConditioningInterval(
std::chrono::milliseconds loadConditioningInterval) {
if (loadConditioningInterval < std::chrono::milliseconds::zero()) {
throw IllegalArgumentException(
- "loadConditioningInterval must greater than or equlal to 0.");
+ "loadConditioningInterval must be greater than or equal to 0.");
}
m_attrs->setLoadConditioningInterval(loadConditioningInterval);
@@ -105,7 +105,7 @@ PoolFactory& PoolFactory::setThreadLocalConnections(
PoolFactory& PoolFactory::setReadTimeout(std::chrono::milliseconds timeout) {
if (timeout <= std::chrono::milliseconds::zero()) {
- throw IllegalArgumentException("timeout must greater than 0.");
+ throw IllegalArgumentException("timeout must be greater than 0.");
}
m_attrs->setReadTimeout(timeout);
@@ -126,7 +126,7 @@ PoolFactory& PoolFactory::setIdleTimeout(
std::chrono::milliseconds idleTimeout) {
if (idleTimeout < std::chrono::milliseconds::zero()) {
throw IllegalArgumentException(
- "idleTimeout must greater than or equlal to 0.");
+ "idleTimeout must be greater than or equal to 0.");
}
m_attrs->setIdleTimeout(idleTimeout);
@@ -141,7 +141,7 @@ PoolFactory& PoolFactory::setRetryAttempts(int retryAttempts) {
PoolFactory& PoolFactory::setPingInterval(
std::chrono::milliseconds pingInterval) {
if (pingInterval <= std::chrono::milliseconds::zero()) {
- throw IllegalArgumentException("timeout must greater than 0.");
+ throw IllegalArgumentException("timeout must be greater than 0.");
}
m_attrs->setPingInterval(pingInterval);
@@ -161,7 +161,8 @@ PoolFactory& PoolFactory::setUpdateLocatorListInterval(
PoolFactory& PoolFactory::setStatisticInterval(
std::chrono::milliseconds statisticInterval) {
if (statisticInterval < std::chrono::milliseconds::zero()) {
- throw IllegalArgumentException("timeout must greater than or equal to 0.");
+ throw IllegalArgumentException(
+ "timeout must be greater than or equal to 0.");
}
m_attrs->setStatisticInterval(statisticInterval);
@@ -201,7 +202,7 @@ PoolFactory& PoolFactory::setSubscriptionRedundancy(int redundancy) {
PoolFactory& PoolFactory::setSubscriptionMessageTrackingTimeout(
std::chrono::milliseconds messageTrackingTimeout) {
if (messageTrackingTimeout <= std::chrono::milliseconds::zero()) {
- throw IllegalArgumentException("timeout must greater than 0.");
+ throw IllegalArgumentException("timeout must be greater than 0.");
}
m_attrs->setSubscriptionMessageTrackingTimeout(messageTrackingTimeout);
@@ -211,7 +212,7 @@ PoolFactory& PoolFactory::setSubscriptionMessageTrackingTimeout(
PoolFactory& PoolFactory::setSubscriptionAckInterval(
std::chrono::milliseconds ackInterval) {
if (ackInterval <= std::chrono::milliseconds::zero()) {
- throw IllegalArgumentException("timeout must greater than 0.");
+ throw IllegalArgumentException("timeout must be greater than 0.");
}
m_attrs->setSubscriptionAckInterval(ackInterval);
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index 4eaaca9..7f798f6 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -408,52 +408,73 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
auto _idle = getIdleTimeout();
auto _nextIdle = _idle;
- {
- TcrConnection* conn = nullptr;
- std::vector<TcrConnection*> savelist;
- std::vector<TcrConnection*> replacelist;
- std::set<ServerLocation> excludeServers;
+ TcrConnection* conn = nullptr;
- while ((conn = getNoWait()) != nullptr && isRunning) {
- if (canItBeDeleted(conn)) {
- replacelist.push_back(conn);
- } else if (conn) {
- auto nextIdle =
- _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
- TcrConnection::clock::now() - conn->getLastAccessed());
- if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
- _nextIdle = nextIdle;
- }
- savelist.push_back(conn);
+ std::vector<TcrConnection*> savelist;
+ std::vector<TcrConnection*> removelist;
+ std::set<ServerLocation> excludeServers;
+
+ while ((conn = getNoWait()) != nullptr && isRunning) {
+ if (canItBeDeleted(conn)) {
+ removelist.push_back(conn);
+ } else if (conn) {
+ auto nextIdle =
+ _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
+ TcrConnection::clock::now() - conn->getLastAccessed());
+ if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
+ _nextIdle = nextIdle;
}
+ savelist.push_back(conn);
}
+ }
- size_t replaceCount =
- m_attrs->getMinConnections() - static_cast<int>(savelist.size());
+ auto replaceCount =
+ m_attrs->getMinConnections() - static_cast<int>(savelist.size());
- LOGDEBUG("Preserving %d connections", savelist.size());
+ LOGDEBUG("Preserving %d connections", savelist.size());
- for (auto savedconn : savelist) {
- put(savedconn, false);
- }
- savelist.clear();
- int count = 0;
+ for (auto savedconn : savelist) {
+ put(savedconn, false);
+ }
+ savelist.clear();
+ int count = 0;
- for (std::vector<TcrConnection*>::const_iterator iter = replacelist.begin();
- iter != replacelist.end(); ++iter) {
- TcrConnection* conn = *iter;
- if (replaceCount <= 0) {
- GF_SAFE_DELETE_CON(conn);
- removeEPConnections(1, false);
- getStats().incLoadCondDisconnects();
- LOGDEBUG("Removed a connection");
+ for (std::vector<TcrConnection*>::const_iterator iter = removelist.begin();
+ iter != removelist.end(); ++iter) {
+ TcrConnection* conn = *iter;
+ if (replaceCount <= 0) {
+ GF_SAFE_DELETE_CON(conn);
+ removeEPConnections(1, false);
+ getStats().incLoadCondDisconnects();
+ LOGDEBUG("Removed a connection");
+ } else {
+ TcrConnection* newConn = nullptr;
+ bool maxConnLimit = false;
+ createPoolConnection(newConn, excludeServers, maxConnLimit,
+ /*hasExpired(conn) ? nullptr :*/ conn);
+ if (newConn) {
+ auto nextIdle =
+ _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
+ TcrConnection::clock::now() - conn->getLastAccessed());
+ if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
+ _nextIdle = nextIdle;
+ }
+ put(newConn, false);
+ if (newConn != conn) {
+ GF_SAFE_DELETE_CON(conn);
+ removeEPConnections(1, false);
+ getStats().incLoadCondDisconnects();
+ LOGDEBUG("Removed a connection");
+ }
} else {
- TcrConnection* newConn = nullptr;
- bool maxConnLimit = false;
- createPoolConnection(newConn, excludeServers, maxConnLimit,
- /*hasExpired(conn) ? nullptr :*/ conn);
- if (newConn) {
+ if (hasExpired(conn)) {
+ GF_SAFE_DELETE_CON(conn);
+ removeEPConnections(1, false);
+ getStats().incLoadCondDisconnects();
+ LOGDEBUG("Removed a connection");
+ } else {
+ conn->updateCreationTime();
auto nextIdle =
_idle -
std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -461,40 +482,16 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
_nextIdle = nextIdle;
}
- put(newConn, false);
- if (newConn != conn) {
- GF_SAFE_DELETE_CON(conn);
- removeEPConnections(1, false);
- getStats().incLoadCondDisconnects();
- LOGDEBUG("Removed a connection");
- }
- } else {
- if (hasExpired(conn)) {
- GF_SAFE_DELETE_CON(conn);
- removeEPConnections(1, false);
- getStats().incLoadCondDisconnects();
- LOGDEBUG("Removed a connection");
- } else {
- conn->updateCreationTime();
- auto nextIdle =
- _idle -
- std::chrono::duration_cast<std::chrono::milliseconds>(
- TcrConnection::clock::now() - conn->getLastAccessed());
- if (nextIdle > std::chrono::seconds::zero() &&
- nextIdle < _nextIdle) {
- _nextIdle = nextIdle;
- }
- put(conn, false);
- }
+ put(conn, false);
}
}
- replaceCount--;
- count++;
- if (count % 10 == 0) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
}
- replacelist.clear();
+ replaceCount--;
+ count++;
+
+ if (count % 10 == 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
}
if (m_connManageTaskId >= 0 && isRunning &&