You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/03/29 17:09:19 UTC

[geode-native] branch develop updated: GEODE-8793: Fix PdxTypeRegistry cleanup (#715)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new d80dd32  GEODE-8793: Fix PdxTypeRegistry cleanup (#715)
d80dd32 is described below

commit d80dd3286369476552cd27a27ba7a7c431e20227
Author: Mario Salazar de Torres <ma...@est.tech>
AuthorDate: Mon Mar 29 19:09:10 2021 +0200

    GEODE-8793: Fix PdxTypeRegistry cleanup (#715)
    
     - Upon disconnection, if option on-client-disconnect-clear-pdxType-Ids
       is enabled, PdxTypeRegistry is supposed to be cleaned up. But the
       problem is that this was not happening each time, but each 2 times
       the client disconnects from the cluster.
     - Due to the project being moving away from ACE, ACE semaphore was
       replaced.
     - Also some methods have been renamed to better represent the
       functionality.
     - Given its quite complex to test this behaviour with IT, a UT has been
       created testing that every time pool-size reaches 0, PdxTypeRegstry
       is cleaned up.
     - Also fixed an issue within ClientMetadataService that was causing a
       coredump while trying to stop the thread when it was not started in
       the first time.
     - PdxTypeRegistry clean up is now done synchronously.
     - Added a counter for connected endpoints per endpoint.
     - Now PdxTypeRegistry clean up is triggered whenever there are no
       connected endpoints remaining rather than whenever there are no
       connections remaining in the pool. This mimics Java client
       functionality and its much more reliable.
     - A new IT has been added in favour of the previous UT.
     - In order to keep this change as minimal as possible, changes in
       ClientMetadataService have been reverted as now they are not strictly
       needed.
---
 cppcache/integration/test/CMakeLists.txt          |   1 +
 cppcache/integration/test/PdxTypeRegistryTest.cpp | 158 ++++++++++++++++++++++
 cppcache/src/TcrEndpoint.cpp                      |  55 +++++---
 cppcache/src/TcrEndpoint.hpp                      |   6 +-
 cppcache/src/TcrPoolEndPoint.cpp                  |   1 -
 cppcache/src/ThinClientBaseDM.hpp                 |   3 +
 cppcache/src/ThinClientPoolDM.cpp                 |  88 ++++++------
 cppcache/src/ThinClientPoolDM.hpp                 |  16 ++-
 8 files changed, 246 insertions(+), 82 deletions(-)

diff --git a/cppcache/integration/test/CMakeLists.txt b/cppcache/integration/test/CMakeLists.txt
index 6ad08b8..61fd240 100644
--- a/cppcache/integration/test/CMakeLists.txt
+++ b/cppcache/integration/test/CMakeLists.txt
@@ -38,6 +38,7 @@ add_executable(cpp-integration-test
   PdxInstanceTest.cpp
   PdxJsonTypeTest.cpp
   PdxSerializerTest.cpp
+  PdxTypeRegistryTest.cpp
   Position.cpp
   Position.hpp
   PositionKey.cpp
diff --git a/cppcache/integration/test/PdxTypeRegistryTest.cpp b/cppcache/integration/test/PdxTypeRegistryTest.cpp
new file mode 100644
index 0000000..11fa16e
--- /dev/null
+++ b/cppcache/integration/test/PdxTypeRegistryTest.cpp
@@ -0,0 +1,158 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more *
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <geode/Cache.hpp>
+#include <geode/CacheFactory.hpp>
+#include <geode/CacheListener.hpp>
+#include <geode/PoolManager.hpp>
+#include <geode/QueryService.hpp>
+#include <geode/RegionFactory.hpp>
+#include <geode/RegionShortcut.hpp>
+
+#include "framework/Cluster.h"
+#include "framework/Framework.h"
+#include "framework/Gfsh.h"
+
+namespace {
+
+using apache::geode::client::Cache;
+using apache::geode::client::CacheableInt16;
+using apache::geode::client::CacheFactory;
+using apache::geode::client::CacheListener;
+using apache::geode::client::NotConnectedException;
+using apache::geode::client::PdxInstance;
+using apache::geode::client::Region;
+using apache::geode::client::RegionEvent;
+using apache::geode::client::RegionShortcut;
+using apache::geode::client::SelectResults;
+
+static bool isDisconnected = false;
+
+class RegionListener : public CacheListener {
+ public:
+  void waitConnected() {
+    std::unique_lock<decltype(mutex_)> lock{mutex_};
+    status_cv_.wait(lock, [this] { return status_; });
+  }
+
+  void waitDisconnected() {
+    std::unique_lock<decltype(mutex_)> lock{mutex_};
+    status_cv_.wait(lock, [this] { return !status_; });
+  }
+
+ protected:
+  void afterRegionDisconnected(Region&) override {
+    std::unique_lock<decltype(mutex_)> lock{mutex_};
+
+    status_ = false;
+    status_cv_.notify_all();
+  }
+
+  void afterRegionLive(const RegionEvent&) override {
+    std::unique_lock<decltype(mutex_)> lock{mutex_};
+
+    status_ = true;
+    status_cv_.notify_all();
+  }
+
+ protected:
+  bool status_;
+  std::mutex mutex_;
+  std::condition_variable status_cv_;
+};
+
+Cache createTestCache() {
+  CacheFactory cacheFactory;
+  return cacheFactory.set("log-level", "none")
+      .set("connect-timeout", "2s")
+      .set("statistic-sampling-enabled", "false")
+      .set("on-client-disconnect-clear-pdxType-Ids", "true")
+      .setPdxReadSerialized(true)
+      .create();
+}
+
+void createTestPool(Cluster& cluster, Cache& cache) {
+  auto poolFactory = cache.getPoolManager()
+                         .createFactory()
+                         .setReadTimeout(std::chrono::seconds{1})
+                         .setPingInterval(std::chrono::seconds{5})
+                         .setSubscriptionEnabled(true);
+
+  cluster.applyLocators(poolFactory);
+  poolFactory.create("pool");
+}
+
+std::shared_ptr<Region> createTestRegion(
+    Cache& cache, std::shared_ptr<RegionListener> listener) {
+  auto regionFactory = cache.createRegionFactory(RegionShortcut::PROXY);
+  return regionFactory.setPoolName("pool").setCacheListener(listener).create(
+      "region");
+}
+
+std::shared_ptr<PdxInstance> createTestPdxInstance(Cache& cache,
+                                                   const std::string& entry) {
+  auto factory = cache.createPdxInstanceFactory("__GEMFIRE_JSON", false);
+  return factory.writeString("entryName", entry)
+      .writeInt("int-value", -1)
+      .create();
+}
+
+TEST(PdxTypeRegistryTest, cleanupOnClusterRestart) {
+  Cluster cluster{LocatorCount{1}, ServerCount{2}};
+  cluster.start();
+
+  auto& gfsh = cluster.getGfsh();
+  gfsh.create().region().withName("region").withType("PARTITION").execute();
+
+  auto listener = std::make_shared<RegionListener>();
+
+  auto cache = createTestCache();
+  createTestPool(cluster, cache);
+  auto qs = cache.getQueryService("pool");
+  auto region = createTestRegion(cache, listener);
+
+  std::string key = "before-shutdown";
+  region->put(key, createTestPdxInstance(cache, key));
+
+  // Shutdown and wait for some time
+  gfsh.shutdown().execute();
+  listener->waitDisconnected();
+  std::this_thread::sleep_for(std::chrono::seconds{15});
+
+  for (auto& server : cluster.getServers()) {
+    server.start();
+  }
+
+  listener->waitConnected();
+
+  key = "after-restart";
+  region->put(key, createTestPdxInstance(cache, key));
+
+  // If PdxTypeRegistry was cleaned up, then the PdxType should have been
+  // registered in the new cluster
+
+  std::shared_ptr<SelectResults> result;
+  auto query =
+      qs->newQuery("SELECT * FROM /region WHERE entryName = '" + key + "'");
+
+  EXPECT_NO_THROW(result = query->execute());
+  EXPECT_TRUE(result);
+  EXPECT_GT(result->size(), 0);
+}
+
+}  // namespace
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 5ec3c3c..7b49104 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -68,7 +68,7 @@ TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
       m_msgSent(false),
       m_pingSent(false),
       m_isMultiUserMode(isMultiUserMode),
-      m_connected(false),
+      connected_(false),
       m_isActiveEndpoint(false),
       m_serverQueueStatus(NON_REDUNDANT_SERVER),
       m_queueSize(0),
@@ -81,7 +81,7 @@ TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
 }
 
 TcrEndpoint::~TcrEndpoint() {
-  m_connected = false;
+  connected_ = false;
   m_isActiveEndpoint = false;
   closeConnections();
   {
@@ -209,7 +209,6 @@ GfErrType TcrEndpoint::createNewConnection(
             break;
           }
         }
-        // m_connected = true;
       }
       err = GF_NOERR;
       break;
@@ -354,7 +353,7 @@ ServerQueueStatus TcrEndpoint::getFreshServerQueueStatus(
       } else {
         statusConn = newConn;
       }
-      m_connected = true;
+      setConnected(true);
       return status;
     } else {
       //  remove port from ports list (which is sent to server in notification
@@ -393,12 +392,12 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
   } else if (!m_isActiveEndpoint) {
     int maxConnections = 0;
     if (isActiveEndpoint) {
-      if (m_connected) {
+      if (connected_) {
         maxConnections = m_maxConnections - 1;
       } else {
         maxConnections = m_maxConnections;
       }
-    } else if (!m_connected) {
+    } else if (!connected_) {
       maxConnections = 1;
     }
     if (maxConnections > 0) {
@@ -412,8 +411,8 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
                                        m_cacheImpl->getDistributedSystem()
                                            .getSystemProperties()
                                            .connectTimeout(),
-                                       0, m_connected)) != GF_NOERR) {
-          m_connected = false;
+                                       0, connected_)) != GF_NOERR) {
+          setConnected(false);
           m_isActiveEndpoint = false;
           closeConnections();
           return err;
@@ -424,12 +423,12 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
               (isSecondary ? "secondary server "
                            : (isActiveEndpoint ? "" : "primary server ")),
               m_name.c_str());
-      m_connected = true;
+      setConnected(true);
       m_isActiveEndpoint = isActiveEndpoint;
     }
   }
 
-  if (m_connected || connected) {
+  if (connected_ || connected) {
     if (clientNotification) {
       if (distMgr != nullptr) {
         std::lock_guard<decltype(m_distMgrsLock)> guardDistMgrs(m_distMgrsLock);
@@ -449,7 +448,7 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
                                                .connectTimeout() *
                                            3,
                                        0)) != GF_NOERR) {
-          m_connected = false;
+          setConnected(false);
           m_isActiveEndpoint = false;
           closeConnections();
           LOGWARN("Failed to start subscription channel for endpoint %s",
@@ -464,7 +463,7 @@ GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
       ++m_numRegionListener;
       LOGFINEST("Incremented notification region count for endpoint %s to %d",
                 m_name.c_str(), m_numRegionListener);
-      m_connected = true;
+      setConnected(true);
     }
   }
 
@@ -499,7 +498,7 @@ void TcrEndpoint::unregisterDM(bool clientNotification,
 
 void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
   LOGDEBUG("Sending ping message to endpoint %s", m_name.c_str());
-  if (!m_connected) {
+  if (!connected_) {
     LOGFINER("Skipping ping task for disconnected endpoint %s", m_name.c_str());
     return;
   }
@@ -532,7 +531,7 @@ void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
       bool connected = (error == GF_NOERR)
                            ? (reply.getMessageType() == TcrMessage::REPLY)
                            : false;
-      if (m_connected != connected) {
+      if (connected_ != connected) {
         setConnectionStatus(connected);
       }
     }
@@ -673,7 +672,7 @@ void TcrEndpoint::receiveNotification(std::atomic<bool>& isRunning) {
       LOGFINER(
           "IO exception while receiving subscription event for endpoint %s: %s",
           m_name.c_str(), e.what());
-      if (m_connected) {
+      if (connected_) {
         setConnectionStatus(false);
         // close notification channel
         std::lock_guard<decltype(m_notifyReceiverLock)> guard(
@@ -913,7 +912,7 @@ GfErrType TcrEndpoint::sendRequestWithRetry(
              m_name.c_str());
     if (createNewConn) {
       createNewConn = false;
-      if (!m_connected) {
+      if (!connected_) {
         return GF_NOTCON;
       } else if ((error =
                       createNewConnection(conn, false, false,
@@ -932,7 +931,7 @@ GfErrType TcrEndpoint::sendRequestWithRetry(
       // max wait time to get a connection
       conn = m_opConnections.getUntil(timeout);
     }
-    if (!m_connected) {
+    if (!connected_) {
       return GF_NOTCON;
     }
     if (conn != nullptr) {
@@ -1136,6 +1135,17 @@ GfErrType TcrEndpoint::sendRequestConnWithRetry(const TcrMessage& request,
   return error;
 }
 
+void TcrEndpoint::setConnected(bool status) {
+  bool flag = !status;
+  if (connected_.compare_exchange_strong(flag, status)) {
+    if (status) {
+      m_baseDM->incConnectedEndpoints();
+    } else {
+      m_baseDM->decConnectedEndpoints();
+    }
+  }
+}
+
 void TcrEndpoint::setConnectionStatus(bool status) {
   // : Store the original value of m_isActiveEndpoint.
   // This is to try make failover more resilient for the case when
@@ -1149,17 +1159,18 @@ void TcrEndpoint::setConnectionStatus(bool status) {
   // bool wasActive = m_isActiveEndpoint;
   // Then after taking the lock:
   // If ( !wasActive && isActiveEndpoint ) { return; }
-  std::lock_guard<decltype(m_connectionLock)> guard(m_connectionLock);
-  if (m_connected != status) {
-    bool connected = m_connected;
-    m_connected = status;
-    if (connected) {
+  bool flag = !status;
+  if (connected_.compare_exchange_strong(flag, status)) {
+    if (status) {
+      m_baseDM->incConnectedEndpoints();
+    } else {
       m_numberOfTimesFailed += 1;
       m_isAuthenticated = false;
       // disconnected
       LOGFINE("Disconnecting from endpoint %s", m_name.c_str());
       closeConnections();
       m_isActiveEndpoint = false;
+      m_baseDM->decConnectedEndpoints();
       LOGFINE("Disconnected from endpoint %s", m_name.c_str());
       triggerRedundancyThread();
     }
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index bfbda3c..9827e33 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -89,7 +89,7 @@ class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
   void stopNotifyReceiverAndCleanup();
   void stopNoBlock();
 
-  bool inline connected() const { return m_connected; }
+  bool inline connected() const { return connected_; }
 
   int inline numRegions() const { return m_numRegions; }
 
@@ -154,7 +154,7 @@ class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
                                   bool isClientNotification, bool isSecondary,
                                   std::chrono::microseconds connectTimeout);
 
-  void setConnected(volatile bool connected = true) { m_connected = connected; }
+  void setConnected(bool connected = true);
   virtual ThinClientPoolDM* getPoolHADM() const { return nullptr; }
   bool isQueueHosted();
   std::recursive_mutex& getQueueHostedMutex() { return m_notifyReceiverLock; }
@@ -225,7 +225,7 @@ class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
   volatile bool m_msgSent;
   volatile bool m_pingSent;
   bool m_isMultiUserMode;
-  volatile bool m_connected;
+  std::atomic<bool> connected_;
   bool m_isActiveEndpoint;
   ServerQueueStatus m_serverQueueStatus;
   int32_t m_queueSize;
diff --git a/cppcache/src/TcrPoolEndPoint.cpp b/cppcache/src/TcrPoolEndPoint.cpp
index c270929..628ff1b 100644
--- a/cppcache/src/TcrPoolEndPoint.cpp
+++ b/cppcache/src/TcrPoolEndPoint.cpp
@@ -74,7 +74,6 @@ GfErrType TcrPoolEndPoint::registerDM(bool, bool isSecondary, bool,
       return err;
     }
     m_dm->addConnection(newConn);
-    // m_connected = true;
     setConnected(true);
   }
 
diff --git a/cppcache/src/ThinClientBaseDM.hpp b/cppcache/src/ThinClientBaseDM.hpp
index 57ae3b6..8c760e0 100644
--- a/cppcache/src/ThinClientBaseDM.hpp
+++ b/cppcache/src/ThinClientBaseDM.hpp
@@ -117,6 +117,9 @@ class ThinClientBaseDM {
 
   virtual TcrEndpoint* getActiveEndpoint() { return nullptr; }
 
+  virtual void incConnectedEndpoints() {}
+  virtual void decConnectedEndpoints() {}
+
   virtual bool checkDupAndAdd(std::shared_ptr<EventId> eventid);
 
   virtual std::recursive_mutex& getRedundancyLock();
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index 80b464d..8ae524e 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -142,7 +142,6 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
       m_sticky(false),
       update_locators_semaphore_(0),
       ping_semaphore_(0),
-      cli_callback_semaphore_(0),
       m_isDestroyed(false),
       m_destroyPending(false),
       m_destroyPendingHADM(false),
@@ -155,11 +154,11 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
       m_connManageTask(nullptr),
       m_pingTask(nullptr),
       m_updateLocatorListTask(nullptr),
-      m_cliCallbackTask(nullptr),
       m_pingTaskId(-1),
       m_updateLocatorListTaskId(-1),
       m_connManageTaskId(-1),
       m_clientOps(0),
+      connected_endpoints_(0),
       m_PoolStatsSampler(nullptr),
       m_clientMetadataService(nullptr),
       m_primaryServerQueueSize(PRIMARY_QUEUE_NOT_AVAILABLE) {
@@ -172,18 +171,18 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
   auto cacheImpl = m_connManager.getCacheImpl();
   auto& distributedSystem = cacheImpl->getDistributedSystem();
 
-  auto& sysProp = distributedSystem.getSystemProperties();
+  auto& props = distributedSystem.getSystemProperties();
   // to set security flag at pool level
   m_isSecurityOn = cacheImpl->getAuthInitialize() != nullptr;
 
-  const auto& durableId = sysProp.durableClientId();
+  const auto& durableId = props.durableClientId();
 
   std::string clientDurableId = durableId;
   if (!m_poolName.empty()) {
     clientDurableId += "_gem_" + m_poolName;
   }
 
-  const auto durableTimeOut = sysProp.durableTimeout();
+  const auto durableTimeOut = props.durableTimeout();
   m_memId = cacheImpl->getClientProxyMembershipIDFactory().create(
       clientDurableId.c_str(), durableTimeOut);
 
@@ -201,7 +200,7 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
       cacheImpl->getStatisticsManager().getStatisticsFactory(), m_poolName);
   cacheImpl->getStatisticsManager().forceSample();
 
-  if (!sysProp.isEndpointShufflingDisabled()) {
+  if (!props.isEndpointShufflingDisabled()) {
     if (!m_attrs->m_initServList.empty()) {
       RandGen randgen;
       m_server = randgen(static_cast<uint32_t>(m_attrs->m_initServList.size()));
@@ -212,6 +211,8 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name,
         std::unique_ptr<ClientMetadataService>(new ClientMetadataService(this));
   }
   m_manager = new ThinClientStickyManager(this);
+
+  clear_pdx_registry_ = props.onClientDisconnectClearPdxTypeIds();
 }
 
 void ThinClientPoolDM::init() {
@@ -265,23 +266,16 @@ std::shared_ptr<Properties> ThinClientPoolDM::getCredentials(TcrEndpoint* ep) {
 }
 
 void ThinClientPoolDM::startBackgroundThreads() {
+  auto& props = m_connManager.getCacheImpl()
+                    ->getDistributedSystem()
+                    .getSystemProperties();
+
   LOGDEBUG("ThinClientPoolDM::startBackgroundThreads: Starting ping thread");
   m_pingTask =
       std::unique_ptr<Task<ThinClientPoolDM>>(new Task<ThinClientPoolDM>(
           this, &ThinClientPoolDM::pingServer, NC_Ping_Thread));
   m_pingTask->start();
 
-  auto& props = m_connManager.getCacheImpl()
-                    ->getDistributedSystem()
-                    .getSystemProperties();
-
-  if (props.onClientDisconnectClearPdxTypeIds()) {
-    m_cliCallbackTask =
-        std::unique_ptr<Task<ThinClientPoolDM>>(new Task<ThinClientPoolDM>(
-            this, &ThinClientPoolDM::cliCallback, "NC_cliCallback"));
-    m_cliCallbackTask->start();
-  }
-
   const auto& pingInterval = getPingInterval();
   if (pingInterval > std::chrono::seconds::zero()) {
     LOGDEBUG(
@@ -792,16 +786,6 @@ void ThinClientPoolDM::stopUpdateLocatorListThread() {
   }
 }
 
-void ThinClientPoolDM::stopCliCallbackThread() {
-  if (m_cliCallbackTask) {
-    LOGFINE("ThinClientPoolDM::destroy(): Closing cliCallback thread.");
-    m_cliCallbackTask->stopNoblock();
-    cli_callback_semaphore_.release();
-    m_cliCallbackTask->wait();
-    m_cliCallbackTask = nullptr;
-  }
-}
-
 void ThinClientPoolDM::destroy(bool keepAlive) {
   LOGDEBUG("ThinClientPoolDM::destroy...");
   if (!m_isDestroyed && (!m_destroyPending || m_destroyPendingHADM)) {
@@ -818,9 +802,6 @@ void ThinClientPoolDM::destroy(bool keepAlive) {
       m_PoolStatsSampler = nullptr;
     }
     LOGDEBUG("PoolStatsSampler thread closed .");
-    // TODO suspect
-    // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall)
-    stopCliCallbackThread();
     LOGDEBUG("ThinClientPoolDM::destroy( ): Closing connection manager.");
     auto cacheImpl = m_connManager.getCacheImpl();
     if (m_connManageTask) {
@@ -931,7 +912,8 @@ int32_t ThinClientPoolDM::GetPDXIdForType(
 
   TcrMessageReply reply(true, this);
 
-  throwExceptionIfError("Operation Failed", sendSyncRequest(request, reply));
+  auto err = sendSyncRequest(request, reply);
+  throwExceptionIfError("Operation Failed", err);
 
   if (reply.getMessageType() == TcrMessage::EXCEPTION) {
     LOGDEBUG("ThinClientPoolDM::GetPDXTypeById: Exception = " +
@@ -1740,11 +1722,8 @@ GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint(
 }
 
 void ThinClientPoolDM::reducePoolSize(int num) {
-  LOGFINE("removing connection %d ,  pool-size =%d", num, m_poolSize.load());
+  LOGFINE("Removing %d connections, pool-size=%d", num, m_poolSize.load());
   m_poolSize -= num;
-  if (m_poolSize <= 0) {
-    if (m_cliCallbackTask != nullptr) cli_callback_semaphore_.release();
-  }
 }
 
 GfErrType ThinClientPoolDM::createPoolConnection(
@@ -2076,6 +2055,21 @@ void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
   LOGFINE("Ending updateLocatorList thread for pool %s", m_poolName.c_str());
 }
 
+void ThinClientPoolDM::incConnectedEndpoints() {
+  auto val = ++connected_endpoints_;
+  LOGDEBUG("Pool %s has incremented to %d the number of connected endpoints",
+           m_poolName.c_str(), val);
+}
+
+void ThinClientPoolDM::decConnectedEndpoints() {
+  auto val = --connected_endpoints_;
+  LOGDEBUG("Pool %s has decremented to %d the number of connected endpoints",
+           m_poolName.c_str(), val);
+  if (val <= 0 && clear_pdx_registry_) {
+    clearPdxTypeRegistry();
+  }
+}
+
 void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
   LOGFINE("Starting ping thread for pool %s", m_poolName.c_str());
 
@@ -2091,21 +2085,15 @@ void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
   LOGFINE("Ending ping thread for pool %s", m_poolName.c_str());
 }
 
-void ThinClientPoolDM::cliCallback(std::atomic<bool>& isRunning) {
-  LOGFINE("Starting cliCallback thread for pool %s", m_poolName.c_str());
-  while (isRunning) {
-    cli_callback_semaphore_.acquire();
-    if (isRunning) {
-      LOGFINE("Clearing Pdx Type Registry");
-      // this call for csharp client
-      DistributedSystemImpl::CallCliCallBack(
-          *(m_connManager.getCacheImpl()->getCache()));
-      // this call for cpp client
-      m_connManager.getCacheImpl()->getPdxTypeRegistry()->clear();
-      cli_callback_semaphore_.acquire();
-    }
-  }
-  LOGFINE("Ending cliCallback thread for pool %s", m_poolName.c_str());
+void ThinClientPoolDM::clearPdxTypeRegistry() {
+  LOGFINE("Clearing PdxTypeRegistry of pool %s", m_poolName.c_str());
+  auto cache_impl = m_connManager.getCacheImpl();
+
+  // C# call
+  DistributedSystemImpl::CallCliCallBack(*(cache_impl->getCache()));
+
+  // C++ call
+  cache_impl->getPdxTypeRegistry()->clear();
 }
 
 int ThinClientPoolDM::doPing(const ACE_Time_Value&, const void*) {
diff --git a/cppcache/src/ThinClientPoolDM.hpp b/cppcache/src/ThinClientPoolDM.hpp
index 63576e0..460e1df 100644
--- a/cppcache/src/ThinClientPoolDM.hpp
+++ b/cppcache/src/ThinClientPoolDM.hpp
@@ -104,11 +104,12 @@ class ThinClientPoolDM
   void addConnection(TcrConnection* conn);
 
   std::shared_ptr<TcrEndpoint> addEP(ServerLocation& serverLoc);
-
   std::shared_ptr<TcrEndpoint> addEP(const std::string& endpointName);
+
+  virtual void clearPdxTypeRegistry();
+
   virtual void pingServer(std::atomic<bool>& isRunning);
   virtual void updateLocatorList(std::atomic<bool>& isRunning);
-  virtual void cliCallback(std::atomic<bool>& isRunning);
   virtual void pingServerLocal();
 
   ~ThinClientPoolDM() override;
@@ -132,6 +133,9 @@ class ThinClientPoolDM
   void incRegionCount();
   void decRegionCount();
 
+  void incConnectedEndpoints() override;
+  void decConnectedEndpoints() override;
+
   virtual void setStickyNull(bool isBGThread) {
     if (!isBGThread) m_manager->setStickyConnection(nullptr, false);
   }
@@ -199,16 +203,15 @@ class ThinClientPoolDM
   void netDown();
   binary_semaphore update_locators_semaphore_;
   binary_semaphore ping_semaphore_;
-  binary_semaphore cli_callback_semaphore_;
   volatile bool m_isDestroyed;
   volatile bool m_destroyPending;
   volatile bool m_destroyPendingHADM;
-  void checkRegions();
   std::shared_ptr<RemoteQueryService> m_remoteQueryServicePtr;
+
+  void checkRegions();
   virtual void startBackgroundThreads();
   virtual void stopPingThread();
   virtual void stopUpdateLocatorListThread();
-  virtual void stopCliCallbackThread();
   virtual void cleanStickyConnections(std::atomic<bool>& isRunning);
   virtual TcrConnection* getConnectionFromQueue(bool timeout, GfErrType* error,
                                                 std::set<ServerLocation>&,
@@ -249,6 +252,7 @@ class ThinClientPoolDM
   // get endpoint using the endpoint string
   std::shared_ptr<TcrEndpoint> getEndpoint(const std::string& epNameStr);
 
+  bool clear_pdx_registry_{false};
   bool m_isSecurityOn;
   bool m_isMultiUserMode;
 
@@ -294,7 +298,6 @@ class ThinClientPoolDM
   std::unique_ptr<Task<ThinClientPoolDM>> m_connManageTask;
   std::unique_ptr<Task<ThinClientPoolDM>> m_pingTask;
   std::unique_ptr<Task<ThinClientPoolDM>> m_updateLocatorListTask;
-  std::unique_ptr<Task<ThinClientPoolDM>> m_cliCallbackTask;
   ExpiryTaskManager::id_type m_pingTaskId;
   ExpiryTaskManager::id_type m_updateLocatorListTaskId;
   ExpiryTaskManager::id_type m_connManageTaskId;
@@ -306,6 +309,7 @@ class ThinClientPoolDM
   void cleanStaleConnections(std::atomic<bool>& isRunning);
   void restoreMinConnections(std::atomic<bool>& isRunning);
   std::atomic<int32_t> m_clientOps;  // Actual Size of Pool
+  std::atomic<int32_t> connected_endpoints_;
   std::unique_ptr<statistics::PoolStatsSampler> m_PoolStatsSampler;
   std::unique_ptr<ClientMetadataService> m_clientMetadataService;
   friend class CacheImpl;