You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bb...@apache.org on 2021/05/04 18:16:51 UTC
[geode-native] branch develop updated: GEODE-8991: Implement cached
region clean up (#757) (#799)
This is an automated email from the ASF dual-hosted git repository.
bbender pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new b5cc772 GEODE-8991: Implement cached region clean up (#757) (#799)
b5cc772 is described below
commit b5cc77231ffef370a7df4acea6d3021ecbcabcf4
Author: Mario Salazar de Torres <ma...@est.tech>
AuthorDate: Tue May 4 20:16:43 2021 +0200
GEODE-8991: Implement cached region clean up (#757) (#799)
- Whenever subscription redundancy is lost cached regions are supposed
to be cleaned up. This commit implements interest recovery to achieve
precisely that.
- Whenever subscription redundancy is lost all entries with interest
registered are cleaned up.
- Implemented an optimization for regions which have interest
registered to all keys.
- ITs implemented to verify the functionality.
- Adapted RegisterAllWithConsistencyDisabled test to work with the
modified CacheListener mock.
- As noted for RegisterAllWithConsistencyDisabled case, a fail-safe
has been added to avoid a rare race condition.
- Fixed coredump when trying to clear interest registry keys for
regions with caching disabled.
- Fixed exception when cleaning up a cached region whose interest
list is a set of keys, some of which were already destroyed.
- Export binary_semaphore class to make it available for ITs. In the
future it would be nice to have a library with all of the utilities
common to the apache-geode library and tests.
---
cppcache/integration/test/RegisterKeysTest.cpp | 338 ++++++++++++++++++++--
cppcache/src/ConcurrentEntriesMap.cpp | 2 +
cppcache/src/ConcurrentEntriesMap.hpp | 5 +
cppcache/src/EntriesMap.hpp | 3 +
cppcache/src/LocalRegion.cpp | 77 ++++-
cppcache/src/LocalRegion.hpp | 17 +-
cppcache/src/RegionInternal.cpp | 2 +
cppcache/src/RegionInternal.hpp | 58 ++--
cppcache/src/ThinClientPoolHADM.cpp | 7 +
cppcache/src/ThinClientPoolHADM.hpp | 1 +
cppcache/src/ThinClientRedundancyManager.cpp | 1 +
cppcache/src/ThinClientRegion.cpp | 17 ++
cppcache/src/ThinClientRegion.hpp | 2 +
cppcache/src/util/concurrent/binary_semaphore.hpp | 4 +-
14 files changed, 472 insertions(+), 62 deletions(-)
diff --git a/cppcache/integration/test/RegisterKeysTest.cpp b/cppcache/integration/test/RegisterKeysTest.cpp
index 2ce04a2..deae4c5 100644
--- a/cppcache/integration/test/RegisterKeysTest.cpp
+++ b/cppcache/integration/test/RegisterKeysTest.cpp
@@ -19,39 +19,44 @@
#include <condition_variable>
#include <mutex>
+#include <boost/thread/latch.hpp>
+
#include <gtest/gtest.h>
#include <geode/Cache.hpp>
#include <geode/CacheFactory.hpp>
#include <geode/EntryEvent.hpp>
+#include <geode/RegionEvent.hpp>
#include <geode/RegionFactory.hpp>
#include <geode/RegionShortcut.hpp>
#include "framework/Cluster.h"
#include "framework/Framework.h"
#include "framework/Gfsh.h"
-
-class CacheListenerMock : public apache::geode::client::CacheListener {
- public:
- MOCK_METHOD1(afterDestroy,
- void(const apache::geode::client::EntryEvent& event));
-};
+#include "mock/CacheListenerMock.hpp"
+#include "util/concurrent/binary_semaphore.hpp"
namespace {
+using apache::geode::client::binary_semaphore;
using apache::geode::client::Cache;
using apache::geode::client::CacheableInt16;
using apache::geode::client::CacheableKey;
using apache::geode::client::CacheableString;
using apache::geode::client::CacheFactory;
+using apache::geode::client::CacheListenerMock;
using apache::geode::client::IllegalStateException;
using apache::geode::client::Region;
using apache::geode::client::RegionShortcut;
+
using ::testing::_;
using ::testing::DoAll;
using ::testing::InvokeWithoutArgs;
+using ::testing::Return;
-ACTION_P(CvNotifyOne, cv) { cv->notify_one(); }
+ACTION_P(ReleaseSem, sem) { sem->release(); }
+ACTION_P(AcquireSem, sem) { sem->acquire(); }
+ACTION_P(CountDownLatch, latch) { latch->count_down(); }
Cache createTestCache() {
CacheFactory cacheFactory;
@@ -152,14 +157,13 @@ TEST(RegisterKeysTest, RegisterAllWithConsistencyDisabled) {
producer_region = setupProxyRegion(producer_cache);
}
- std::mutex cv_mutex;
- bool destroyed = false;
- std::condition_variable cv;
+ binary_semaphore sem{0};
auto listener = std::make_shared<CacheListenerMock>();
- EXPECT_CALL(*listener, afterDestroy(_))
- .Times(1)
- .WillOnce(DoAll(InvokeWithoutArgs([&destroyed] { destroyed = true; }),
- CvNotifyOne(&cv)));
+
+ EXPECT_CALL(*listener, afterCreate(_)).WillRepeatedly(Return());
+ EXPECT_CALL(*listener, afterRegionLive(_)).WillRepeatedly(Return());
+ EXPECT_CALL(*listener, afterRegionDisconnected(_)).WillRepeatedly(Return());
+ EXPECT_CALL(*listener, afterDestroy(_)).Times(1).WillOnce(ReleaseSem(&sem));
{
auto poolFactory =
@@ -179,11 +183,307 @@ TEST(RegisterKeysTest, RegisterAllWithConsistencyDisabled) {
producer_region->put("one", std::make_shared<CacheableInt16>(1));
producer_region->destroy("one");
+ EXPECT_TRUE(sem.try_acquire_for(std::chrono::minutes{1}));
+}
+
+TEST(RegisterKeysTest, RegisterAnyAndClusterRestart) {
+ auto N = 100U;
+ boost::latch create_latch{N};
+ binary_semaphore live_sem{0};
+ binary_semaphore shut_sem{1};
+
+ Cluster cluster{LocatorCount{1}, ServerCount{1}};
+ cluster.start();
+
+ auto& gfsh = cluster.getGfsh();
+ gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+ auto cache = createTestCache();
+ {
+ auto poolFactory =
+ cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+ cluster.applyLocators(poolFactory);
+ poolFactory.create("default");
+ }
+
+ auto listener = std::make_shared<CacheListenerMock>();
+ EXPECT_CALL(*listener, afterRegionLive(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+ EXPECT_CALL(*listener, afterRegionDisconnected(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+ EXPECT_CALL(*listener, afterCreate(_))
+ .Times(N)
+ .WillRepeatedly(CountDownLatch(&create_latch));
+
+ auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+ .setPoolName("default")
+ .setCacheListener(listener)
+ .create("region");
+ region->registerAllKeys(false, true);
+ EXPECT_EQ(region->keys().size(), 0);
+
+ auto producer = std::thread([&region, N] {
+ for (auto i = 0U; i < N;) {
+ auto key = "entry-" + std::to_string(i++);
+ auto value = "{\"entryName\": \"" + key + "\"}";
+ region->put(key, value);
+ }
+ });
+
+ create_latch.wait();
+
+ producer.join();
+ gfsh.shutdown().execute();
+
+ shut_sem.acquire();
+ shut_sem.release();
+
+ for (auto& server : cluster.getServers()) {
+ server.start();
+ }
+
+ live_sem.acquire();
+ live_sem.release();
+ EXPECT_EQ(region->keys().size(), 0);
+}
+
+TEST(RegisterKeysTest, RegisterRegexAndClusterRestart) {
+ auto N_1 = 10U;
+ auto N_2 = 90U;
+ auto N = N_1 + N_2;
+ binary_semaphore live_sem{0};
+ binary_semaphore shut_sem{1};
+ boost::latch create_latch{N};
+
+ Cluster cluster{LocatorCount{1}, ServerCount{1}};
+ cluster.start();
+
+ auto& gfsh = cluster.getGfsh();
+ gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+ auto cache = createTestCache();
+ {
+ auto poolFactory =
+ cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+ cluster.applyLocators(poolFactory);
+ poolFactory.create("default");
+ }
+
+ auto listener = std::make_shared<CacheListenerMock>();
+ EXPECT_CALL(*listener, afterRegionLive(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+ EXPECT_CALL(*listener, afterRegionDisconnected(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+ EXPECT_CALL(*listener, afterCreate(_))
+ .Times(N)
+ .WillRepeatedly(CountDownLatch(&create_latch));
+
+ auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+ .setPoolName("default")
+ .setCacheListener(listener)
+ .create("region");
+ region->registerRegex("interest-.*", false, true);
+ EXPECT_EQ(region->keys().size(), 0);
+
+ auto producer_non_interest = std::thread([&region, N_1] {
+ for (auto i = 0U; i < N_1;) {
+ auto key = "entry-" + std::to_string(i++);
+ auto value = "{\"entryName\": \"" + key + "\"}";
+ region->put(key, value);
+ }
+ });
+
+ auto producer_interest = std::thread([&region, N_2] {
+ for (auto i = 0U; i < N_2;) {
+ auto key = "interest-" + std::to_string(i++);
+ auto value = "{\"entryName\": \"" + key + "\"}";
+ region->put(key, value);
+ }
+ });
+
+ create_latch.wait();
+
+ producer_non_interest.join();
+ producer_interest.join();
+
+ gfsh.shutdown().execute();
+
+ shut_sem.acquire();
+ shut_sem.release();
+
+ for (auto& server : cluster.getServers()) {
+ server.start();
+ }
+
+ live_sem.acquire();
+ live_sem.release();
+ EXPECT_EQ(region->keys().size(), N_1);
+}
+
+TEST(RegisterKeysTest, RegisterKeySetAndClusterRestart) {
+ auto N_1 = 10U;
+ std::vector<std::shared_ptr<CacheableKey>> interest_keys{
+ CacheableKey::create("dolores-1"),
+ CacheableKey::create("maeve-1"),
+ CacheableKey::create("bernard-2"),
+ CacheableKey::create("theodore-3"),
+ CacheableKey::create("william-5"),
+ CacheableKey::create("clementine-8"),
+ CacheableKey::create("abernathy-13"),
+ CacheableKey::create("ford-21"),
+ };
+
+ auto N = N_1 + interest_keys.size();
+ binary_semaphore live_sem{0};
+ binary_semaphore shut_sem{1};
+ boost::latch create_latch{N};
+ Cluster cluster{LocatorCount{1}, ServerCount{1}};
+ cluster.start();
+
+ auto& gfsh = cluster.getGfsh();
+ gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+ auto cache = createTestCache();
+ {
+ auto poolFactory =
+ cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+ cluster.applyLocators(poolFactory);
+ poolFactory.create("default");
+ }
+
+ auto listener = std::make_shared<CacheListenerMock>();
+ EXPECT_CALL(*listener, afterRegionLive(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+ EXPECT_CALL(*listener, afterRegionDisconnected(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+ EXPECT_CALL(*listener, afterCreate(_))
+ .Times(N)
+ .WillRepeatedly(CountDownLatch(&create_latch));
+
+ auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+ .setPoolName("default")
+ .setCacheListener(listener)
+ .create("region");
+
+ region->registerKeys(interest_keys, false, true);
+ EXPECT_EQ(region->keys().size(), 0);
+
+ auto producer_non_interest = std::thread([&region, N_1] {
+ for (auto i = 0U; i < N_1;) {
+ auto key = "entry-" + std::to_string(i++);
+ auto value = "{\"entryName\": \"" + key + "\"}";
+ region->put(key, value);
+ }
+ });
+
+ auto producer_interest = std::thread([&region, &interest_keys] {
+ for (auto key : interest_keys) {
+ auto value = "{\"entryName\": \"" + key->toString() + "\"}";
+ region->put(key, value);
+ }
+ });
+
+ create_latch.wait();
+
+ producer_non_interest.join();
+ producer_interest.join();
+
+ gfsh.shutdown().execute();
+
+ shut_sem.acquire();
+ shut_sem.release();
+
+ for (auto& server : cluster.getServers()) {
+ server.start();
+ }
+
+ live_sem.acquire();
+ live_sem.release();
+ EXPECT_EQ(region->keys().size(), N_1);
+}
+
+TEST(RegisterKeysTest, RegisterKeySetAndDestroyClusterRestart) {
+ auto N_1 = 10U;
+ std::vector<std::shared_ptr<CacheableKey>> interest_keys{
+ CacheableKey::create("dolores-1"),
+ CacheableKey::create("maeve-1"),
+ CacheableKey::create("bernard-2"),
+ CacheableKey::create("theodore-3"),
+ CacheableKey::create("william-5"),
+ CacheableKey::create("clementine-8"),
+ CacheableKey::create("abernathy-13"),
+ CacheableKey::create("ford-21"),
+ };
+
+ auto N = N_1 + interest_keys.size();
+ binary_semaphore live_sem{0};
+ binary_semaphore shut_sem{1};
+ boost::latch create_latch{N};
+ Cluster cluster{LocatorCount{1}, ServerCount{1}};
+ cluster.start();
+
+ auto& gfsh = cluster.getGfsh();
+ gfsh.create().region().withName("region").withType("REPLICATE").execute();
+
+ auto cache = createTestCache();
{
- std::unique_lock<std::mutex> lock(cv_mutex);
- EXPECT_TRUE(cv.wait_for(lock, std::chrono::minutes(1),
- [&destroyed] { return destroyed; }));
+ auto poolFactory =
+ cache.getPoolManager().createFactory().setSubscriptionEnabled(true);
+ cluster.applyLocators(poolFactory);
+ poolFactory.create("default");
}
+
+ auto listener = std::make_shared<CacheListenerMock>();
+ EXPECT_CALL(*listener, afterRegionLive(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
+ EXPECT_CALL(*listener, afterRegionDisconnected(_))
+ .WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));
+ EXPECT_CALL(*listener, afterCreate(_))
+ .Times(N)
+ .WillRepeatedly(CountDownLatch(&create_latch));
+ EXPECT_CALL(*listener, afterDestroy(_)).Times(1).WillOnce(Return());
+
+ auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
+ .setPoolName("default")
+ .setCacheListener(listener)
+ .create("region");
+
+ region->registerKeys(interest_keys, false, true);
+ EXPECT_EQ(region->keys().size(), 0);
+
+ auto producer_non_interest = std::thread([&region, N_1] {
+ for (auto i = 0U; i < N_1;) {
+ auto key = "entry-" + std::to_string(i++);
+ auto value = "{\"entryName\": \"" + key + "\"}";
+ region->put(key, value);
+ }
+ });
+
+ auto producer_interest = std::thread([&region, &interest_keys] {
+ for (auto key : interest_keys) {
+ auto value = "{\"entryName\": \"" + key->toString() + "\"}";
+ region->put(key, value);
+ }
+ });
+
+ create_latch.wait();
+
+ producer_non_interest.join();
+ producer_interest.join();
+
+ region->remove(interest_keys[0]);
+ gfsh.shutdown().execute();
+
+ shut_sem.acquire();
+ shut_sem.release();
+
+ for (auto& server : cluster.getServers()) {
+ server.start();
+ }
+
+ live_sem.acquire();
+ live_sem.release();
+ EXPECT_EQ(region->keys().size(), N_1);
}
TEST(RegisterKeysTest, RegisterAnyWithCachingRegion) {
@@ -220,7 +520,7 @@ TEST(RegisterKeysTest, RegisterAnyWithCachingRegion) {
cluster.applyLocators(poolFactory);
poolFactory.create("default");
auto region2 = setupCachingProxyRegion(cache2);
- std::vector<std::shared_ptr<CacheableKey> > keys;
+ std::vector<std::shared_ptr<CacheableKey>> keys;
keys.push_back(std::make_shared<CacheableString>("one"));
auto&& entryBefore = region2->getEntry("one");
@@ -274,7 +574,7 @@ TEST(RegisterKeysTest, RegisterAnyWithProxyRegion) {
cluster.applyLocators(poolFactory);
poolFactory.create("default");
auto region = setupProxyRegion(cache);
- std::vector<std::shared_ptr<CacheableKey> > keys;
+ std::vector<std::shared_ptr<CacheableKey>> keys;
keys.push_back(std::make_shared<CacheableInt16>(2));
EXPECT_THROW(region->registerKeys(keys, false, true), IllegalStateException);
diff --git a/cppcache/src/ConcurrentEntriesMap.cpp b/cppcache/src/ConcurrentEntriesMap.cpp
index cea4e8b..a9b17f8 100644
--- a/cppcache/src/ConcurrentEntriesMap.cpp
+++ b/cppcache/src/ConcurrentEntriesMap.cpp
@@ -177,6 +177,8 @@ void ConcurrentEntriesMap::getValues(
}
}
+bool ConcurrentEntriesMap::empty() const { return m_size == 0; }
+
uint32_t ConcurrentEntriesMap::size() const { return m_size; }
int ConcurrentEntriesMap::addTrackerForEntry(
diff --git a/cppcache/src/ConcurrentEntriesMap.hpp b/cppcache/src/ConcurrentEntriesMap.hpp
index bcd0cd5..18f09dd 100644
--- a/cppcache/src/ConcurrentEntriesMap.hpp
+++ b/cppcache/src/ConcurrentEntriesMap.hpp
@@ -151,6 +151,11 @@ class ConcurrentEntriesMap : public EntriesMap {
std::vector<std::shared_ptr<Cacheable>>& result) const override;
/**
+ * @brief return whether there are no entries.
+ */
+ bool empty() const override;
+
+ /**
* @brief return the number of entries in the map.
*/
uint32_t size() const override;
diff --git a/cppcache/src/EntriesMap.hpp b/cppcache/src/EntriesMap.hpp
index b2cff83..32fc39d 100644
--- a/cppcache/src/EntriesMap.hpp
+++ b/cppcache/src/EntriesMap.hpp
@@ -126,6 +126,9 @@ class EntriesMap {
virtual void getValues(
std::vector<std::shared_ptr<Cacheable>>& result) const = 0;
+ /** @brief return whether there are no entries. */
+ virtual bool empty() const = 0;
+
/** @brief return the number of entries in the map. */
virtual uint32_t size() const = 0;
diff --git a/cppcache/src/LocalRegion.cpp b/cppcache/src/LocalRegion.cpp
index 0b3503c..c4b89e2 100644
--- a/cppcache/src/LocalRegion.cpp
+++ b/cppcache/src/LocalRegion.cpp
@@ -17,6 +17,7 @@
#include "LocalRegion.hpp"
+#include <regex>
#include <vector>
#include <geode/PoolManager.hpp>
@@ -29,6 +30,7 @@
#include "EntriesMapFactory.hpp"
#include "EntryExpiryTask.hpp"
#include "ExpiryTaskManager.hpp"
+#include "InterestResultPolicy.hpp"
#include "LRUEntriesMap.hpp"
#include "RegionExpiryTask.hpp"
#include "RegionGlobalLocks.hpp"
@@ -450,6 +452,13 @@ void LocalRegion::destroy(
throwExceptionIfError("Region::destroy", err);
}
+GfErrType LocalRegion::localDestroyNoCallbacks(
+ const std::shared_ptr<CacheableKey>& key) {
+ return destroyNoThrow(key, nullptr, -1,
+ CacheEventFlags::LOCAL | CacheEventFlags::NOCALLBACKS,
+ nullptr);
+}
+
void LocalRegion::localDestroy(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument) {
@@ -1753,12 +1762,16 @@ GfErrType LocalRegion::updateNoThrow(
m_entries->removeTrackerForEntry(key);
}
}
- // invokeCacheListenerForEntryEvent method has the check that if oldValue
- // is a CacheableToken then it sets it to nullptr; also determines if it
- // should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
- err =
- invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument,
- eventFlags, TAction::s_afterEventType);
+
+ if (!eventFlags.isNoCallbacks()) {
+ // invokeCacheListenerForEntryEvent method has the check that if oldValue
+ // is a CacheableToken then it sets it to nullptr; also determines if it
+ // should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
+ err = invokeCacheListenerForEntryEvent(key, oldValue, value,
+ aCallbackArgument, eventFlags,
+ TAction::s_afterEventType);
+ }
+
return err;
}
@@ -3205,6 +3218,58 @@ void LocalRegion::acquireGlobals(bool) {}
void LocalRegion::releaseGlobals(bool) {}
+void LocalRegion::clearKeysOfInterest(
+ const std::unordered_map<std::shared_ptr<CacheableKey>,
+ InterestResultPolicy>& interest_list) {
+ if (m_entries->empty()) {
+ return;
+ }
+
+ for (const auto& kv : interest_list) {
+ auto err = localDestroyNoCallbacks(kv.first);
+ // It could happen that interest was registered for a key for which
+ // there is not an entry right now
+ if (err != GF_CACHE_ENTRY_NOT_FOUND) {
+ throwExceptionIfError("LocalRegion::clearKeysOfInterest", err);
+ }
+ }
+}
+
+void LocalRegion::clearKeysOfInterestRegex(const std::string& pattern) {
+ if (m_entries->empty()) {
+ return;
+ }
+
+ std::regex expression{pattern};
+ for (const auto& key : keys()) {
+ if (std::regex_search(key->toString(), expression)) {
+ auto err = localDestroyNoCallbacks(key);
+ if (err != GF_CACHE_ENTRY_NOT_FOUND) {
+ throwExceptionIfError("LocalRegion::clearKeysOfInterest", err);
+ }
+ }
+ }
+}
+
+void LocalRegion::clearKeysOfInterestRegex(
+ const std::unordered_map<std::string, InterestResultPolicy>&
+ interest_list) {
+ if (m_entries->empty()) {
+ return;
+ }
+
+ static const std::string ALL_KEYS_REGEX = ".*";
+ for (const auto& kv : interest_list) {
+ const auto& regex = kv.first;
+ if (regex == ALL_KEYS_REGEX) {
+ localClear();
+ break;
+ } else {
+ clearKeysOfInterestRegex(kv.first);
+ }
+ }
+}
+
} // namespace client
} // namespace geode
} // namespace apache
diff --git a/cppcache/src/LocalRegion.hpp b/cppcache/src/LocalRegion.hpp
index 9f37af8..09ac066 100644
--- a/cppcache/src/LocalRegion.hpp
+++ b/cppcache/src/LocalRegion.hpp
@@ -72,12 +72,13 @@ namespace client {
} while (0)
#endif
-class PutActions;
-class PutActionsTx;
class CreateActions;
class DestroyActions;
-class RemoveActions;
class InvalidateActions;
+class InterestResultPolicy;
+class PutActions;
+class PutActionsTx;
+class RemoveActions;
class VersionedCacheableObjectPartList;
typedef std::unordered_map<std::shared_ptr<CacheableKey>,
@@ -198,6 +199,8 @@ class APACHE_GEODE_EXPORT LocalRegion : public RegionInternal {
void localDestroy(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument =
nullptr) override;
+ virtual GfErrType localDestroyNoCallbacks(
+ const std::shared_ptr<CacheableKey>& key);
bool remove(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument =
@@ -572,6 +575,14 @@ class APACHE_GEODE_EXPORT LocalRegion : public RegionInternal {
std::shared_ptr<EventId> eventId, std::shared_ptr<Cacheable>& fullObject,
std::shared_ptr<VersionTag>& versionTag);
+ void clearKeysOfInterest(
+ const std::unordered_map<std::shared_ptr<CacheableKey>,
+ InterestResultPolicy>& interest_list);
+ void clearKeysOfInterestRegex(const std::string& regex);
+ void clearKeysOfInterestRegex(
+ const std::unordered_map<std::string, InterestResultPolicy>&
+ interest_list);
+
private:
std::shared_ptr<Region> findSubRegion(const std::string& name);
GfErrType invalidateRegionNoThrowOnSubRegions(
diff --git a/cppcache/src/RegionInternal.cpp b/cppcache/src/RegionInternal.cpp
index ea01843..e3a541c 100644
--- a/cppcache/src/RegionInternal.cpp
+++ b/cppcache/src/RegionInternal.cpp
@@ -39,6 +39,8 @@ const CacheEventFlags CacheEventFlags::CACHE_CLOSE(
CacheEventFlags::GF_CACHE_CLOSE);
const CacheEventFlags CacheEventFlags::NOCACHEWRITER(
CacheEventFlags::GF_NOCACHEWRITER);
+const CacheEventFlags CacheEventFlags::NOCALLBACKS(
+ CacheEventFlags::GF_NOCALLBACKS);
RegionInternal::RegionInternal(CacheImpl* cacheImpl,
RegionAttributes attributes)
diff --git a/cppcache/src/RegionInternal.hpp b/cppcache/src/RegionInternal.hpp
index 65b0798..1e25a6f 100644
--- a/cppcache/src/RegionInternal.hpp
+++ b/cppcache/src/RegionInternal.hpp
@@ -45,17 +45,18 @@ namespace client {
*/
class CacheEventFlags {
private:
- uint8_t m_flags;
- static const uint8_t GF_NORMAL = 0x01;
- static const uint8_t GF_LOCAL = 0x02;
- static const uint8_t GF_NOTIFICATION = 0x04;
- static const uint8_t GF_NOTIFICATION_UPDATE = 0x08;
- static const uint8_t GF_EVICTION = 0x10;
- static const uint8_t GF_EXPIRATION = 0x20;
- static const uint8_t GF_CACHE_CLOSE = 0x40;
- static const uint8_t GF_NOCACHEWRITER = 0x80;
-
- inline explicit CacheEventFlags(const uint8_t flags) : m_flags(flags) {}
+ uint16_t m_flags;
+ static const uint16_t GF_NORMAL = 0x01;
+ static const uint16_t GF_LOCAL = 0x02;
+ static const uint16_t GF_NOTIFICATION = 0x04;
+ static const uint16_t GF_NOTIFICATION_UPDATE = 0x08;
+ static const uint16_t GF_EVICTION = 0x10;
+ static const uint16_t GF_EXPIRATION = 0x20;
+ static const uint16_t GF_CACHE_CLOSE = 0x40;
+ static const uint16_t GF_NOCACHEWRITER = 0x80;
+ static const uint16_t GF_NOCALLBACKS = 0x100;
+
+ inline explicit CacheEventFlags(const uint16_t flags) : m_flags(flags) {}
public:
static const CacheEventFlags NORMAL;
@@ -66,6 +67,7 @@ class CacheEventFlags {
static const CacheEventFlags EXPIRATION;
static const CacheEventFlags CACHE_CLOSE;
static const CacheEventFlags NOCACHEWRITER;
+ static const CacheEventFlags NOCALLBACKS;
inline CacheEventFlags(const CacheEventFlags& flags) = default;
@@ -84,46 +86,36 @@ class CacheEventFlags {
return (m_flags == flags.m_flags);
}
- inline bool isNormal() const {
- return (m_flags & GF_NORMAL) > 0 ? true : false;
- }
+ inline bool isNormal() const { return (m_flags & GF_NORMAL) > 0; }
- inline bool isLocal() const {
- return (m_flags & GF_LOCAL) > 0 ? true : false;
- }
+ inline bool isLocal() const { return (m_flags & GF_LOCAL) > 0; }
- inline bool isNotification() const {
- return (m_flags & GF_NOTIFICATION) > 0 ? true : false;
- }
+ inline bool isNotification() const { return (m_flags & GF_NOTIFICATION) > 0; }
inline bool isNotificationUpdate() const {
- return (m_flags & GF_NOTIFICATION_UPDATE) > 0 ? true : false;
+ return (m_flags & GF_NOTIFICATION_UPDATE) > 0;
}
- inline bool isEviction() const {
- return (m_flags & GF_EVICTION) > 0 ? true : false;
- }
+ inline bool isEviction() const { return (m_flags & GF_EVICTION) > 0; }
- inline bool isExpiration() const {
- return (m_flags & GF_EXPIRATION) > 0 ? true : false;
- }
+ inline bool isExpiration() const { return (m_flags & GF_EXPIRATION) > 0; }
- inline bool isCacheClose() const {
- return (m_flags & GF_CACHE_CLOSE) > 0 ? true : false;
- }
+ inline bool isCacheClose() const { return (m_flags & GF_CACHE_CLOSE) > 0; }
inline bool isNoCacheWriter() const {
- return (m_flags & GF_NOCACHEWRITER) > 0 ? true : false;
+ return (m_flags & GF_NOCACHEWRITER) > 0;
}
+ inline bool isNoCallbacks() const { return (m_flags & GF_NOCALLBACKS) > 0; }
+
inline bool isEvictOrExpire() const {
- return (m_flags & (GF_EVICTION | GF_EXPIRATION)) > 0 ? true : false;
+ return (m_flags & (GF_EVICTION | GF_EXPIRATION)) > 0;
}
// special optimized method for CacheWriter invocation condition
inline bool invokeCacheWriter() const {
return ((m_flags & (GF_NOTIFICATION | GF_EVICTION | GF_EXPIRATION |
- GF_NOCACHEWRITER)) == 0x0);
+ GF_NOCACHEWRITER | GF_NOCALLBACKS)) == 0x0);
}
};
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index 1113b2a..e3d4b6a 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -295,6 +295,13 @@ void ThinClientPoolHADM::sendNotConnectedMessageToAllregions() {
}
}
+void ThinClientPoolHADM::clearKeysOfInterestAllRegions() {
+ std::lock_guard<decltype(regionsLock_)> guard(regionsLock_);
+ for (auto region : regions_) {
+ region->clearKeysOfInterest();
+ }
+}
+
std::shared_ptr<TcrEndpoint> ThinClientPoolHADM::createEP(
const char* endpointName) {
return std::make_shared<TcrPoolEndPoint>(
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index 3bfb055..4fff940 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -121,6 +121,7 @@ class ThinClientPoolHADM : public ThinClientPoolDM {
void removeRegion(ThinClientRegion* theTCR);
void sendNotConnectedMessageToAllregions();
void addDisconnectedMessageToQueue(ThinClientRegion* theTCR);
+ void clearKeysOfInterestAllRegions();
friend class ThinClientHARegion;
friend class TcrConnectionManager;
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 7374b2a..fa1dcea 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -455,6 +455,7 @@ GfErrType ThinClientRedundancyManager::maintainRedundancyLevel(
// that we can send it back to the caller, to avoid missing out due
// to nonfatal errors such as server not available
if (m_poolHADM && !m_allEndpointsDisconnected) {
+ m_poolHADM->clearKeysOfInterestAllRegions();
m_poolHADM->sendNotConnectedMessageToAllregions();
m_allEndpointsDisconnected = true;
}
diff --git a/cppcache/src/ThinClientRegion.cpp b/cppcache/src/ThinClientRegion.cpp
index f190198..55940f1 100644
--- a/cppcache/src/ThinClientRegion.cpp
+++ b/cppcache/src/ThinClientRegion.cpp
@@ -1998,6 +1998,23 @@ GfErrType ThinClientRegion::registerStoredRegex(
return retVal;
}
+void ThinClientRegion::clearKeysOfInterest() {
+ if (!getAttributes().getCachingEnabled()) {
+ return;
+ }
+
+ clearKeysOfInterestRegex(m_interestListRegex);
+ clearKeysOfInterestRegex(m_interestListRegexForUpdatesAsInvalidates);
+ clearKeysOfInterestRegex(m_durableInterestListRegex);
+ clearKeysOfInterestRegex(m_durableInterestListRegexForUpdatesAsInvalidates);
+
+ LocalRegion::clearKeysOfInterest(m_interestList);
+ LocalRegion::clearKeysOfInterest(m_interestListForUpdatesAsInvalidates);
+ LocalRegion::clearKeysOfInterest(m_durableInterestList);
+ LocalRegion::clearKeysOfInterest(
+ m_durableInterestListForUpdatesAsInvalidates);
+}
+
GfErrType ThinClientRegion::registerKeys(TcrEndpoint* endpoint,
const TcrMessage* request,
TcrMessageReply* reply) {
diff --git a/cppcache/src/ThinClientRegion.hpp b/cppcache/src/ThinClientRegion.hpp
index 59dd51b..d593ed4 100644
--- a/cppcache/src/ThinClientRegion.hpp
+++ b/cppcache/src/ThinClientRegion.hpp
@@ -191,6 +191,8 @@ class ThinClientRegion : public LocalRegion {
const std::shared_ptr<Serializable>& callBack,
std::shared_ptr<VersionTag> versionTag) override;
+ void clearKeysOfInterest();
+
protected:
GfErrType getNoThrow_remote(
const std::shared_ptr<CacheableKey>& keyPtr,
diff --git a/cppcache/src/util/concurrent/binary_semaphore.hpp b/cppcache/src/util/concurrent/binary_semaphore.hpp
index 4fc4b44..3fb3526 100644
--- a/cppcache/src/util/concurrent/binary_semaphore.hpp
+++ b/cppcache/src/util/concurrent/binary_semaphore.hpp
@@ -23,10 +23,12 @@
#include <condition_variable>
#include <mutex>
+#include <geode/internal/geode_globals.hpp>
+
namespace apache {
namespace geode {
namespace client {
-class binary_semaphore {
+class APACHE_GEODE_EXPORT binary_semaphore {
public:
explicit binary_semaphore(bool released);