You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/05/04 18:16:51 UTC

[geode-native] branch develop updated: GEODE-8991: Implement cached region clean up (#757) (#799)

This is an automated email from the ASF dual-hosted git repository.

bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/develop by this push:
     new b5cc772  GEODE-8991: Implement cached region clean up (#757) (#799)
b5cc772 is described below

commit b5cc77231ffef370a7df4acea6d3021ecbcabcf4
Author: Mario Salazar de Torres <ma...@est.tech>
AuthorDate: Tue May 4 20:16:43 2021 +0200

    GEODE-8991: Implement cached region clean up (#757) (#799)
    
    - Whenever subscription redundancy is lost cached regions are supposed
       to be cleaned up. This commit implements interest recovery to achieve
       precisely that.
     - Whenever subscription redundancy is lost all entries with interest
       registered are cleaned up.
     - Implemented an optimization for regions which have interest
       registered to all keys.
     - ITs implemented to verify the functionality.
     - Adapted RegisterAllWithConsistencyDisabled test to work with the
       modified CacheListener mock.
     - As noted for RegisterAllWithConsistencyDisabled case, a fail-safe
       have been added to avoid a rare race condition.
     - Fixed coredump when trying to clear interest registry keys for
       regions with caching disabled.
     - Fixed exception whenever cleaning up cached region which interest
       list is a set of keys and some of them were already destroyed.
     - Export binary_semaphore class to make it avaialable for ITs. In the
       future it would be nice to have a library with all of the utilities
       common to the apache-geode library and tests.
---
 cppcache/integration/test/RegisterKeysTest.cpp    | 338 ++++++++++++++++++++--
 cppcache/src/ConcurrentEntriesMap.cpp             |   2 +
 cppcache/src/ConcurrentEntriesMap.hpp             |   5 +
 cppcache/src/EntriesMap.hpp                       |   3 +
 cppcache/src/LocalRegion.cpp                      |  77 ++++-
 cppcache/src/LocalRegion.hpp                      |  17 +-
 cppcache/src/RegionInternal.cpp                   |   2 +
 cppcache/src/RegionInternal.hpp                   |  58 ++--
 cppcache/src/ThinClientPoolHADM.cpp               |   7 +
 cppcache/src/ThinClientPoolHADM.hpp               |   1 +
 cppcache/src/ThinClientRedundancyManager.cpp      |   1 +
 cppcache/src/ThinClientRegion.cpp                 |  17 ++
 cppcache/src/ThinClientRegion.hpp                 |   2 +
 cppcache/src/util/concurrent/binary_semaphore.hpp |   4 +-
 14 files changed, 472 insertions(+), 62 deletions(-)

diff --git a/cppcache/integration/test/RegisterKeysTest.cpp b/cppcache/integration/test/RegisterKeysTest.cpp
index 2ce04a2..deae4c5 100644
--- a/cppcache/integration/test/RegisterKeysTest.cpp
+++ b/cppcache/integration/test/RegisterKeysTest.cpp
@@ -19,39 +19,44 @@
 #include <condition_variable>
 #include <mutex>
 
+#include <boost/thread/latch.hpp>
+
 #include <gtest/gtest.h>
 
 #include <geode/Cache.hpp>
 #include <geode/CacheFactory.hpp>
 #include <geode/EntryEvent.hpp>
+#include <geode/RegionEvent.hpp>
 #include <geode/RegionFactory.hpp>
 #include <geode/RegionShortcut.hpp>
 
 #include "framework/Cluster.h"
 #include "framework/Framework.h"
 #include "framework/Gfsh.h"
-
-class CacheListenerMock : public apache::geode::client::CacheListener {
- public:
-  MOCK_METHOD1(afterDestroy,
-               void(const apache::geode::client::EntryEvent& event));
-};
+#include "mock/CacheListenerMock.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
 
 namespace {
 
+using apache::geode::client::binary_semaphore;
 using apache::geode::client::Cache;
 using apache::geode::client::CacheableInt16;
 using apache::geode::client::CacheableKey;
 using apache::geode::client::CacheableString;
 using apache::geode::client::CacheFactory;
+using apache::geode::client::CacheListenerMock;
 using apache::geode::client::IllegalStateException;
 using apache::geode::client::Region;
 using apache::geode::client::RegionShortcut;
+
 using ::testing::_;
 using ::testing::DoAll;
 using ::testing::InvokeWithoutArgs;
+using ::testing::Return;
 
-ACTION_P(CvNotifyOne, cv) { cv->notify_one(); }
+ACTION_P(ReleaseSem, sem) { sem->release(); }
+ACTION_P(AcquireSem, sem) { sem->acquire(); }
+ACTION_P(CountDownLatch, latch) { latch->count_down(); }
 
 Cache createTestCache() {
   CacheFactory cacheFactory;
@@ -152,14 +157,13 @@ TEST(RegisterKeysTest, RegisterAllWithConsistencyDisabled) {
     producer_region = setupProxyRegion(producer_cache);
   }
 
-  std::mutex cv_mutex;
-  bool destroyed = false;
-  std::condition_variable cv;
+  binary_semaphore sem{0};
   auto listener = std::make_shared<CacheListenerMock>();
-  EXPECT_CALL(*listener, afterDestroy(_))
-      .Times(1)
-      .WillOnce(DoAll(InvokeWithoutArgs([&destroyed] { destroyed = true; }),
-                      CvNotifyOne(&cv)));
+
+  EXPECT_CALL(*listener, afterCreate(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*listener, afterRegionLive(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*listener, afterRegionDisconnected(_)).WillRepeatedly(Return());
+  EXPECT_CALL(*listener, afterDestroy(_)).Times(1).WillOnce(ReleaseSem(&sem));
 
   {
     auto poolFactory =
@@ -179,11 +183,307 @@ TEST(RegisterKeysTest, RegisterAllWithConsistencyDisabled) {
   producer_region->put("one", std::make_shared<CacheableInt16>(1));
   producer_region->destroy("one");
 
+  EXPECT_TRUE(sem.try_acquire_for(std::chrono::minutes{1}));
+}
+
+TEST(RegisterKeysTest, RegisterAnyAndClusterRestart) {
+  auto N = 100U;
+  boost::latch create_latch{N};
+  binary_semaphore live_sem{0};
+  binary_semaphore shut_sem{1};
+
+  Cluster cluster{LocatorCount{1}, ServerCount{1}};
+  cluster.start();
+
+  auto& gfsh = cluster.getGfsh();
+  gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+  auto cache = createTestCache();
+  {
+    auto poolFactory =
+        cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+    cluster.applyLocators(poolFactory);
+    poolFactory.create("default");
+  }
+
+  auto listener = std::make_shared<CacheListenerMock>();
+  EXPECT_CALL(*listener, afterRegionLive(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+  EXPECT_CALL(*listener, afterRegionDisconnected(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+  EXPECT_CALL(*listener, afterCreate(_))
+      .Times(N)
+      .WillRepeatedly(CountDownLatch(&create_latch));
+
+  auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+                    .setPoolName("default")
+                    .setCacheListener(listener)
+                    .create("region");
+  region->registerAllKeys(false, true);
+  EXPECT_EQ(region->keys().size(), 0);
+
+  auto producer = std::thread([&region, N] {
+    for (auto i = 0U; i < N;) {
+      auto key = "entry-" + std::to_string(i++);
+      auto value = "{\"entryName\": \"" + key + "\"}";
+      region->put(key, value);
+    }
+  });
+
+  create_latch.wait();
+
+  producer.join();
+  gfsh.shutdown().execute();
+
+  shut_sem.acquire();
+  shut_sem.release();
+
+  for (auto& server : cluster.getServers()) {
+    server.start();
+  }
+
+  live_sem.acquire();
+  live_sem.release();
+  EXPECT_EQ(region->keys().size(), 0);
+}
+
+TEST(RegisterKeysTest, RegisterRegexAndClusterRestart) {
+  auto N_1 = 10U;
+  auto N_2 = 90U;
+  auto N = N_1 + N_2;
+  binary_semaphore live_sem{0};
+  binary_semaphore shut_sem{1};
+  boost::latch create_latch{N};
+
+  Cluster cluster{LocatorCount{1}, ServerCount{1}};
+  cluster.start();
+
+  auto& gfsh = cluster.getGfsh();
+  gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+  auto cache = createTestCache();
+  {
+    auto poolFactory =
+        cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+    cluster.applyLocators(poolFactory);
+    poolFactory.create("default");
+  }
+
+  auto listener = std::make_shared<CacheListenerMock>();
+  EXPECT_CALL(*listener, afterRegionLive(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+  EXPECT_CALL(*listener, afterRegionDisconnected(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+  EXPECT_CALL(*listener, afterCreate(_))
+      .Times(N)
+      .WillRepeatedly(CountDownLatch(&create_latch));
+
+  auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+                    .setPoolName("default")
+                    .setCacheListener(listener)
+                    .create("region");
+  region->registerRegex("interest-.*", false, true);
+  EXPECT_EQ(region->keys().size(), 0);
+
+  auto producer_non_interest = std::thread([&region, N_1] {
+    for (auto i = 0U; i < N_1;) {
+      auto key = "entry-" + std::to_string(i++);
+      auto value = "{\"entryName\": \"" + key + "\"}";
+      region->put(key, value);
+    }
+  });
+
+  auto producer_interest = std::thread([&region, N_2] {
+    for (auto i = 0U; i < N_2;) {
+      auto key = "interest-" + std::to_string(i++);
+      auto value = "{\"entryName\": \"" + key + "\"}";
+      region->put(key, value);
+    }
+  });
+
+  create_latch.wait();
+
+  producer_non_interest.join();
+  producer_interest.join();
+
+  gfsh.shutdown().execute();
+
+  shut_sem.acquire();
+  shut_sem.release();
+
+  for (auto& server : cluster.getServers()) {
+    server.start();
+  }
+
+  live_sem.acquire();
+  live_sem.release();
+  EXPECT_EQ(region->keys().size(), N_1);
+}
+
+TEST(RegisterKeysTest, RegisterKeySetAndClusterRestart) {
+  auto N_1 = 10U;
+  std::vector<std::shared_ptr<CacheableKey>> interest_keys{
+      CacheableKey::create("dolores-1"),
+      CacheableKey::create("maeve-1"),
+      CacheableKey::create("bernard-2"),
+      CacheableKey::create("theodore-3"),
+      CacheableKey::create("william-5"),
+      CacheableKey::create("clementine-8"),
+      CacheableKey::create("abernathy-13"),
+      CacheableKey::create("ford-21"),
+  };
+
+  auto N = N_1 + interest_keys.size();
+  binary_semaphore live_sem{0};
+  binary_semaphore shut_sem{1};
+  boost::latch create_latch{N};
+  Cluster cluster{LocatorCount{1}, ServerCount{1}};
+  cluster.start();
+
+  auto& gfsh = cluster.getGfsh();
+  gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+  auto cache = createTestCache();
+  {
+    auto poolFactory =
+        cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+    cluster.applyLocators(poolFactory);
+    poolFactory.create("default");
+  }
+
+  auto listener = std::make_shared<CacheListenerMock>();
+  EXPECT_CALL(*listener, afterRegionLive(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+  EXPECT_CALL(*listener, afterRegionDisconnected(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+  EXPECT_CALL(*listener, afterCreate(_))
+      .Times(N)
+      .WillRepeatedly(CountDownLatch(&create_latch));
+
+  auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+                    .setPoolName("default")
+                    .setCacheListener(listener)
+                    .create("region");
+
+  region->registerKeys(interest_keys, false, true);
+  EXPECT_EQ(region->keys().size(), 0);
+
+  auto producer_non_interest = std::thread([&region, N_1] {
+    for (auto i = 0U; i < N_1;) {
+      auto key = "entry-" + std::to_string(i++);
+      auto value = "{\"entryName\": \"" + key + "\"}";
+      region->put(key, value);
+    }
+  });
+
+  auto producer_interest = std::thread([&region, &interest_keys] {
+    for (auto key : interest_keys) {
+      auto value = "{\"entryName\": \"" + key->toString() + "\"}";
+      region->put(key, value);
+    }
+  });
+
+  create_latch.wait();
+
+  producer_non_interest.join();
+  producer_interest.join();
+
+  gfsh.shutdown().execute();
+
+  shut_sem.acquire();
+  shut_sem.release();
+
+  for (auto& server : cluster.getServers()) {
+    server.start();
+  }
+
+  live_sem.acquire();
+  live_sem.release();
+  EXPECT_EQ(region->keys().size(), N_1);
+}
+
+TEST(RegisterKeysTest, RegisterKeySetAndDestroyClusterRestart) {
+  auto N_1 = 10U;
+  std::vector<std::shared_ptr<CacheableKey>> interest_keys{
+      CacheableKey::create("dolores-1"),
+      CacheableKey::create("maeve-1"),
+      CacheableKey::create("bernard-2"),
+      CacheableKey::create("theodore-3"),
+      CacheableKey::create("william-5"),
+      CacheableKey::create("clementine-8"),
+      CacheableKey::create("abernathy-13"),
+      CacheableKey::create("ford-21"),
+  };
+
+  auto N = N_1 + interest_keys.size();
+  binary_semaphore live_sem{0};
+  binary_semaphore shut_sem{1};
+  boost::latch create_latch{N};
+  Cluster cluster{LocatorCount{1}, ServerCount{1}};
+  cluster.start();
+
+  auto& gfsh = cluster.getGfsh();
+  gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+  auto cache = createTestCache();
   {
-    std::unique_lock<std::mutex> lock(cv_mutex);
-    EXPECT_TRUE(cv.wait_for(lock, std::chrono::minutes(1),
-                            [&destroyed] { return destroyed; }));
+    auto poolFactory =
+        cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+    cluster.applyLocators(poolFactory);
+    poolFactory.create("default");
   }
+
+  auto listener = std::make_shared<CacheListenerMock>();
+  EXPECT_CALL(*listener, afterRegionLive(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+  EXPECT_CALL(*listener, afterRegionDisconnected(_))
+      .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+  EXPECT_CALL(*listener, afterCreate(_))
+      .Times(N)
+      .WillRepeatedly(CountDownLatch(&create_latch));
+  EXPECT_CALL(*listener, afterDestroy(_)).Times(1).WillOnce(Return());
+
+  auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+                    .setPoolName("default")
+                    .setCacheListener(listener)
+                    .create("region");
+
+  region->registerKeys(interest_keys, false, true);
+  EXPECT_EQ(region->keys().size(), 0);
+
+  auto producer_non_interest = std::thread([&region, N_1] {
+    for (auto i = 0U; i < N_1;) {
+      auto key = "entry-" + std::to_string(i++);
+      auto value = "{\"entryName\": \"" + key + "\"}";
+      region->put(key, value);
+    }
+  });
+
+  auto producer_interest = std::thread([&region, &interest_keys] {
+    for (auto key : interest_keys) {
+      auto value = "{\"entryName\": \"" + key->toString() + "\"}";
+      region->put(key, value);
+    }
+  });
+
+  create_latch.wait();
+
+  producer_non_interest.join();
+  producer_interest.join();
+
+  region->remove(interest_keys[0]);
+  gfsh.shutdown().execute();
+
+  shut_sem.acquire();
+  shut_sem.release();
+
+  for (auto& server : cluster.getServers()) {
+    server.start();
+  }
+
+  live_sem.acquire();
+  live_sem.release();
+  EXPECT_EQ(region->keys().size(), N_1);
 }
 
 TEST(RegisterKeysTest, RegisterAnyWithCachingRegion) {
@@ -220,7 +520,7 @@ TEST(RegisterKeysTest, RegisterAnyWithCachingRegion) {
     cluster.applyLocators(poolFactory);
     poolFactory.create("default");
     auto region2 = setupCachingProxyRegion(cache2);
-    std::vector<std::shared_ptr<CacheableKey> > keys;
+    std::vector<std::shared_ptr<CacheableKey>> keys;
     keys.push_back(std::make_shared<CacheableString>("one"));
 
     auto&& entryBefore = region2->getEntry("one");
@@ -274,7 +574,7 @@ TEST(RegisterKeysTest, RegisterAnyWithProxyRegion) {
   cluster.applyLocators(poolFactory);
   poolFactory.create("default");
   auto region = setupProxyRegion(cache);
-  std::vector<std::shared_ptr<CacheableKey> > keys;
+  std::vector<std::shared_ptr<CacheableKey>> keys;
   keys.push_back(std::make_shared<CacheableInt16>(2));
 
   EXPECT_THROW(region->registerKeys(keys, false, true), IllegalStateException);
diff --git a/cppcache/src/ConcurrentEntriesMap.cpp b/cppcache/src/ConcurrentEntriesMap.cpp
index cea4e8b..a9b17f8 100644
--- a/cppcache/src/ConcurrentEntriesMap.cpp
+++ b/cppcache/src/ConcurrentEntriesMap.cpp
@@ -177,6 +177,8 @@ void ConcurrentEntriesMap::getValues(
   }
 }
 
+bool ConcurrentEntriesMap::empty() const { return m_size == 0; }
+
 uint32_t ConcurrentEntriesMap::size() const { return m_size; }
 
 int ConcurrentEntriesMap::addTrackerForEntry(
diff --git a/cppcache/src/ConcurrentEntriesMap.hpp b/cppcache/src/ConcurrentEntriesMap.hpp
index bcd0cd5..18f09dd 100644
--- a/cppcache/src/ConcurrentEntriesMap.hpp
+++ b/cppcache/src/ConcurrentEntriesMap.hpp
@@ -151,6 +151,11 @@ class ConcurrentEntriesMap : public EntriesMap {
       std::vector<std::shared_ptr<Cacheable>>& result) const override;
 
   /**
+   * @brief return whether there are no entries.
+   */
+  bool empty() const override;
+
+  /**
    * @brief return the number of entries in the map.
    */
   uint32_t size() const override;
diff --git a/cppcache/src/EntriesMap.hpp b/cppcache/src/EntriesMap.hpp
index b2cff83..32fc39d 100644
--- a/cppcache/src/EntriesMap.hpp
+++ b/cppcache/src/EntriesMap.hpp
@@ -126,6 +126,9 @@ class EntriesMap {
   virtual void getValues(
       std::vector<std::shared_ptr<Cacheable>>& result) const = 0;
 
+  /** @brief return whether there are no entryies. */
+  virtual bool empty() const = 0;
+
   /** @brief return the number of entries in the map. */
   virtual uint32_t size() const = 0;
 
diff --git a/cppcache/src/LocalRegion.cpp b/cppcache/src/LocalRegion.cpp
index 0b3503c..c4b89e2 100644
--- a/cppcache/src/LocalRegion.cpp
+++ b/cppcache/src/LocalRegion.cpp
@@ -17,6 +17,7 @@
 
 #include "LocalRegion.hpp"
 
+#include <regex>
 #include <vector>
 
 #include <geode/PoolManager.hpp>
@@ -29,6 +30,7 @@
 #include "EntriesMapFactory.hpp"
 #include "EntryExpiryTask.hpp"
 #include "ExpiryTaskManager.hpp"
+#include "InterestResultPolicy.hpp"
 #include "LRUEntriesMap.hpp"
 #include "RegionExpiryTask.hpp"
 #include "RegionGlobalLocks.hpp"
@@ -450,6 +452,13 @@ void LocalRegion::destroy(
   throwExceptionIfError("Region::destroy", err);
 }
 
+GfErrType LocalRegion::localDestroyNoCallbacks(
+    const std::shared_ptr<CacheableKey>& key) {
+  return destroyNoThrow(key, nullptr, -1,
+                        CacheEventFlags::LOCAL | CacheEventFlags::NOCALLBACKS,
+                        nullptr);
+}
+
 void LocalRegion::localDestroy(
     const std::shared_ptr<CacheableKey>& key,
     const std::shared_ptr<Serializable>& aCallbackArgument) {
@@ -1753,12 +1762,16 @@ GfErrType LocalRegion::updateNoThrow(
       m_entries->removeTrackerForEntry(key);
     }
   }
-  // invokeCacheListenerForEntryEvent method has the check that if oldValue
-  // is a CacheableToken then it sets it to nullptr; also determines if it
-  // should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
-  err =
-      invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument,
-                                       eventFlags, TAction::s_afterEventType);
+
+  if (!eventFlags.isNoCallbacks()) {
+    // invokeCacheListenerForEntryEvent method has the check that if oldValue
+    // is a CacheableToken then it sets it to nullptr; also determines if it
+    // should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
+    err = invokeCacheListenerForEntryEvent(key, oldValue, value,
+                                           aCallbackArgument, eventFlags,
+                                           TAction::s_afterEventType);
+  }
+
   return err;
 }
 
@@ -3205,6 +3218,58 @@ void LocalRegion::acquireGlobals(bool) {}
 
 void LocalRegion::releaseGlobals(bool) {}
 
+void LocalRegion::clearKeysOfInterest(
+    const std::unordered_map<std::shared_ptr<CacheableKey>,
+                             InterestResultPolicy>& interest_list) {
+  if (m_entries->empty()) {
+    return;
+  }
+
+  for (const auto& kv : interest_list) {
+    auto err = localDestroyNoCallbacks(kv.first);
+    // It could happen that interest was registered for a key for which
+    // there is not an entry right now
+    if (err != GF_CACHE_ENTRY_NOT_FOUND) {
+      throwExceptionIfError("LocalRegion::clearKeysOfInterest", err);
+    }
+  }
+}
+
+void LocalRegion::clearKeysOfInterestRegex(const std::string& pattern) {
+  if (m_entries->empty()) {
+    return;
+  }
+
+  std::regex expression{pattern};
+  for (const auto& key : keys()) {
+    if (std::regex_search(key->toString(), expression)) {
+      auto err = localDestroyNoCallbacks(key);
+      if (err != GF_CACHE_ENTRY_NOT_FOUND) {
+        throwExceptionIfError("LocalRegion::clearKeysOfInterest", err);
+      }
+    }
+  }
+}
+
+void LocalRegion::clearKeysOfInterestRegex(
+    const std::unordered_map<std::string, InterestResultPolicy>&
+        interest_list) {
+  if (m_entries->empty()) {
+    return;
+  }
+
+  static const std::string ALL_KEYS_REGEX = ".*";
+  for (const auto& kv : interest_list) {
+    const auto& regex = kv.first;
+    if (regex == ALL_KEYS_REGEX) {
+      localClear();
+      break;
+    } else {
+      clearKeysOfInterestRegex(kv.first);
+    }
+  }
+}
+
 }  // namespace client
 }  // namespace geode
 }  // namespace apache
diff --git a/cppcache/src/LocalRegion.hpp b/cppcache/src/LocalRegion.hpp
index 9f37af8..09ac066 100644
--- a/cppcache/src/LocalRegion.hpp
+++ b/cppcache/src/LocalRegion.hpp
@@ -72,12 +72,13 @@ namespace client {
   } while (0)
 #endif
 
-class PutActions;
-class PutActionsTx;
 class CreateActions;
 class DestroyActions;
-class RemoveActions;
 class InvalidateActions;
+class InterestResultPolicy;
+class PutActions;
+class PutActionsTx;
+class RemoveActions;
 class VersionedCacheableObjectPartList;
 
 typedef std::unordered_map<std::shared_ptr<CacheableKey>,
@@ -198,6 +199,8 @@ class APACHE_GEODE_EXPORT LocalRegion : public RegionInternal {
   void localDestroy(const std::shared_ptr<CacheableKey>& key,
                     const std::shared_ptr<Serializable>& aCallbackArgument =
                         nullptr) override;
+  virtual GfErrType localDestroyNoCallbacks(
+      const std::shared_ptr<CacheableKey>& key);
   bool remove(const std::shared_ptr<CacheableKey>& key,
               const std::shared_ptr<Cacheable>& value,
               const std::shared_ptr<Serializable>& aCallbackArgument =
@@ -572,6 +575,14 @@ class APACHE_GEODE_EXPORT LocalRegion : public RegionInternal {
       std::shared_ptr<EventId> eventId, std::shared_ptr<Cacheable>& fullObject,
       std::shared_ptr<VersionTag>& versionTag);
 
+  void clearKeysOfInterest(
+      const std::unordered_map<std::shared_ptr<CacheableKey>,
+                               InterestResultPolicy>& interest_list);
+  void clearKeysOfInterestRegex(const std::string& regex);
+  void clearKeysOfInterestRegex(
+      const std::unordered_map<std::string, InterestResultPolicy>&
+          interest_list);
+
  private:
   std::shared_ptr<Region> findSubRegion(const std::string& name);
   GfErrType invalidateRegionNoThrowOnSubRegions(
diff --git a/cppcache/src/RegionInternal.cpp b/cppcache/src/RegionInternal.cpp
index ea01843..e3a541c 100644
--- a/cppcache/src/RegionInternal.cpp
+++ b/cppcache/src/RegionInternal.cpp
@@ -39,6 +39,8 @@ const CacheEventFlags CacheEventFlags::CACHE_CLOSE(
     CacheEventFlags::GF_CACHE_CLOSE);
 const CacheEventFlags CacheEventFlags::NOCACHEWRITER(
     CacheEventFlags::GF_NOCACHEWRITER);
+const CacheEventFlags CacheEventFlags::NOCALLBACKS(
+    CacheEventFlags::GF_NOCALLBACKS);
 
 RegionInternal::RegionInternal(CacheImpl* cacheImpl,
                                RegionAttributes attributes)
diff --git a/cppcache/src/RegionInternal.hpp b/cppcache/src/RegionInternal.hpp
index 65b0798..1e25a6f 100644
--- a/cppcache/src/RegionInternal.hpp
+++ b/cppcache/src/RegionInternal.hpp
@@ -45,17 +45,18 @@ namespace client {
  */
 class CacheEventFlags {
  private:
-  uint8_t m_flags;
-  static const uint8_t GF_NORMAL = 0x01;
-  static const uint8_t GF_LOCAL = 0x02;
-  static const uint8_t GF_NOTIFICATION = 0x04;
-  static const uint8_t GF_NOTIFICATION_UPDATE = 0x08;
-  static const uint8_t GF_EVICTION = 0x10;
-  static const uint8_t GF_EXPIRATION = 0x20;
-  static const uint8_t GF_CACHE_CLOSE = 0x40;
-  static const uint8_t GF_NOCACHEWRITER = 0x80;
-
-  inline explicit CacheEventFlags(const uint8_t flags) : m_flags(flags) {}
+  uint16_t m_flags;
+  static const uint16_t GF_NORMAL = 0x01;
+  static const uint16_t GF_LOCAL = 0x02;
+  static const uint16_t GF_NOTIFICATION = 0x04;
+  static const uint16_t GF_NOTIFICATION_UPDATE = 0x08;
+  static const uint16_t GF_EVICTION = 0x10;
+  static const uint16_t GF_EXPIRATION = 0x20;
+  static const uint16_t GF_CACHE_CLOSE = 0x40;
+  static const uint16_t GF_NOCACHEWRITER = 0x80;
+  static const uint16_t GF_NOCALLBACKS = 0x100;
+
+  inline explicit CacheEventFlags(const uint16_t flags) : m_flags(flags) {}
 
  public:
   static const CacheEventFlags NORMAL;
@@ -66,6 +67,7 @@ class CacheEventFlags {
   static const CacheEventFlags EXPIRATION;
   static const CacheEventFlags CACHE_CLOSE;
   static const CacheEventFlags NOCACHEWRITER;
+  static const CacheEventFlags NOCALLBACKS;
 
   inline CacheEventFlags(const CacheEventFlags& flags) = default;
 
@@ -84,46 +86,36 @@ class CacheEventFlags {
     return (m_flags == flags.m_flags);
   }
 
-  inline bool isNormal() const {
-    return (m_flags & GF_NORMAL) > 0 ? true : false;
-  }
+  inline bool isNormal() const { return (m_flags & GF_NORMAL) > 0; }
 
-  inline bool isLocal() const {
-    return (m_flags & GF_LOCAL) > 0 ? true : false;
-  }
+  inline bool isLocal() const { return (m_flags & GF_LOCAL) > 0; }
 
-  inline bool isNotification() const {
-    return (m_flags & GF_NOTIFICATION) > 0 ? true : false;
-  }
+  inline bool isNotification() const { return (m_flags & GF_NOTIFICATION) > 0; }
 
   inline bool isNotificationUpdate() const {
-    return (m_flags & GF_NOTIFICATION_UPDATE) > 0 ? true : false;
+    return (m_flags & GF_NOTIFICATION_UPDATE) > 0;
   }
 
-  inline bool isEviction() const {
-    return (m_flags & GF_EVICTION) > 0 ? true : false;
-  }
+  inline bool isEviction() const { return (m_flags & GF_EVICTION) > 0; }
 
-  inline bool isExpiration() const {
-    return (m_flags & GF_EXPIRATION) > 0 ? true : false;
-  }
+  inline bool isExpiration() const { return (m_flags & GF_EXPIRATION) > 0; }
 
-  inline bool isCacheClose() const {
-    return (m_flags & GF_CACHE_CLOSE) > 0 ? true : false;
-  }
+  inline bool isCacheClose() const { return (m_flags & GF_CACHE_CLOSE) > 0; }
 
   inline bool isNoCacheWriter() const {
-    return (m_flags & GF_NOCACHEWRITER) > 0 ? true : false;
+    return (m_flags & GF_NOCACHEWRITER) > 0;
   }
 
+  inline bool isNoCallbacks() const { return (m_flags & GF_NOCALLBACKS) > 0; }
+
   inline bool isEvictOrExpire() const {
-    return (m_flags & (GF_EVICTION | GF_EXPIRATION)) > 0 ? true : false;
+    return (m_flags & (GF_EVICTION | GF_EXPIRATION)) > 0;
   }
 
   // special optimized method for CacheWriter invocation condition
   inline bool invokeCacheWriter() const {
     return ((m_flags & (GF_NOTIFICATION | GF_EVICTION | GF_EXPIRATION |
-                        GF_NOCACHEWRITER)) == 0x0);
+                        GF_NOCACHEWRITER | GF_NOCALLBACKS)) == 0x0);
   }
 };
 
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index 1113b2a..e3d4b6a 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -295,6 +295,13 @@ void ThinClientPoolHADM::sendNotConnectedMessageToAllregions() {
   }
 }
 
+void ThinClientPoolHADM::clearKeysOfInterestAllRegions() {
+  std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+  for (auto region : regions_) {
+    region->clearKeysOfInterest();
+  }
+}
+
 std::shared_ptr<TcrEndpoint> ThinClientPoolHADM::createEP(
     const char* endpointName) {
   return std::make_shared<TcrPoolEndPoint>(
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index 3bfb055..4fff940 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -121,6 +121,7 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
   void removeRegion(ThinClientRegion* theTCR);
   void sendNotConnectedMessageToAllregions();
   void addDisconnectedMessageToQueue(ThinClientRegion* theTCR);
+  void clearKeysOfInterestAllRegions();
 
   friend class ThinClientHARegion;
   friend class TcrConnectionManager;
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 7374b2a..fa1dcea 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -455,6 +455,7 @@ GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
     // that we can send it back to the caller, to avoid missing out due
     // to nonfatal errors such as server not available
     if (m_poolHADM && !m_allEndpointsDisconnected) {
+      m_poolHADM->clearKeysOfInterestAllRegions();
       m_poolHADM->sendNotConnectedMessageToAllregions();
       m_allEndpointsDisconnected = true;
     }
diff --git a/cppcache/src/ThinClientRegion.cpp b/cppcache/src/ThinClientRegion.cpp
index f190198..55940f1 100644
--- a/cppcache/src/ThinClientRegion.cpp
+++ b/cppcache/src/ThinClientRegion.cpp
@@ -1998,6 +1998,23 @@ GfErrType ThinClientRegion::registerStoredRegex(
   return retVal;
 }
 
+void ThinClientRegion::clearKeysOfInterest() {
+  if (!getAttributes().getCachingEnabled()) {
+    return;
+  }
+
+  clearKeysOfInterestRegex(m_interestListRegex);
+  clearKeysOfInterestRegex(m_interestListRegexForUpdatesAsInvalidates);
+  clearKeysOfInterestRegex(m_durableInterestListRegex);
+  clearKeysOfInterestRegex(m_durableInterestListRegexForUpdatesAsInvalidates);
+
+  LocalRegion::clearKeysOfInterest(m_interestList);
+  LocalRegion::clearKeysOfInterest(m_interestListForUpdatesAsInvalidates);
+  LocalRegion::clearKeysOfInterest(m_durableInterestList);
+  LocalRegion::clearKeysOfInterest(
+      m_durableInterestListForUpdatesAsInvalidates);
+}
+
 GfErrType ThinClientRegion::registerKeys(TcrEndpoint* endpoint,
                                          const TcrMessage* request,
                                          TcrMessageReply* reply) {
diff --git a/cppcache/src/ThinClientRegion.hpp b/cppcache/src/ThinClientRegion.hpp
index 59dd51b..d593ed4 100644
--- a/cppcache/src/ThinClientRegion.hpp
+++ b/cppcache/src/ThinClientRegion.hpp
@@ -191,6 +191,8 @@ class ThinClientRegion : public LocalRegion {
              const std::shared_ptr<Serializable>& callBack,
              std::shared_ptr<VersionTag> versionTag) override;
 
+  void clearKeysOfInterest();
+
  protected:
   GfErrType getNoThrow_remote(
       const std::shared_ptr<CacheableKey>& keyPtr,
diff --git a/cppcache/src/util/concurrent/binary_semaphore.hpp b/cppcache/src/util/concurrent/binary_semaphore.hpp
index 4fc4b44..3fb3526 100644
--- a/cppcache/src/util/concurrent/binary_semaphore.hpp
+++ b/cppcache/src/util/concurrent/binary_semaphore.hpp
@@ -23,10 +23,12 @@
 #include <condition_variable>
 #include <mutex>
 
+#include <geode/internal/geode_globals.hpp>
+
 namespace apache {
 namespace geode {
 namespace client {
-class binary_semaphore {
+class APACHE_GEODE_EXPORT binary_semaphore {
  public:
   explicit binary_semaphore(bool released);