You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2019/08/13 15:01:34 UTC

[geode-native] branch develop updated: GEODE-7019: Fix closing of idle connections in C++ Native client (#504)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7453395  GEODE-7019: Fix closing of idle connections in C++ Native client (#504)
7453395 is described below

commit 7453395b7f36a58cfa5a0c70610ea91d32435001
Author: Alberto Gomez <al...@est.tech>
AuthorDate: Tue Aug 13 17:01:29 2019 +0200

    GEODE-7019: Fix closing of idle connections in C++ Native client (#504)
    
    - Also clean up a couple of char* variables in internal APIs
---
 CONTRIBUTING.md                                    |  12 +-
 .../testThinClientPoolAttrTest.cpp                 |  16 +--
 cppcache/integration/test/CMakeLists.txt           |   1 +
 cppcache/integration/test/CleanIdleConnections.cpp | 136 +++++++++++++++++++++
 cppcache/src/CacheImpl.cpp                         |   2 +-
 cppcache/src/CacheImpl.hpp                         |   2 +-
 cppcache/src/PoolFactory.cpp                       |  17 +--
 cppcache/src/ThinClientPoolDM.cpp                  | 133 ++++++++++----------
 8 files changed, 232 insertions(+), 87 deletions(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 1757e54..9510ca9 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -20,7 +20,7 @@ $ cd cppcache/test/<Debug|Release|if needed>
 $ ./apache-geode_unittests
 ```
 
-### Running integration tests
+### Running old integration tests
 ```bash
 $ cd <clone>
 $ cd build
@@ -41,6 +41,16 @@ $ ctest -R <test_name> -C <Debug|Release>
 ```
 .NET integration tests can be executed similarly from `build/clicache/integration-test`.
 
+### Running new integration tests
+```bash
+$ cd <clone>
+$ cd build
+$ cd cppcache/integration/test
+$ ./cpp-integration-test [<options>]
+```
+Note that <options> are gtest options that may be passed to the test executable, like for example the test cases to be run. Use --help to get all the available options.
+
+
 ## Style
 
 ### Formatting C++
diff --git a/cppcache/integration-test/testThinClientPoolAttrTest.cpp b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
index 40ab5b6..d3cb2a4 100644
--- a/cppcache/integration-test/testThinClientPoolAttrTest.cpp
+++ b/cppcache/integration-test/testThinClientPoolAttrTest.cpp
@@ -260,8 +260,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
     // Check current # connections they should be == min
     std::string poolName =
         getHelper()->getRegion(poolRegNames[0])->getAttributes().getPoolName();
-    int level = TestUtils::getCacheImpl(getHelper()->cachePtr)
-                    ->getPoolSize(poolName.c_str());
+    int level =
+        TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
     int min = getHelper()
                   ->getCache()
                   ->getPoolManager()
@@ -281,8 +281,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
     SLEEP(5000);  // wait for threads to become active
 
     // Check current # connections they should be == max
-    level = TestUtils::getCacheImpl(getHelper()->cachePtr)
-                ->getPoolSize(poolName.c_str());
+    level =
+        TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
     int max = getHelper()
                   ->getCache()
                   ->getPoolManager()
@@ -301,8 +301,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
     LOG("Waiting 25 sec for idle timeout to kick in");
     SLEEP(25000);
 
-    level = TestUtils::getCacheImpl(getHelper()->cachePtr)
-                ->getPoolSize(poolName.c_str());
+    level =
+        TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
     min = getHelper()
               ->getCache()
               ->getPoolManager()
@@ -317,8 +317,8 @@ DUNIT_TASK(CLIENT1, ClientOp)
     LOG("Waiting 1 minute for load conditioning to kick in");
     SLEEP(60000);
 
-    level = TestUtils::getCacheImpl(getHelper()->cachePtr)
-                ->getPoolSize(poolName.c_str());
+    level =
+        TestUtils::getCacheImpl(getHelper()->cachePtr)->getPoolSize(poolName);
     sprintf(logmsg,
             "Pool level not equal to min level after load "
             "conditioning. Expected %d, actual %d",
diff --git a/cppcache/integration/test/CMakeLists.txt b/cppcache/integration/test/CMakeLists.txt
index 607fb08..a1d1134 100644
--- a/cppcache/integration/test/CMakeLists.txt
+++ b/cppcache/integration/test/CMakeLists.txt
@@ -35,6 +35,7 @@ add_executable(cpp-integration-test
   SimpleCqListener.hpp
   StructTest.cpp
   TransactionCleaningTest.cpp
+  CleanIdleConnections.cpp
 )
 
 target_compile_definitions(cpp-integration-test
diff --git a/cppcache/integration/test/CleanIdleConnections.cpp b/cppcache/integration/test/CleanIdleConnections.cpp
new file mode 100644
index 0000000..de2ecbe
--- /dev/null
+++ b/cppcache/integration/test/CleanIdleConnections.cpp
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include <geode/Cache.hpp>
+#include <geode/PoolManager.hpp>
+#include <geode/RegionFactory.hpp>
+#include <geode/RegionShortcut.hpp>
+
+#include "CacheImpl.hpp"
+#include "CacheRegionHelper.hpp"
+#include "framework/Cluster.h"
+#include "framework/Framework.h"
+#include "framework/Gfsh.h"
+
+namespace {
+
+using apache::geode::client::Cache;
+using apache::geode::client::CacheImpl;
+using apache::geode::client::CacheRegionHelper;
+using apache::geode::client::Pool;
+using apache::geode::client::Region;
+using apache::geode::client::RegionShortcut;
+
+CacheImpl* getCacheImpl(Cache* cptr) {
+  return CacheRegionHelper::getCacheImpl(cptr);
+}
+
+Cache createCache() {
+  using apache::geode::client::CacheFactory;
+
+  auto cache = CacheFactory()
+                   .set("log-level", "none")
+                   .set("statistic-sampling-enabled", "false")
+                   .create();
+
+  return cache;
+}
+
+std::shared_ptr<Pool> createPool(Cluster& cluster, Cache& cache,
+                                 const int& minConns, const int& maxConns,
+                                 const std::string& poolName) {
+  auto poolFactory = cache.getPoolManager().createFactory();
+  cluster.applyLocators(poolFactory);
+  poolFactory.setPRSingleHopEnabled(true);
+  poolFactory.setMinConnections(minConns);
+  poolFactory.setMaxConnections(maxConns);
+  return poolFactory.create(poolName);
+}
+
+std::shared_ptr<Region> setupRegion(Cache& cache,
+                                    const std::shared_ptr<Pool>& pool) {
+  auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+                    .setPoolName(pool->getName())
+                    .create("region");
+
+  return region;
+}
+
+void doGets(std::shared_ptr<Region> region, int entries) {
+  for (auto i = 0; i < entries; i++) {
+    region->get(i);
+  }
+}
+
+TEST(CleanIdleConnectionsTest, cleanIdleConnectionsAfterOpsPaused) {
+  Cluster cluster{LocatorCount{1}, ServerCount{2}};
+  cluster.getGfsh()
+      .create()
+      .region()
+      .withName("region")
+      .withType("PARTITION")
+      .execute();
+
+  auto cache = createCache();
+  auto minConns = 1;
+  auto maxConns = -1;
+  std::string poolName = "default";
+  auto pool = createPool(cluster, cache, minConns, maxConns, poolName);
+  auto region = setupRegion(cache, pool);
+
+  int poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_EQ(poolSize, 0);
+
+  sleep(10);
+
+  poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_GE(poolSize, minConns);
+
+  int entries = 10;
+  for (auto i = 0; i < entries; i++) {
+    region->put(i, "value");
+  }
+
+  std::vector<std::shared_ptr<std::thread>> tasks;
+  int threads = 10;
+
+  for (int i = 0; i < threads; i++) {
+    std::shared_ptr<std::thread> threadAux =
+        std::make_shared<std::thread>(doGets, region, entries);
+    tasks.push_back(threadAux);
+  }
+
+  for (int i = 0; i < threads; i++) {
+    tasks[i]->join();
+  }
+
+  poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_GT(poolSize, minConns);
+
+  // As the default clientIdleTimeout is 5 secs, after 10 seconds
+  // all idle connections must have been closed.
+  sleep(10);
+
+  poolSize = getCacheImpl(&cache)->getPoolSize(poolName);
+  ASSERT_EQ(poolSize, minConns);
+}
+
+}  // namespace
diff --git a/cppcache/src/CacheImpl.cpp b/cppcache/src/CacheImpl.cpp
index 7853fdc..a2cce13 100644
--- a/cppcache/src/CacheImpl.cpp
+++ b/cppcache/src/CacheImpl.cpp
@@ -699,7 +699,7 @@ void CacheImpl::processMarker() {
   }
 }
 
-int CacheImpl::getPoolSize(const char* poolName) {
+int CacheImpl::getPoolSize(const std::string& poolName) {
   if (const auto pool = getPoolManager().find(poolName)) {
     if (const auto dm = std::dynamic_pointer_cast<ThinClientPoolDM>(pool)) {
       return dm->m_poolSize;
diff --git a/cppcache/src/CacheImpl.hpp b/cppcache/src/CacheImpl.hpp
index f9ca86f..25b01ae 100644
--- a/cppcache/src/CacheImpl.hpp
+++ b/cppcache/src/CacheImpl.hpp
@@ -253,7 +253,7 @@ class APACHE_GEODE_EXPORT CacheImpl : private NonCopyable,
   void processMarker();
 
   // Pool helpers for unit tests
-  int getPoolSize(const char* poolName);
+  int getPoolSize(const std::string& poolName);
 
   bool getPdxIgnoreUnreadFields() {
     this->throwIfClosed();
diff --git a/cppcache/src/PoolFactory.cpp b/cppcache/src/PoolFactory.cpp
index c9f82d7..5b25227 100644
--- a/cppcache/src/PoolFactory.cpp
+++ b/cppcache/src/PoolFactory.cpp
@@ -74,7 +74,7 @@ PoolFactory::PoolFactory(const Cache& cache)
 PoolFactory& PoolFactory::setFreeConnectionTimeout(
     std::chrono::milliseconds connectionTimeout) {
   if (connectionTimeout <= std::chrono::milliseconds::zero()) {
-    throw IllegalArgumentException("connectionTimeout must greater than 0.");
+    throw IllegalArgumentException("connectionTimeout must be greater than 0.");
   }
 
   m_attrs->setFreeConnectionTimeout(connectionTimeout);
@@ -85,7 +85,7 @@ PoolFactory& PoolFactory::setLoadConditioningInterval(
     std::chrono::milliseconds loadConditioningInterval) {
   if (loadConditioningInterval < std::chrono::milliseconds::zero()) {
     throw IllegalArgumentException(
-        "loadConditioningInterval must greater than or equlal to 0.");
+        "loadConditioningInterval must be greater than or equal to 0.");
   }
 
   m_attrs->setLoadConditioningInterval(loadConditioningInterval);
@@ -105,7 +105,7 @@ PoolFactory& PoolFactory::setThreadLocalConnections(
 
 PoolFactory& PoolFactory::setReadTimeout(std::chrono::milliseconds timeout) {
   if (timeout <= std::chrono::milliseconds::zero()) {
-    throw IllegalArgumentException("timeout must greater than 0.");
+    throw IllegalArgumentException("timeout must be greater than 0.");
   }
 
   m_attrs->setReadTimeout(timeout);
@@ -126,7 +126,7 @@ PoolFactory& PoolFactory::setIdleTimeout(
     std::chrono::milliseconds idleTimeout) {
   if (idleTimeout < std::chrono::milliseconds::zero()) {
     throw IllegalArgumentException(
-        "idleTimeout must greater than or equlal to 0.");
+        "idleTimeout must be greater than or equal to 0.");
   }
 
   m_attrs->setIdleTimeout(idleTimeout);
@@ -141,7 +141,7 @@ PoolFactory& PoolFactory::setRetryAttempts(int retryAttempts) {
 PoolFactory& PoolFactory::setPingInterval(
     std::chrono::milliseconds pingInterval) {
   if (pingInterval <= std::chrono::milliseconds::zero()) {
-    throw IllegalArgumentException("timeout must greater than 0.");
+    throw IllegalArgumentException("timeout must be greater than 0.");
   }
 
   m_attrs->setPingInterval(pingInterval);
@@ -161,7 +161,8 @@ PoolFactory& PoolFactory::setUpdateLocatorListInterval(
 PoolFactory& PoolFactory::setStatisticInterval(
     std::chrono::milliseconds statisticInterval) {
   if (statisticInterval < std::chrono::milliseconds::zero()) {
-    throw IllegalArgumentException("timeout must greater than or equal to 0.");
+    throw IllegalArgumentException(
+        "timeout must be greater than or equal to 0.");
   }
 
   m_attrs->setStatisticInterval(statisticInterval);
@@ -201,7 +202,7 @@ PoolFactory& PoolFactory::setSubscriptionRedundancy(int redundancy) {
 PoolFactory& PoolFactory::setSubscriptionMessageTrackingTimeout(
     std::chrono::milliseconds messageTrackingTimeout) {
   if (messageTrackingTimeout <= std::chrono::milliseconds::zero()) {
-    throw IllegalArgumentException("timeout must greater than 0.");
+    throw IllegalArgumentException("timeout must be greater than 0.");
   }
 
   m_attrs->setSubscriptionMessageTrackingTimeout(messageTrackingTimeout);
@@ -211,7 +212,7 @@ PoolFactory& PoolFactory::setSubscriptionMessageTrackingTimeout(
 PoolFactory& PoolFactory::setSubscriptionAckInterval(
     std::chrono::milliseconds ackInterval) {
   if (ackInterval <= std::chrono::milliseconds::zero()) {
-    throw IllegalArgumentException("timeout must greater than 0.");
+    throw IllegalArgumentException("timeout must be greater than 0.");
   }
 
   m_attrs->setSubscriptionAckInterval(ackInterval);
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index 4eaaca9..7f798f6 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -408,52 +408,73 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
 
   auto _idle = getIdleTimeout();
   auto _nextIdle = _idle;
-  {
-    TcrConnection* conn = nullptr;
 
-    std::vector<TcrConnection*> savelist;
-    std::vector<TcrConnection*> replacelist;
-    std::set<ServerLocation> excludeServers;
+  TcrConnection* conn = nullptr;
 
-    while ((conn = getNoWait()) != nullptr && isRunning) {
-      if (canItBeDeleted(conn)) {
-        replacelist.push_back(conn);
-      } else if (conn) {
-        auto nextIdle =
-            _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
-                        TcrConnection::clock::now() - conn->getLastAccessed());
-        if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
-          _nextIdle = nextIdle;
-        }
-        savelist.push_back(conn);
+  std::vector<TcrConnection*> savelist;
+  std::vector<TcrConnection*> removelist;
+  std::set<ServerLocation> excludeServers;
+
+  while ((conn = getNoWait()) != nullptr && isRunning) {
+    if (canItBeDeleted(conn)) {
+      removelist.push_back(conn);
+    } else if (conn) {
+      auto nextIdle =
+          _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
+                      TcrConnection::clock::now() - conn->getLastAccessed());
+      if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
+        _nextIdle = nextIdle;
       }
+      savelist.push_back(conn);
     }
+  }
 
-    size_t replaceCount =
-        m_attrs->getMinConnections() - static_cast<int>(savelist.size());
+  auto replaceCount =
+      m_attrs->getMinConnections() - static_cast<int>(savelist.size());
 
-    LOGDEBUG("Preserving %d connections", savelist.size());
+  LOGDEBUG("Preserving %d connections", savelist.size());
 
-    for (auto savedconn : savelist) {
-      put(savedconn, false);
-    }
-    savelist.clear();
-    int count = 0;
+  for (auto savedconn : savelist) {
+    put(savedconn, false);
+  }
+  savelist.clear();
+  int count = 0;
 
-    for (std::vector<TcrConnection*>::const_iterator iter = replacelist.begin();
-         iter != replacelist.end(); ++iter) {
-      TcrConnection* conn = *iter;
-      if (replaceCount <= 0) {
-        GF_SAFE_DELETE_CON(conn);
-        removeEPConnections(1, false);
-        getStats().incLoadCondDisconnects();
-        LOGDEBUG("Removed a connection");
+  for (std::vector<TcrConnection*>::const_iterator iter = removelist.begin();
+       iter != removelist.end(); ++iter) {
+    TcrConnection* conn = *iter;
+    if (replaceCount <= 0) {
+      GF_SAFE_DELETE_CON(conn);
+      removeEPConnections(1, false);
+      getStats().incLoadCondDisconnects();
+      LOGDEBUG("Removed a connection");
+    } else {
+      TcrConnection* newConn = nullptr;
+      bool maxConnLimit = false;
+      createPoolConnection(newConn, excludeServers, maxConnLimit,
+                           /*hasExpired(conn) ? nullptr :*/ conn);
+      if (newConn) {
+        auto nextIdle =
+            _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
+                        TcrConnection::clock::now() - conn->getLastAccessed());
+        if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
+          _nextIdle = nextIdle;
+        }
+        put(newConn, false);
+        if (newConn != conn) {
+          GF_SAFE_DELETE_CON(conn);
+          removeEPConnections(1, false);
+          getStats().incLoadCondDisconnects();
+          LOGDEBUG("Removed a connection");
+        }
       } else {
-        TcrConnection* newConn = nullptr;
-        bool maxConnLimit = false;
-        createPoolConnection(newConn, excludeServers, maxConnLimit,
-                             /*hasExpired(conn) ? nullptr :*/ conn);
-        if (newConn) {
+        if (hasExpired(conn)) {
+          GF_SAFE_DELETE_CON(conn);
+          removeEPConnections(1, false);
+          getStats().incLoadCondDisconnects();
+          LOGDEBUG("Removed a connection");
+        } else {
+          conn->updateCreationTime();
           auto nextIdle =
               _idle -
               std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -461,40 +482,16 @@ void ThinClientPoolDM::cleanStaleConnections(std::atomic<bool>& isRunning) {
           if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
             _nextIdle = nextIdle;
           }
-          put(newConn, false);
-          if (newConn != conn) {
-            GF_SAFE_DELETE_CON(conn);
-            removeEPConnections(1, false);
-            getStats().incLoadCondDisconnects();
-            LOGDEBUG("Removed a connection");
-          }
-        } else {
-          if (hasExpired(conn)) {
-            GF_SAFE_DELETE_CON(conn);
-            removeEPConnections(1, false);
-            getStats().incLoadCondDisconnects();
-            LOGDEBUG("Removed a connection");
-          } else {
-            conn->updateCreationTime();
-            auto nextIdle =
-                _idle -
-                std::chrono::duration_cast<std::chrono::milliseconds>(
-                    TcrConnection::clock::now() - conn->getLastAccessed());
-            if (nextIdle > std::chrono::seconds::zero() &&
-                nextIdle < _nextIdle) {
-              _nextIdle = nextIdle;
-            }
-            put(conn, false);
-          }
+          put(conn, false);
         }
       }
-      replaceCount--;
-      count++;
-      if (count % 10 == 0) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
-      }
     }
-    replacelist.clear();
+    replaceCount--;
+    count++;
+
+    if (count % 10 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
   }
 
   if (m_connManageTaskId >= 0 && isRunning &&