You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2020/07/09 18:59:49 UTC
[geode] 04/13: GEODE-7912: cacheWriter should be triggered when
PR.clear (#4882)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 2840a19ce2fc35796a3197f215bee7b62da36c2f
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Mon Mar 30 19:34:35 2020 -0700
GEODE-7912: cacheWriter should be triggered when PR.clear (#4882)
Co-authored-by: Anil <ag...@pivotal.io>
Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
.../cache/PartitionedRegionClearDUnitTest.java | 228 +++++++++++++++++----
.../apache/geode/internal/cache/LocalRegion.java | 4 +-
.../geode/internal/cache/PartitionedRegion.java | 56 +++--
3 files changed, 223 insertions(+), 65 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
index fb2a81b..a5a22b9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -20,6 +20,7 @@ import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCach
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@@ -30,13 +31,15 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.CacheWriterAdapter;
import org.apache.geode.test.dunit.SerializableCallableIF;
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
@@ -68,12 +71,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
client2 = cluster.startClientVM(6,
c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
- dataStore1.invoke(this::initDataStore);
- dataStore2.invoke(this::initDataStore);
- dataStore3.invoke(this::initDataStore);
- accessor.invoke(this::initAccessor);
- client1.invoke(this::initClientCache);
- client2.invoke(this::initClientCache);
}
protected RegionShortcut getRegionShortCut() {
@@ -104,14 +101,18 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
}
- private void initDataStore() {
- getCache().createRegionFactory(getRegionShortCut())
- .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
- .addCacheListener(new CountingCacheListener())
- .create(REGION_NAME);
+ private void initDataStore(boolean withWriter) {
+ RegionFactory factory = getCache().createRegionFactory(getRegionShortCut())
+ .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create());
+ if (withWriter) {
+ factory.setCacheWriter(new CountingCacheWriter());
+ }
+ factory.create(REGION_NAME);
+ clearsByRegion = new HashMap<>();
+ destroysByRegion = new HashMap<>();
}
- private void initAccessor() {
+ private void initAccessor(boolean withWriter) {
RegionShortcut shortcut = getRegionShortCut();
if (shortcut.isPersistent()) {
if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
@@ -126,12 +127,16 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
fail("Wrong region type:" + shortcut);
}
}
- getCache().createRegionFactory(shortcut)
+ RegionFactory factory = getCache().createRegionFactory(shortcut)
.setPartitionAttributes(
new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
- .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
- .addCacheListener(new CountingCacheListener())
- .create(REGION_NAME);
+ .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create());
+ if (withWriter) {
+ factory.setCacheWriter(new CountingCacheWriter());
+ }
+ factory.create(REGION_NAME);
+ clearsByRegion = new HashMap<>();
+ destroysByRegion = new HashMap<>();
}
private void feed(boolean isClient) {
@@ -152,45 +157,148 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
// client2.invoke(()->verifyRegionSize(true, expectedNum));
}
- private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
- SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
- CountingCacheListener countingCacheListener =
- (CountingCacheListener) getRegion(false).getAttributes()
- .getCacheListeners()[0];
- return countingCacheListener.getClears();
- };
+ SerializableCallableIF<Integer> getWriterClears = () -> {
+ int clears =
+ clearsByRegion.get(REGION_NAME) == null ? 0 : clearsByRegion.get(REGION_NAME).get();
+ return clears;
+ };
- int count = accessor.invoke(getListenerTriggerCount)
- + dataStore1.invoke(getListenerTriggerCount)
- + dataStore2.invoke(getListenerTriggerCount)
- + dataStore3.invoke(getListenerTriggerCount);
- assertThat(count).isEqualTo(1);
+ SerializableCallableIF<Integer> getWriterDestroys = () -> {
+ int destroys =
+ destroysByRegion.get(REGION_NAME) == null ? 0 : destroysByRegion.get(REGION_NAME).get();
+ return destroys;
+ };
- if (serverVM != null) {
- assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
- }
+ void configureServers(boolean dataStoreWithWriter, boolean accessorWithWriter) {
+ dataStore1.invoke(() -> initDataStore(dataStoreWithWriter));
+ dataStore2.invoke(() -> initDataStore(dataStoreWithWriter));
+ dataStore3.invoke(() -> initDataStore(dataStoreWithWriter));
+ accessor.invoke(() -> initAccessor(accessorWithWriter));
+ // make sure only datastore3 has cacheWriter
+ dataStore1.invoke(() -> {
+ Region region = getRegion(false);
+ region.getAttributesMutator().setCacheWriter(null);
+ });
+ dataStore2.invoke(() -> {
+ Region region = getRegion(false);
+ region.getAttributesMutator().setCacheWriter(null);
+ });
+ }
+
+ @Test
+ public void normalClearFromDataStoreWithWriterOnDataStore() {
+ configureServers(true, true);
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
+ accessor.invoke(() -> feed(false));
+ verifyServerRegionSize(NUM_ENTRIES);
+ dataStore3.invoke(() -> getRegion(false).clear());
+ verifyServerRegionSize(0);
+
+ // do the region destroy to compare that the same callbacks will be triggered
+ dataStore3.invoke(() -> {
+ Region region = getRegion(false);
+ region.destroyRegion();
+ });
+
+ assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+ .isEqualTo(1);
+ assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+ .isEqualTo(0);
}
@Test
- public void normalClearFromDataStore() {
+ public void normalClearFromDataStoreWithoutWriterOnDataStore() {
+ configureServers(false, true);
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
accessor.invoke(() -> feed(false));
verifyServerRegionSize(NUM_ENTRIES);
dataStore1.invoke(() -> getRegion(false).clear());
verifyServerRegionSize(0);
- verifyCacheListenerTriggerCount(dataStore1);
+
+ // do the region destroy to compare that the same callbacks will be triggered
+ dataStore1.invoke(() -> {
+ Region region = getRegion(false);
+ region.destroyRegion();
+ });
+
+ assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+ .isEqualTo(1);
}
@Test
- public void normalClearFromAccessor() {
+ public void normalClearFromAccessorWithWriterOnDataStore() {
+ configureServers(true, true);
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
accessor.invoke(() -> feed(false));
verifyServerRegionSize(NUM_ENTRIES);
accessor.invoke(() -> getRegion(false).clear());
verifyServerRegionSize(0);
- verifyCacheListenerTriggerCount(accessor);
+
+ // do the region destroy to compare that the same callbacks will be triggered
+ accessor.invoke(() -> {
+ Region region = getRegion(false);
+ region.destroyRegion();
+ });
+
+ assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+ .isEqualTo(1);
+ }
+
+ @Test
+ public void normalClearFromAccessorWithoutWriterButWithWriterOnDataStore() {
+ configureServers(true, false);
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
+ accessor.invoke(() -> feed(false));
+ verifyServerRegionSize(NUM_ENTRIES);
+ accessor.invoke(() -> getRegion(false).clear());
+ verifyServerRegionSize(0);
+
+ // do the region destroy to compare that the same callbacks will be triggered
+ accessor.invoke(() -> {
+ Region region = getRegion(false);
+ region.destroyRegion();
+ });
+
+ assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+ .isEqualTo(1);
+ assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+ .isEqualTo(0);
}
@Test
public void normalClearFromClient() {
+ configureServers(true, false);
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
client1.invoke(() -> feed(true));
verifyClientRegionSize(NUM_ENTRIES);
verifyServerRegionSize(NUM_ENTRIES);
@@ -198,21 +306,53 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
client1.invoke(() -> getRegion(true).clear());
verifyServerRegionSize(0);
verifyClientRegionSize(0);
- verifyCacheListenerTriggerCount(null);
+
+ // do the region destroy to compare that the same callbacks will be triggered
+ client1.invoke(() -> {
+ Region region = getRegion(true);
+ region.destroyRegion();
+ });
+
+ assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+ .isEqualTo(0);
+ assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+ .isEqualTo(1);
+ assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+ .isEqualTo(0);
}
- private static class CountingCacheListener extends CacheListenerAdapter {
- private final AtomicInteger clears = new AtomicInteger();
+ public static HashMap<String, AtomicInteger> clearsByRegion = new HashMap<>();
+ public static HashMap<String, AtomicInteger> destroysByRegion = new HashMap<>();
+ private static class CountingCacheWriter extends CacheWriterAdapter {
@Override
- public void afterRegionClear(RegionEvent event) {
+ public void beforeRegionClear(RegionEvent event) throws CacheWriterException {
Region region = event.getRegion();
- logger.info("Region " + region.getFullPath() + " is cleared.");
- clears.incrementAndGet();
+ AtomicInteger clears = clearsByRegion.get(region.getName());
+ if (clears == null) {
+ clears = new AtomicInteger(1);
+ clearsByRegion.put(region.getName(), clears);
+ } else {
+ clears.incrementAndGet();
+ }
+ logger
+ .info("Region " + region.getName() + " will be cleared, clear count is:" + clears.get());
}
- int getClears() {
- return clears.get();
+ @Override
+ public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException {
+ Region region = event.getRegion();
+ AtomicInteger destroys = destroysByRegion.get(region.getName());
+ if (destroys == null) {
+ destroys = new AtomicInteger(1);
+ destroysByRegion.put(region.getName(), destroys);
+ } else {
+ destroys.incrementAndGet();
+ }
+ logger.info(
+ "Region " + region.getName() + " will be destroyed, destroy count is:" + destroys.get());
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index a27e058..3580f72 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -3000,7 +3000,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
/**
* @since GemFire 5.7
*/
- private void serverRegionClear(RegionEventImpl regionEvent) {
+ protected void serverRegionClear(RegionEventImpl regionEvent) {
if (regionEvent.getOperation().isDistributed()) {
ServerRegionProxy mySRP = getServerProxy();
if (mySRP != null) {
@@ -3121,7 +3121,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return result;
}
- private void cacheWriteBeforeRegionClear(RegionEventImpl event)
+ void cacheWriteBeforeRegionClear(RegionEventImpl event)
throws CacheWriterException, TimeoutException {
// copy into local var to prevent race condition
CacheWriter writer = basicGetWriter();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 312e951..9222765 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -2188,6 +2188,9 @@ public class PartitionedRegion extends LocalRegion
throw cache.getCacheClosedException("Cache is shutting down");
}
+ // do cacheWrite
+ cacheWriteBeforeRegionClear(regionEvent);
+
// create ClearPRMessage per bucket
List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId());
for (ClearPRMessage clearPRMessage : clearMsgList) {
@@ -4455,6 +4458,26 @@ public class PartitionedRegion extends LocalRegion
return null;
}
+ boolean triggerWriter(RegionEventImpl event, SearchLoadAndWriteProcessor processor, int paction,
+ String theKey) {
+ CacheWriter localWriter = basicGetWriter();
+ Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null;
+
+ if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) {
+ return false;
+ }
+
+ final long start = getCachePerfStats().startCacheWriterCall();
+ try {
+ processor.initialize(this, theKey, null);
+ processor.doNetWrite(event, netWriteRecipients, localWriter, paction);
+ processor.release();
+ } finally {
+ getCachePerfStats().endCacheWriterCall(start);
+ }
+ return true;
+ }
+
/**
* This invokes a cache writer before a destroy operation. Although it has the same method
* signature as the method in LocalRegion, it is invoked in a different code path. LocalRegion
@@ -4464,31 +4487,26 @@ public class PartitionedRegion extends LocalRegion
@Override
boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event)
throws CacheWriterException, TimeoutException {
-
if (event.getOperation().isDistributed()) {
serverRegionDestroy(event);
- CacheWriter localWriter = basicGetWriter();
- Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null;
-
- if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) {
- return false;
- }
-
- final long start = getCachePerfStats().startCacheWriterCall();
- try {
- SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
- processor.initialize(this, "preDestroyRegion", null);
- processor.doNetWrite(event, netWriteRecipients, localWriter,
- SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY);
- processor.release();
- } finally {
- getCachePerfStats().endCacheWriterCall(start);
- }
- return true;
+ SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+ return triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY,
+ "preDestroyRegion");
}
return false;
}
+ @Override
+ void cacheWriteBeforeRegionClear(RegionEventImpl event)
+ throws CacheWriterException, TimeoutException {
+ if (event.getOperation().isDistributed()) {
+ serverRegionClear(event);
+ SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+ triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONCLEAR,
+ "preClearRegion");
+ }
+ }
+
/**
* Test Method: Get the DistributedMember identifier for the vm containing a key
*