You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/28 19:14:38 UTC
[geode] 11/23: GEODE-7678 (2nd PR) - Support for cache-listener and
client-notification for Partitioned Region Clear operation (#5124)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 10480f49e9da2074841c31def8b8b7796fd2807b
Author: agingade <ag...@pivotal.io>
AuthorDate: Wed May 20 16:08:07 2020 -0700
GEODE-7678 (2nd PR) - Support for cache-listener and client-notification for Partitioned Region Clear operation (#5124)
* GEODE-7678: Add support for cache listener and client notification for PR clear
The changes are made to PR clear messaging and locking mechanism to preserve
cache-listener and client-events ordering during concurrent cache operation
while clear in progress.
---
.../integrationTest/resources/assembly_content.txt | 1 +
.../cache/PRCacheListenerDistributedTest.java | 250 +++++++++++-
.../ReplicateCacheListenerDistributedTest.java | 111 +++++-
...ionedRegionAfterClearNotificationDUnitTest.java | 372 ++++++++++++++++++
.../cache/PartitionedRegionClearDUnitTest.java | 1 -
...titionedRegionClearWithExpirationDUnitTest.java | 69 ++--
...itionedRegionClearWithExpirationDUnitTest.java} | 58 +--
.../cache/PartitionedRegionIntegrationTest.java | 45 +++
.../codeAnalysis/sanctionedDataSerializables.txt | 8 +
.../PartitionedRegionPartialClearException.java | 37 ++
.../main/java/org/apache/geode/cache/Region.java | 4 +-
.../org/apache/geode/internal/DSFIDFactory.java | 5 +
.../apache/geode/internal/cache/BucketAdvisor.java | 2 +-
.../apache/geode/internal/cache/BucketRegion.java | 17 +-
.../internal/cache/DistributedClearOperation.java | 10 +-
.../geode/internal/cache/DistributedRegion.java | 9 +-
.../geode/internal/cache/InternalRegion.java | 3 +
.../apache/geode/internal/cache/LocalRegion.java | 3 +-
.../geode/internal/cache/PartitionedRegion.java | 217 ++---------
.../internal/cache/PartitionedRegionClear.java | 419 +++++++++++++++++++++
.../cache/PartitionedRegionClearMessage.java | 287 ++++++++++++++
.../internal/cache/PartitionedRegionDataStore.java | 8 +
.../internal/cache/partitioned/RegionAdvisor.java | 11 +
.../sanctioned-geode-core-serializables.txt | 2 +
.../internal/cache/BucketRegionJUnitTest.java | 4 +-
.../internal/cache/PartitionedRegionTest.java | 39 --
.../serialization/DataSerializableFixedID.java | 2 +
27 files changed, 1679 insertions(+), 315 deletions(-)
diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt
index 35ee867..d1673b3 100644
--- a/geode-assembly/src/integrationTest/resources/assembly_content.txt
+++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt
@@ -221,6 +221,7 @@ javadoc/org/apache/geode/cache/PartitionAttributes.html
javadoc/org/apache/geode/cache/PartitionAttributesFactory.html
javadoc/org/apache/geode/cache/PartitionResolver.html
javadoc/org/apache/geode/cache/PartitionedRegionDistributionException.html
+javadoc/org/apache/geode/cache/PartitionedRegionPartialClearException.html
javadoc/org/apache/geode/cache/PartitionedRegionStorageException.html
javadoc/org/apache/geode/cache/Region.Entry.html
javadoc/org/apache/geode/cache/Region.html
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
index 559def7..f4a9ac9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
@@ -14,14 +14,21 @@
*/
package org.apache.geode.cache;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.getVMCount;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.util.Arrays;
+import java.util.Collection;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
/**
@@ -38,28 +45,60 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor
@SuppressWarnings("serial")
public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest {
- @Parameters(name = "{index}: redundancy={0}")
- public static Iterable<Integer> data() {
- return Arrays.asList(0, 3);
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {1, Boolean.FALSE},
+ {3, Boolean.TRUE},
+ });
}
@Parameter
public int redundancy;
+ @Parameter(1)
+ public Boolean withData;
+
@Override
protected Region<String, Integer> createRegion(final String name,
final CacheListener<String, Integer> listener) {
+ return createPartitionedRegion(name, listener, false);
+ }
+
+ protected Region<String, Integer> createAccessorRegion(final String name,
+ final CacheListener<String, Integer> listener) {
+ return createPartitionedRegion(name, listener, true);
+ }
+
+ private Region<String, Integer> createPartitionedRegion(String name,
+ CacheListener<String, Integer> listener, boolean accessor) {
+ LogService.getLogger()
+ .info("Params [Redundancy: " + redundancy + " withData:" + withData + "]");
PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>();
paf.setRedundantCopies(redundancy);
+ if (accessor) {
+ paf.setLocalMaxMemory(0);
+ }
RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory();
- regionFactory.addCacheListener(listener);
+ if (listener != null) {
+ regionFactory.addCacheListener(listener);
+ }
regionFactory.setDataPolicy(DataPolicy.PARTITION);
regionFactory.setPartitionAttributes(paf.create());
return regionFactory.create(name);
}
+ private void withData(Region region) {
+ if (withData) {
+ // Fewer buckets.
+ // Covers case where node doesn't have any buckets depending on redundancy.
+ region.put("key1", "value1");
+ region.put("key2", "value2");
+ }
+ }
+
@Override
protected int expectedCreates() {
return 1;
@@ -79,4 +118,207 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
protected int expectedDestroys() {
return 1;
}
+
+ @Test
+ public void afterRegionDestroyIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, listener));
+ });
+ }
+
+ region.destroyRegion();
+
+ assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys());
+ }
+
+ @Test
+ public void afterRegionDestroyIsInvokedOnNodeWithListener() {
+ CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.destroyRegion();
+
+ assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+ }
+
+ @Test
+ public void afterRegionDestroyIsInvokedOnRemoteNodeWithListener() {
+ CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, null);
+
+ getVM(0).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+
+ for (int i = 1; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.destroyRegion();
+
+ assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+ }
+
+ @Test
+ public void afterRegionDestroyIsInvokedOnAccessorAndDataMembers() {
+ CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+ Region<String, Integer> region = createAccessorRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, listener));
+ });
+ }
+
+ region.destroyRegion();
+
+ assertThat(sharedCountersRule.getTotal(REGION_DESTROY))
+ .isGreaterThanOrEqualTo(expectedRegionDestroys());
+ }
+
+ @Test
+ public void afterRegionDestroyIsInvokedOnAccessor() {
+ CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+ Region<String, Integer> region = createAccessorRegion(regionName, listener);
+
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.destroyRegion();
+
+ assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+ }
+
+ @Test
+ public void afterRegionDestroyIsInvokedOnNonAccessor() {
+ CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+ Region<String, Integer> region = createAccessorRegion(regionName, null);
+ getVM(0).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ for (int i = 1; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.destroyRegion();
+
+ assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+ }
+
+ @Test
+ public void afterRegionClearIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, listener));
+ });
+ }
+
+ region.clear();
+
+ assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears());
+ }
+
+ @Test
+ public void afterClearIsInvokedOnNodeWithListener() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.clear();
+
+ assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+ }
+
+ @Test
+ public void afterRegionClearIsInvokedOnRemoteNodeWithListener() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, null);
+ getVM(0).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ for (int i = 1; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.clear();
+
+ assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+ }
+
+ @Test
+ public void afterRegionClearIsInvokedOnAccessorAndDataMembers() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createAccessorRegion(regionName, listener);
+
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, listener));
+ });
+ }
+
+ region.clear();
+
+ assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears());
+ }
+
+ @Test
+ public void afterRegionClearIsInvokedOnAccessor() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createAccessorRegion(regionName, listener);
+
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.clear();
+
+ assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+ }
+
+ @Test
+ public void afterRegionClearIsInvokedOnNonAccessor() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createAccessorRegion(regionName, null);
+
+ getVM(0).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ for (int i = 1; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ withData(createRegion(regionName, null));
+ });
+ }
+
+ region.clear();
+
+ assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+ }
+
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
index 3eedcef..6612833 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
@@ -51,13 +51,15 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
private static final String UPDATES = "UPDATES";
private static final String INVALIDATES = "INVALIDATES";
private static final String DESTROYS = "DESTROYS";
+ protected static final String CLEAR = "CLEAR";
+ protected static final String REGION_DESTROY = "REGION_DESTROY";
private static final int ENTRY_VALUE = 0;
private static final int UPDATED_ENTRY_VALUE = 1;
private static final String KEY = "key-1";
- private String regionName;
+ protected String regionName;
@Rule
public DistributedRule distributedRule = new DistributedRule();
@@ -82,6 +84,8 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
distributedCounters.initialize(DESTROYS);
distributedCounters.initialize(INVALIDATES);
distributedCounters.initialize(UPDATES);
+ distributedCounters.initialize(CLEAR);
+ distributedCounters.initialize(REGION_DESTROY);
}
@Test
@@ -148,6 +152,36 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
assertThat(distributedCounters.getTotal(DESTROYS)).isEqualTo(expectedDestroys());
}
+ @Test
+ public void afterClearIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ }
+
+ region.clear();
+
+ assertThat(distributedCounters.getTotal(CLEAR)).isEqualTo(expectedClears());
+ }
+
+ @Test
+ public void afterRegionDestroyIsInvokedInEveryMember() {
+ CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+ Region<String, Integer> region = createRegion(regionName, listener);
+ for (int i = 0; i < getVMCount(); i++) {
+ getVM(i).invoke(() -> {
+ createRegion(regionName, listener);
+ });
+ }
+
+ region.destroyRegion();
+
+ assertThat(distributedCounters.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys());
+ }
+
protected Region<String, Integer> createRegion(final String name,
final CacheListener<String, Integer> listener) {
RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory();
@@ -174,6 +208,14 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
return getVMCount() + 1;
}
+ protected int expectedClears() {
+ return getVMCount() + 1;
+ }
+
+ protected int expectedRegionDestroys() {
+ return getVMCount() + 1;
+ }
+
/**
* Overridden within tests to increment shared counters.
*/
@@ -283,7 +325,12 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
@Override
public void afterCreate(final EntryEvent<String, Integer> event) {
- // ignore
+ distributedCounters.increment(CREATES);
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ distributedCounters.increment(UPDATES);
}
@Override
@@ -302,4 +349,64 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
errorCollector.checkThat(event.getNewValue(), nullValue());
}
}
+
+ protected class ClearCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ distributedCounters.increment(CREATES);
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ distributedCounters.increment(UPDATES);
+ }
+
+ @Override
+ public void afterRegionClear(RegionEvent<String, Integer> event) {
+
+ distributedCounters.increment(CLEAR);
+ if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+ if (event.isOriginRemote()) {
+ errorCollector.checkThat(event.getDistributedMember(),
+ not(cacheRule.getSystem().getDistributedMember()));
+ } else {
+ errorCollector.checkThat(event.getDistributedMember(),
+ equalTo(cacheRule.getSystem().getDistributedMember()));
+ }
+ }
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR));
+ errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+ }
+ }
+
+ protected class RegionDestroyCountingCacheListener extends BaseCacheListener {
+
+ @Override
+ public void afterCreate(final EntryEvent<String, Integer> event) {
+ distributedCounters.increment(CREATES);
+ }
+
+ @Override
+ public void afterUpdate(final EntryEvent<String, Integer> event) {
+ distributedCounters.increment(UPDATES);
+ }
+
+ @Override
+ public void afterRegionDestroy(final RegionEvent<String, Integer> event) {
+ distributedCounters.increment(REGION_DESTROY);
+
+ if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+ if (event.isOriginRemote()) {
+ errorCollector.checkThat(event.getDistributedMember(),
+ not(cacheRule.getSystem().getDistributedMember()));
+ } else {
+ errorCollector.checkThat(event.getDistributedMember(),
+ equalTo(cacheRule.getSystem().getDistributedMember()));
+ }
+ }
+ errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_DESTROY));
+ errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+ }
+ }
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
new file mode 100644
index 0000000..237b6a8
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+ protected static final String REGION_NAME = "testPR";
+ protected static final int NUM_ENTRIES = 100;
+
+ protected int locatorPort;
+ protected MemberVM locator;
+ protected MemberVM dataStore1;
+ protected MemberVM dataStore2;
+ protected MemberVM dataStore3;
+ protected MemberVM accessor;
+
+ protected ClientVM client1;
+ protected ClientVM client2;
+
+ private static volatile DUnitBlackboard blackboard;
+
+ @Rule
+ public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+ @Before
+ public void setUp() throws Exception {
+ locator = cluster.startLocatorVM(0);
+ locatorPort = locator.getPort();
+ dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+ dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+ dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+ accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+
+ client1 = cluster.startClientVM(5,
+ c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+ client2 = cluster.startClientVM(6,
+ c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+
+ dataStore1.invoke(this::initDataStore);
+ dataStore2.invoke(this::initDataStore);
+ dataStore3.invoke(this::initDataStore);
+ accessor.invoke(this::initAccessor);
+
+ getBlackboard().initBlackboard();
+ }
+
+ protected RegionShortcut getRegionShortCut() {
+ return RegionShortcut.PARTITION_REDUNDANT;
+ }
+
+ protected Properties getProperties() {
+ Properties properties = new Properties();
+ return properties;
+ }
+
+ private Region getRegion(boolean isClient) {
+ if (isClient) {
+ return getClientCache().getRegion(REGION_NAME);
+ } else {
+ return getCache().getRegion(REGION_NAME);
+ }
+ }
+
+ private void verifyRegionSize(boolean isClient, int expectedNum) {
+ GeodeAwaitility.await()
+ .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum));
+ }
+
+ private void initClientCache() {
+ Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+ .create(REGION_NAME);
+ region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+ }
+
+ private void stopServers() {
+ List<CacheServer> cacheServers = getCache().getCacheServers();
+ for (CacheServer server : cacheServers) {
+ server.stop();
+ }
+ }
+
+ private void initDataStore() {
+ getCache().createRegionFactory(getRegionShortCut())
+ .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+ .addCacheListener(new CountingCacheListener())
+ .create(REGION_NAME);
+ }
+
+ private void initAccessor() {
+ RegionShortcut shortcut = getRegionShortCut();
+ getCache().createRegionFactory(shortcut)
+ .setPartitionAttributes(
+ new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+ .addCacheListener(new CountingCacheListener())
+ .create(REGION_NAME);
+ }
+
+ private void feed(boolean isClient) {
+ Region region = getRegion(isClient);
+ IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+ }
+
+ private void verifyServerRegionSize(int expectedNum) {
+ accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+ dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+ dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+ dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+ }
+
+ private void verifyClientRegionSize(int expectedNum) {
+ client1.invoke(() -> verifyRegionSize(true, expectedNum));
+ client2.invoke(() -> verifyRegionSize(true, expectedNum));
+ }
+
+ private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+ SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+ CountingCacheListener countingCacheListener =
+ (CountingCacheListener) getRegion(false).getAttributes()
+ .getCacheListeners()[0];
+ return countingCacheListener.getClears();
+ };
+
+ int count = accessor.invoke(getListenerTriggerCount)
+ + dataStore1.invoke(getListenerTriggerCount)
+ + dataStore2.invoke(getListenerTriggerCount)
+ + dataStore3.invoke(getListenerTriggerCount);
+ assertThat(count).isEqualTo(4);
+
+ if (serverVM != null) {
+ assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+ }
+ }
+
+ @Test
+ public void invokeClearOnDataStoreAndVerifyListenerCount() {
+ accessor.invoke(() -> feed(false));
+ verifyServerRegionSize(NUM_ENTRIES);
+
+ dataStore1.invoke(() -> getRegion(false).clear());
+
+ verifyServerRegionSize(0);
+ verifyCacheListenerTriggerCount(dataStore1);
+ }
+
+ @Test
+ public void invokeClearOnAccessorAndVerifyListenerCount() {
+ accessor.invoke(() -> feed(false));
+ verifyServerRegionSize(NUM_ENTRIES);
+ accessor.invoke(() -> getRegion(false).clear());
+ verifyServerRegionSize(0);
+ verifyCacheListenerTriggerCount(accessor);
+ }
+
+ @Test
+ public void invokeClearFromClientAndVerifyListenerCount() {
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
+ client1.invoke(() -> feed(true));
+ verifyClientRegionSize(NUM_ENTRIES);
+ verifyServerRegionSize(NUM_ENTRIES);
+
+ client1.invoke(() -> getRegion(true).clear());
+
+ verifyServerRegionSize(0);
+ verifyClientRegionSize(0);
+ verifyCacheListenerTriggerCount(null);
+ }
+
+ @Test
+ public void invokeClearFromClientWithAccessorAsServer() {
+ dataStore1.invoke(this::stopServers);
+ dataStore2.invoke(this::stopServers);
+ dataStore3.invoke(this::stopServers);
+
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
+ client1.invoke(() -> feed(true));
+ verifyClientRegionSize(NUM_ENTRIES);
+ verifyServerRegionSize(NUM_ENTRIES);
+
+ client1.invoke(() -> getRegion(true).clear());
+
+ verifyServerRegionSize(0);
+ verifyClientRegionSize(0);
+ verifyCacheListenerTriggerCount(null);
+ }
+
+ @Test
+ public void invokeClearFromDataStoreWithClientInterest() {
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
+ accessor.invoke(() -> feed(false));
+ verifyServerRegionSize(NUM_ENTRIES);
+
+ dataStore1.invoke(() -> getRegion(false).clear());
+
+ verifyServerRegionSize(0);
+ verifyCacheListenerTriggerCount(dataStore1);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers()
+ throws Exception {
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
+ accessor.invoke(() -> feed(false));
+ verifyServerRegionSize(NUM_ENTRIES);
+ dataStore2.invoke(() -> DistributionMessageObserver.setInstance(
+ testHookToKillMemberCallingClearBeforeMessageProcessed()));
+
+ AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear());
+
+ getBlackboard().waitForGate("CLOSE_CACHE", 30, SECONDS);
+
+ dataStore1.invoke(() -> getCache().close());
+ getBlackboard().signalGate("CACHE_CLOSED");
+
+ // This should not be blocked.
+ dataStore2.invoke(() -> feed(false));
+ dataStore3.invoke(() -> feed(false));
+
+ dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+ dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+
+ ds1ClearAsync.await();
+ }
+
+ @Test
+ public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembersAfterMessageProcessed()
+ throws Exception {
+ client1.invoke(this::initClientCache);
+ client2.invoke(this::initClientCache);
+
+ accessor.invoke(() -> feed(false));
+ verifyServerRegionSize(NUM_ENTRIES);
+
+ dataStore2.invoke(() -> DistributionMessageObserver.setInstance(
+ testHookToKillMemberCallingClearAfterMessageProcessed()));
+
+ AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear());
+
+ getBlackboard().waitForGate("CLOSE_CACHE", 30, SECONDS);
+
+ dataStore1.invoke(() -> getCache().close());
+ getBlackboard().signalGate("CACHE_CLOSED");
+
+ // This should not be blocked.
+ dataStore2.invoke(() -> feed(false));
+ dataStore3.invoke(() -> feed(false));
+
+ dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+ dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+
+ ds1ClearAsync.await();
+ }
+
+
+ private static class CountingCacheListener extends CacheListenerAdapter {
+ private final AtomicInteger clears = new AtomicInteger();
+
+ @Override
+ public void afterRegionClear(RegionEvent event) {
+ clears.incrementAndGet();
+ }
+
+ int getClears() {
+ return clears.get();
+
+ }
+ }
+
+ private DistributionMessageObserver testHookToKillMemberCallingClearBeforeMessageProcessed() {
+ return new DistributionMessageObserver() {
+
+ @Override
+ public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ if (message instanceof PartitionedRegionClearMessage) {
+ if (((PartitionedRegionClearMessage) message)
+ .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ DistributionMessageObserver.setInstance(null);
+ getBlackboard().signalGate("CLOSE_CACHE");
+ try {
+ getBlackboard().waitForGate("CACHE_CLOSED", 30, SECONDS);
+ GeodeAwaitility.await().untilAsserted(
+ () -> assertThat(dm.isCurrentMember(message.getSender())).isFalse());
+ } catch (TimeoutException | InterruptedException e) {
+ throw new RuntimeException("Failed waiting for signal.");
+ }
+ }
+ }
+ }
+ };
+ }
+
+ private DistributionMessageObserver testHookToKillMemberCallingClearAfterMessageProcessed() {
+ return new DistributionMessageObserver() {
+ @Override
+ public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ if (message instanceof PartitionedRegionClearMessage) {
+ if (((PartitionedRegionClearMessage) message)
+ .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ DistributionMessageObserver.setInstance(null);
+ getBlackboard().signalGate("CLOSE_CACHE");
+ try {
+ getBlackboard().waitForGate("CACHE_CLOSED", 30, SECONDS);
+ } catch (TimeoutException | InterruptedException e) {
+ throw new RuntimeException("Failed waiting for signal.");
+ }
+ }
+ }
+ }
+ };
+ }
+
+ private static DUnitBlackboard getBlackboard() {
+ if (blackboard == null) {
+ blackboard = new DUnitBlackboard();
+ }
+ return blackboard;
+ }
+
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
index e2e04eb..a3b311c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -80,7 +80,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
protected Properties getProperties() {
Properties properties = new Properties();
- properties.setProperty("log-level", "info");
return properties;
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
index 7f3dff9..dfc9470 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
@@ -17,12 +17,8 @@ package org.apache.geode.internal.cache;
import static org.apache.geode.cache.ExpirationAction.DESTROY;
import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.cache.RegionShortcut.PARTITION_OVERFLOW;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT_OVERFLOW;
import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
import static org.apache.geode.internal.util.ArrayUtils.asList;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getVM;
@@ -53,6 +49,7 @@ import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.ExpirationAttributes;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionShortcut;
@@ -75,7 +72,8 @@ import org.apache.geode.test.dunit.rules.DistributedRule;
@RunWith(JUnitParamsRunner.class)
public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable {
private static final Integer BUCKETS = 13;
- private static final Integer EXPIRATION_TIME = 30;
+ private static final Integer EXPIRATION_TIME = 5 * 60;
+ private static final Integer SMALL_EXPIRATION_TIME = 10;
private static final String REGION_NAME = "PartitionedRegion";
@Rule
@@ -106,11 +104,6 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
PARTITION_OVERFLOW,
PARTITION_REDUNDANT,
PARTITION_REDUNDANT_OVERFLOW,
-
- PARTITION_PERSISTENT,
- PARTITION_PERSISTENT_OVERFLOW,
- PARTITION_REDUNDANT_PERSISTENT,
- PARTITION_REDUNDANT_PERSISTENT_OVERFLOW
};
}
@@ -134,26 +127,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
accessor = getVM(TestVM.ACCESSOR.vmNumber);
}
- private RegionShortcut getRegionAccessorShortcut(RegionShortcut dataStoreRegionShortcut) {
- if (dataStoreRegionShortcut.isPersistent()) {
- switch (dataStoreRegionShortcut) {
- case PARTITION_PERSISTENT:
- return PARTITION;
- case PARTITION_PERSISTENT_OVERFLOW:
- return PARTITION_OVERFLOW;
- case PARTITION_REDUNDANT_PERSISTENT:
- return PARTITION_REDUNDANT;
- case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW:
- return PARTITION_REDUNDANT_OVERFLOW;
- }
- }
-
- return dataStoreRegionShortcut;
- }
-
private void initAccessor(RegionShortcut regionShortcut,
ExpirationAttributes expirationAttributes) {
- RegionShortcut accessorShortcut = getRegionAccessorShortcut(regionShortcut);
PartitionAttributes<String, String> attributes =
new PartitionAttributesFactory<String, String>()
.setTotalNumBuckets(BUCKETS)
@@ -161,7 +136,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
.create();
cacheRule.getCache()
- .<String, String>createRegionFactory(accessorShortcut)
+ .<String, String>createRegionFactory(regionShortcut)
.setPartitionAttributes(attributes)
.setEntryTimeToLive(expirationAttributes)
.setEntryIdleTimeout(expirationAttributes)
@@ -281,6 +256,19 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
}));
}
+ private void doClear() {
+ Cache cache = cacheRule.getCache();
+ boolean retry;
+ do {
+ retry = false;
+ try {
+ cache.getRegion(REGION_NAME).clear();
+ } catch (PartitionedRegionPartialClearException | CacheWriterException ex) {
+ retry = true;
+ }
+ } while (retry);
+ }
+
/**
* The test does the following (clear coordinator and region type are parametrized):
* - Populates the Partition Region (entries have expiration).
@@ -303,10 +291,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
populateRegion(accessor, entries, asList(accessor, server1, server2));
// Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
- Cache cache = cacheRule.getCache();
- cache.getRegion(REGION_NAME).clear();
- });
+ getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
// Assert all expiration tasks were cancelled and none were executed.
asList(server1, server2).forEach(vm -> vm.invoke(() -> {
@@ -323,7 +308,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
// Assert Region Buckets are consistent and region is empty,
accessor.invoke(this::assertRegionBucketsConsistency);
- assertRegionIsEmpty(asList(accessor, server1, server1));
+ assertRegionIsEmpty(asList(accessor, server1, server2));
}
/**
@@ -344,7 +329,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
public void clearShouldFailWhenCoordinatorMemberIsBouncedAndExpirationTasksShouldSurvive(
RegionShortcut regionShortcut) {
final int entries = 1000;
- ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
+ ExpirationAttributes expirationAttributes =
+ new ExpirationAttributes(SMALL_EXPIRATION_TIME, DESTROY);
parametrizedSetup(regionShortcut, expirationAttributes);
populateRegion(accessor, entries, asList(accessor, server1, server2));
registerVMKillerAsCacheWriter(Collections.singletonList(server1));
@@ -408,22 +394,21 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
@TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
public void clearShouldSucceedAndRemoveRegisteredExpirationTasksWhenNonCoordinatorMemberIsBounced(
TestVM coordinatorVM, RegionShortcut regionShortcut) {
- final int entries = 1500;
+ final int entries = 500;
+
+ RegionShortcut rs = regionShortcut;
ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
parametrizedSetup(regionShortcut, expirationAttributes);
registerVMKillerAsCacheWriter(Collections.singletonList(server2));
populateRegion(accessor, entries, asList(accessor, server1, server2));
// Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
- Cache cache = cacheRule.getCache();
- cache.getRegion(REGION_NAME).clear();
- });
+ getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
// Wait for member to get back online and assign buckets.
server2.invoke(() -> {
cacheRule.createCache();
- initDataStore(regionShortcut, expirationAttributes);
+ initDataStore(rs, expirationAttributes);
await().untilAsserted(
() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
@@ -460,7 +445,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
// Assert Region Buckets are consistent and region is empty,
accessor.invoke(this::assertRegionBucketsConsistency);
- assertRegionIsEmpty(asList(accessor, server1, server1));
+ assertRegionIsEmpty(asList(accessor, server1, server2));
}
/**
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java
similarity index 93%
copy from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
copy to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java
index 7f3dff9..f6f25bd 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java
@@ -53,6 +53,7 @@ import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.ExpirationAttributes;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionShortcut;
@@ -73,9 +74,10 @@ import org.apache.geode.test.dunit.rules.DistributedRule;
* on the {@link PartitionedRegion} once the operation is executed.
*/
@RunWith(JUnitParamsRunner.class)
-public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable {
+public class PersistentPartitionedRegionClearWithExpirationDUnitTest implements Serializable {
private static final Integer BUCKETS = 13;
- private static final Integer EXPIRATION_TIME = 30;
+ private static final Integer EXPIRATION_TIME = 5 * 60;
+ private static final Integer SMALL_EXPIRATION_TIME = 10;
private static final String REGION_NAME = "PartitionedRegion";
@Rule
@@ -102,11 +104,6 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
@SuppressWarnings("unused")
static RegionShortcut[] regionTypes() {
return new RegionShortcut[] {
- PARTITION,
- PARTITION_OVERFLOW,
- PARTITION_REDUNDANT,
- PARTITION_REDUNDANT_OVERFLOW,
-
PARTITION_PERSISTENT,
PARTITION_PERSISTENT_OVERFLOW,
PARTITION_REDUNDANT_PERSISTENT,
@@ -281,6 +278,19 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
}));
}
+ private void doClear() {
+ Cache cache = cacheRule.getCache();
+ boolean retry;
+ do {
+ retry = false;
+ try {
+ cache.getRegion(REGION_NAME).clear();
+ } catch (PartitionedRegionPartialClearException | CacheWriterException ex) {
+ retry = true;
+ }
+ } while (retry);
+ }
+
/**
* The test does the following (clear coordinator and region type are parametrized):
* - Populates the Partition Region (entries have expiration).
@@ -303,10 +313,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
populateRegion(accessor, entries, asList(accessor, server1, server2));
// Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
- Cache cache = cacheRule.getCache();
- cache.getRegion(REGION_NAME).clear();
- });
+ getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
// Assert all expiration tasks were cancelled and none were executed.
asList(server1, server2).forEach(vm -> vm.invoke(() -> {
@@ -323,7 +330,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
// Assert Region Buckets are consistent and region is empty,
accessor.invoke(this::assertRegionBucketsConsistency);
- assertRegionIsEmpty(asList(accessor, server1, server1));
+ assertRegionIsEmpty(asList(accessor, server1, server2));
}
/**
@@ -344,7 +351,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
public void clearShouldFailWhenCoordinatorMemberIsBouncedAndExpirationTasksShouldSurvive(
RegionShortcut regionShortcut) {
final int entries = 1000;
- ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
+ ExpirationAttributes expirationAttributes =
+ new ExpirationAttributes(SMALL_EXPIRATION_TIME, DESTROY);
parametrizedSetup(regionShortcut, expirationAttributes);
populateRegion(accessor, entries, asList(accessor, server1, server2));
registerVMKillerAsCacheWriter(Collections.singletonList(server1));
@@ -407,23 +415,29 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
@Parameters(method = "vmsAndRegionTypes")
@TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
public void clearShouldSucceedAndRemoveRegisteredExpirationTasksWhenNonCoordinatorMemberIsBounced(
- TestVM coordinatorVM, RegionShortcut regionShortcut) {
- final int entries = 1500;
+ TestVM coordinatorVM, RegionShortcut regionShortcut) throws Exception {
+ final int entries = 500;
+ // To avoid partition offline exception without redundancy.
+
+ if (regionShortcut == PARTITION_PERSISTENT) {
+ regionShortcut = PARTITION_REDUNDANT_PERSISTENT;
+ } else if (regionShortcut == PARTITION_PERSISTENT_OVERFLOW) {
+ regionShortcut = PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+ }
+
+ final RegionShortcut rs = regionShortcut;
ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
parametrizedSetup(regionShortcut, expirationAttributes);
registerVMKillerAsCacheWriter(Collections.singletonList(server2));
populateRegion(accessor, entries, asList(accessor, server1, server2));
// Clear the region.
- getVM(coordinatorVM.vmNumber).invoke(() -> {
- Cache cache = cacheRule.getCache();
- cache.getRegion(REGION_NAME).clear();
- });
+ getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
// Wait for member to get back online and assign buckets.
server2.invoke(() -> {
cacheRule.createCache();
- initDataStore(regionShortcut, expirationAttributes);
+ initDataStore(rs, expirationAttributes);
await().untilAsserted(
() -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
@@ -459,8 +473,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
});
// Assert Region Buckets are consistent and region is empty,
- accessor.invoke(this::assertRegionBucketsConsistency);
- assertRegionIsEmpty(asList(accessor, server1, server1));
+ // accessor.invoke(this::assertRegionBucketsConsistency);
+ assertRegionIsEmpty(asList(accessor, server1, server2));
}
/**
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
index 818a855..933bc39 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
@@ -16,15 +16,24 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache30.TestCacheListener;
import org.apache.geode.test.junit.rules.ServerStarterRule;
public class PartitionedRegionIntegrationTest {
@@ -55,4 +64,40 @@ public class PartitionedRegionIntegrationTest {
ScheduledExecutorService bucketSorter = region.getBucketSorter();
assertThat(bucketSorter).isNull();
}
+
+ @Test
+ public void prClearWithDataInvokesCacheListenerAfterClear() {
+ TestCacheListener prCacheListener = new TestCacheListener() {};
+ TestCacheListener spyPRCacheListener = spy(prCacheListener);
+
+ Region region = server.createPartitionRegion("PR1",
+ f -> f.addCacheListener(spyPRCacheListener), f -> f.setTotalNumBuckets(2));
+ region.put("key1", "value2");
+ region.put("key2", "value2");
+ spyPRCacheListener.enableEventHistory();
+
+ region.clear();
+
+ verify(spyPRCacheListener, times(1)).afterRegionClear(any());
+ List cacheEvents = spyPRCacheListener.getEventHistory();
+ assertThat(cacheEvents.size()).isEqualTo(1);
+ assertThat(((CacheEvent) cacheEvents.get(0)).getOperation()).isEqualTo(Operation.REGION_CLEAR);
+ }
+
+ @Test
+ public void prClearWithoutDataInvokesCacheListenerAfterClear() {
+ TestCacheListener prCacheListener = new TestCacheListener() {};
+ TestCacheListener spyPRCacheListener = spy(prCacheListener);
+
+ Region region = server.createPartitionRegion("PR1",
+ f -> f.addCacheListener(spyPRCacheListener), f -> f.setTotalNumBuckets(2));
+ spyPRCacheListener.enableEventHistory();
+
+ region.clear();
+
+ verify(spyPRCacheListener, times(1)).afterRegionClear(any());
+ List cacheEvents = spyPRCacheListener.getEventHistory();
+ assertThat(cacheEvents.size()).isEqualTo(1);
+ assertThat(((CacheEvent) cacheEvents.get(0)).getOperation()).isEqualTo(Operation.REGION_CLEAR);
+ }
}
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 8e522a2..e56247d 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1075,6 +1075,14 @@ org/apache/geode/internal/cache/PartitionRegionConfig,2
fromData,207
toData,178
+org/apache/geode/internal/cache/PartitionedRegionClearMessage,2
+fromData,40
+toData,36
+
+org/apache/geode/internal/cache/PartitionedRegionClearMessage$PartitionedRegionClearReplyMessage,2
+fromData,29
+toData,28
+
org/apache/geode/internal/cache/PoolFactoryImpl$PoolAttributes,2
fromData,161
toData,161
diff --git a/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java b/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java
new file mode 100644
index 0000000..1ddb301
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache;
+
+/**
+ * Indicates a failure to perform a distributed clear operation on a Partitioned Region
+ * after multiple attempts. The clear may not have been successfully applied on some of
+ * the members hosting the region.
+ */
+public class PartitionedRegionPartialClearException extends CacheRuntimeException {
+
+ public PartitionedRegionPartialClearException() {}
+
+ public PartitionedRegionPartialClearException(String msg) {
+ super(msg);
+ }
+
+ public PartitionedRegionPartialClearException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public PartitionedRegionPartialClearException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/Region.java b/geode-core/src/main/java/org/apache/geode/cache/Region.java
index b6ba670..4707a46 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/Region.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/Region.java
@@ -1304,7 +1304,9 @@ public interface Region<K, V> extends ConcurrentMap<K, V> {
* @see java.util.Map#clear()
* @see CacheListener#afterRegionClear
* @see CacheWriter#beforeRegionClear
- * @throws UnsupportedOperationException If the region is a partitioned region
+ * @throws PartitionedRegionPartialClearException when data is partially cleared on partitioned
+ * region. It is caller responsibility to handle the partial data clear either by retrying
+ * the clear operation or continue working with the partially cleared partitioned region.
*/
@Override
void clear();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 26d92c9..f0658a6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -235,6 +235,7 @@ import org.apache.geode.internal.cache.MemberFunctionStreamingMessage;
import org.apache.geode.internal.cache.Node;
import org.apache.geode.internal.cache.PRQueryProcessor;
import org.apache.geode.internal.cache.PartitionRegionConfig;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage;
import org.apache.geode.internal.cache.PreferBytesCachedDeserializable;
import org.apache.geode.internal.cache.RegionEventImpl;
import org.apache.geode.internal.cache.ReleaseClearLockMessage;
@@ -686,6 +687,10 @@ public class DSFIDFactory implements DataSerializableFixedID {
serializer.registerDSFID(PR_DUMP_B2N_REPLY_MESSAGE, DumpB2NReplyMessage.class);
serializer.registerDSFID(DESTROY_PARTITIONED_REGION_MESSAGE,
DestroyPartitionedRegionMessage.class);
+ serializer.registerDSFID(CLEAR_PARTITIONED_REGION_MESSAGE,
+ PartitionedRegionClearMessage.class);
+ serializer.registerDSFID(CLEAR_PARTITIONED_REGION_REPLY_MESSAGE,
+ PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage.class);
serializer.registerDSFID(INVALIDATE_PARTITIONED_REGION_MESSAGE,
InvalidatePartitionedRegionMessage.class);
serializer.registerDSFID(COMMIT_PROCESS_QUERY_MESSAGE, CommitProcessQueryMessage.class);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index e4045c3..6cba754 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -1622,7 +1622,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
/**
* Returns true if the a primary is known.
*/
- private boolean hasPrimary() {
+ protected boolean hasPrimary() {
final byte primaryState = this.primaryState;
return primaryState == OTHER_PRIMARY_NOT_HOSTING || primaryState == OTHER_PRIMARY_HOSTING
|| primaryState == IS_PRIMARY_HOSTING;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 416c058..3008cb3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -578,8 +578,13 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// get rvvLock
Set<InternalDistributedMember> participants =
getCacheDistributionAdvisor().adviseInvalidateRegion();
+ boolean isLockedAlready = this.partitionedRegion.getPartitionedRegionClear()
+ .isLockedForListenerAndClientNotification();
+
try {
- obtainWriteLocksForClear(regionEvent, participants);
+ if (!isLockedAlready) {
+ obtainWriteLocksForClear(regionEvent, participants);
+ }
// no need to dominate my own rvv.
// Clear is on going here, there won't be GII for this member
clearRegionLocally(regionEvent, cacheWrite, null);
@@ -587,7 +592,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// TODO: call reindexUserDataRegion if there're lucene indexes
} finally {
- releaseWriteLocksForClear(regionEvent, participants);
+ if (!isLockedAlready) {
+ releaseWriteLocksForClear(regionEvent, participants);
+ }
}
}
@@ -2512,4 +2519,10 @@ public class BucketRegion extends DistributedRegion implements Bucket {
void checkSameSenderIdsAvailableOnAllNodes() {
// nothing needed on a bucket region
}
+
+ @Override
+ protected void basicClear(RegionEventImpl regionEvent) {
+ basicClear(regionEvent, false);
+ }
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
index 4396581..25cc2f5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
@@ -207,7 +207,7 @@ public class DistributedClearOperation extends DistributedCacheOperation {
protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm)
throws EntryNotFoundException {
- DistributedRegion region = (DistributedRegion) event.getRegion();
+ LocalRegion region = (LocalRegion) event.getRegion();
switch (this.clearOp) {
case OP_CLEAR:
region.clearRegionLocally((RegionEventImpl) event, false, this.rvv);
@@ -215,9 +215,11 @@ public class DistributedClearOperation extends DistributedCacheOperation {
this.appliedOperation = true;
break;
case OP_LOCK_FOR_CLEAR:
- if (region.getDataPolicy().withStorage()) {
- DistributedClearOperation.regionLocked(this.getSender(), region.getFullPath(), region);
- region.lockLocallyForClear(dm, this.getSender(), event);
+ if (region.getDataPolicy().withStorage() && region instanceof DistributedRegion) {
+ DistributedRegion distributedRegion = (DistributedRegion) region;
+ DistributedClearOperation.regionLocked(this.getSender(), region.getFullPath(),
+ distributedRegion);
+ distributedRegion.lockLocallyForClear(dm, this.getSender(), event);
}
this.appliedOperation = true;
break;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 84b5a3b..d0035fa 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2130,7 +2130,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
*/
protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
+ releaseLockLocallyForClear(regionEvent);
+ if (!isUsedForPartitionedRegionBucket()) {
+ DistributedClearOperation.releaseLocks(regionEvent, participants);
+ }
+ }
+ protected void releaseLockLocallyForClear(RegionEventImpl regionEvent) {
ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
if (armLockTestHook != null) {
armLockTestHook.beforeRelease(this, regionEvent);
@@ -2140,9 +2146,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
if (rvv != null) {
rvv.unlockForClear(getMyId());
}
- if (!isUsedForPartitionedRegionBucket()) {
- DistributedClearOperation.releaseLocks(regionEvent, participants);
- }
if (armLockTestHook != null) {
armLockTestHook.afterRelease(this, regionEvent);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 876353f..8ade506 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -466,4 +466,7 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
boolean isRegionCreateNotified();
void setRegionCreateNotified(boolean notified);
+
+ void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite,
+ RegionVersionVector vector);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 94d4563..623d596 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8470,7 +8470,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* will not take distributedLock. The clear operation will also clear the local transactional
* entries. The clear operation will have immediate committed state.
*/
- void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite,
+ @Override
+ public void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite,
RegionVersionVector vector) {
final boolean isRvvDebugEnabled = logger.isTraceEnabled(LogMarker.RVV_VERBOSE);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 950ec63..671d27b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -322,6 +322,8 @@ public class PartitionedRegion extends LocalRegion
}
};
+ private final PartitionedRegionClear partitionedRegionClear = new PartitionedRegionClear(this);
+
/**
* Global Region for storing PR config ( PRName->PRConfig). This region would be used to resolve
* PR name conflict.*
@@ -2174,198 +2176,6 @@ public class PartitionedRegion extends LocalRegion
throw new UnsupportedOperationException();
}
- @Override
- void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- synchronized (clearLock) {
- final DistributedLockService lockService = getPartitionedRegionLockService();
- try {
- lockService.lock("_clearOperation" + this.getFullPath().replace('/', '_'), -1, -1);
- } catch (IllegalStateException e) {
- lockCheckReadiness();
- throw e;
- }
- try {
- if (cache.isCacheAtShutdownAll()) {
- throw cache.getCacheClosedException("Cache is shutting down");
- }
-
- // do cacheWrite
- cacheWriteBeforeRegionClear(regionEvent);
-
- // create ClearPRMessage per bucket
- List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId());
- for (ClearPRMessage clearPRMessage : clearMsgList) {
- int bucketId = clearPRMessage.getBucketId();
- checkReadiness();
- long sendMessagesStartTime = 0;
- if (isDebugEnabled) {
- sendMessagesStartTime = System.currentTimeMillis();
- }
- try {
- sendClearMsgByBucket(bucketId, clearPRMessage);
- } catch (PartitionOfflineException poe) {
- // TODO add a PartialResultException
- logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket "
- + bucketId, poe);
- } catch (Exception e) {
- logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e);
- }
-
- if (isDebugEnabled) {
- long now = System.currentTimeMillis();
- logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId,
- (now - sendMessagesStartTime));
- }
- // TODO add psStats
- }
- } finally {
- try {
- lockService.unlock("_clearOperation" + this.getFullPath().replace('/', '_'));
- } catch (IllegalStateException e) {
- lockCheckReadiness();
- }
- }
-
- // notify bridge clients at PR level
- regionEvent.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
- boolean hasListener = hasListener();
- if (hasListener) {
- dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
- }
- notifyBridgeClients(regionEvent);
- logger.info("Partitioned region {} finsihed clear operation.", this.getFullPath());
- }
- }
-
- void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
- RetryTimeKeeper retryTime = null;
- InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
- if (logger.isDebugEnabled()) {
- logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId,
- currentTarget);
- }
-
- long timeOut = 0;
- int count = 0;
- while (true) {
- switch (count) {
- case 0:
- // Note we don't check for DM cancellation in common case.
- // First time. Assume success, keep going.
- break;
- case 1:
- this.cache.getCancelCriterion().checkCancelInProgress(null);
- // Second time (first failure). Calculate timeout and keep going.
- timeOut = System.currentTimeMillis() + this.retryTimeout;
- break;
- default:
- this.cache.getCancelCriterion().checkCancelInProgress(null);
- // test for timeout
- long timeLeft = timeOut - System.currentTimeMillis();
- if (timeLeft < 0) {
- PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId,
- this.retryTimeout);
- // NOTREACHED
- }
-
- // Didn't time out. Sleep a bit and then continue
- boolean interrupted = Thread.interrupted();
- try {
- Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
- } catch (InterruptedException ignore) {
- interrupted = true;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- break;
- } // switch
- count++;
-
- if (currentTarget == null) { // pick target
- checkReadiness();
- if (retryTime == null) {
- retryTime = new RetryTimeKeeper(this.retryTimeout);
- }
-
- currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
- if (currentTarget == null) {
- // the bucket does not exist, no need to clear
- logger.info("Bucket " + bucketId + " does not contain data, no need to clear");
- return;
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget);
- }
- }
-
- // It's possible this is a GemFire thread e.g. ServerConnection
- // which got to this point because of a distributed system shutdown or
- // region closure which uses interrupt to break any sleep() or wait() calls
- // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
- checkShutdown();
- continue;
- } // pick target
-
- boolean result = false;
- try {
- final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId());
- if (isLocal) {
- result = clearPRMessage.doLocalClear(this);
- } else {
- ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
- if (response != null) {
- this.prStats.incPartitionMessagesSent();
- result = response.waitForResult();
- }
- }
- if (result) {
- return;
- }
- } catch (ForceReattemptException fre) {
- checkReadiness();
- InternalDistributedMember lastTarget = currentTarget;
- if (retryTime == null) {
- retryTime = new RetryTimeKeeper(this.retryTimeout);
- }
- currentTarget = getNodeForBucketWrite(bucketId, retryTime);
- if (lastTarget.equals(currentTarget)) {
- if (logger.isDebugEnabled()) {
- logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}",
- currentTarget, fre.getMessage());
- }
- if (retryTime.overMaximum()) {
- PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket",
- this.retryTimeout);
- // NOTREACHED
- }
- retryTime.waitToRetryNode();
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget,
- currentTarget);
- }
- }
- }
-
- // It's possible this is a GemFire thread e.g. ServerConnection
- // which got to this point because of a distributed system shutdown or
- // region closure which uses interrupt to break any sleep() or wait()
- // calls
- // e.g. waitForPrimary or waitForBucketRecovery in which case throw
- // exception
- checkShutdown();
-
- // If we get here, the attempt failed...
- if (count == 1) {
- // TODO prStats add ClearPRMsg retried
- this.prStats.incPutAllMsgsRetried();
- }
- }
- }
-
List<ClearPRMessage> createClearPRMessages(EventID eventID) {
ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) {
@@ -10437,4 +10247,27 @@ public class PartitionedRegion extends LocalRegion
this.getSystem().handleResourceEvent(ResourceEvent.REGION_CREATE, this);
this.regionCreationNotified = true;
}
+
+ protected PartitionedRegionClear getPartitionedRegionClear() {
+ return partitionedRegionClear;
+ }
+
+ @Override
+ void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
+ // Synchronized to avoid other threads invoking clear on this vm/node.
+ synchronized (clearLock) {
+ partitionedRegionClear.doClear(regionEvent, cacheWrite, this);
+ }
+ }
+
+ boolean hasAnyClientsInterested() {
+ // Check local filter
+ if (getFilterProfile() != null && (getFilterProfile().hasInterest() || getFilterProfile()
+ .hasCQs())) {
+ return true;
+ }
+ // check peer server filters
+ return (getRegionAdvisor().hasPRServerWithInterest()
+ || getRegionAdvisor().hasPRServerWithCQs());
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
new file mode 100644
index 0000000..69277ef
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.OperationAbortedException;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class PartitionedRegionClear {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private static final String CLEAR_OPERATION = "_clearOperation";
+
+ private final int retryTime = 2 * 60 * 1000;
+
+ private final PartitionedRegion partitionedRegion;
+
+ private final LockForListenerAndClientNotification lockForListenerAndClientNotification =
+ new LockForListenerAndClientNotification();
+
+ private volatile boolean membershipChange = false;
+
+ public PartitionedRegionClear(PartitionedRegion partitionedRegion) {
+ this.partitionedRegion = partitionedRegion;
+ partitionedRegion.getDistributionManager()
+ .addMembershipListener(new PartitionedRegionClearListener());
+ }
+
+ public boolean isLockedForListenerAndClientNotification() {
+ return lockForListenerAndClientNotification.isLocked();
+ }
+
+ void acquireDistributedClearLock(String clearLock) {
+ try {
+ partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+ } catch (IllegalStateException e) {
+ partitionedRegion.lockCheckReadiness();
+ throw e;
+ }
+ }
+
+ void releaseDistributedClearLock(String clearLock) {
+ try {
+ partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+ } catch (IllegalStateException e) {
+ partitionedRegion.lockCheckReadiness();
+ } catch (Exception ex) {
+ logger.warn("Caught exception while unlocking clear distributed lock. " + ex.getMessage());
+ }
+ }
+
+ void obtainLockForClear(RegionEventImpl event) {
+ obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
+ sendPartitionedRegionClearMessage(event,
+ PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+ }
+
+ void releaseLockForClear(RegionEventImpl event) {
+ releaseClearLockLocal();
+ sendPartitionedRegionClearMessage(event,
+ PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+ }
+
+ List clearRegion(RegionEventImpl regionEvent, boolean cacheWrite,
+ RegionVersionVector vector) {
+ List allBucketsCleared = new ArrayList();
+ allBucketsCleared.addAll(clearRegionLocal(regionEvent));
+ allBucketsCleared.addAll(sendPartitionedRegionClearMessage(regionEvent,
+ PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR));
+ return allBucketsCleared;
+ }
+
+ private void waitForPrimary() {
+ boolean retry;
+ PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime);
+ do {
+ retry = false;
+ for (BucketRegion bucketRegion : partitionedRegion.getDataStore()
+ .getAllLocalBucketRegions()) {
+ if (!bucketRegion.getBucketAdvisor().hasPrimary()) {
+ if (retryTimer.overMaximum()) {
+ throw new PartitionedRegionPartialClearException(
+ "Unable to find primary bucket region during clear operation for region: " +
+ partitionedRegion.getName());
+ }
+ retryTimer.waitForBucketsRecovery();
+ retry = true;
+ }
+ }
+ } while (retry);
+ }
+
+ public ArrayList clearRegionLocal(RegionEventImpl regionEvent) {
+ ArrayList clearedBuckets = new ArrayList();
+ membershipChange = false;
+ // Synchronized to handle the requester departure.
+ synchronized (lockForListenerAndClientNotification) {
+ if (partitionedRegion.getDataStore() != null) {
+ partitionedRegion.getDataStore().lockBucketCreationForRegionClear();
+ try {
+ boolean retry;
+ do {
+ waitForPrimary();
+
+ for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+ .getAllLocalPrimaryBucketRegions()) {
+ if (localPrimaryBucketRegion.size() > 0) {
+ localPrimaryBucketRegion.clear();
+ }
+ clearedBuckets.add(localPrimaryBucketRegion.getId());
+ }
+
+ if (membershipChange) {
+ membershipChange = false;
+ retry = true;
+ } else {
+ retry = false;
+ }
+
+ } while (retry);
+ doAfterClear(regionEvent);
+ } finally {
+ partitionedRegion.getDataStore().unlockBucketCreationForRegionClear();
+ }
+ } else {
+ // Non data-store with client queue and listener
+ doAfterClear(regionEvent);
+ }
+ }
+ return clearedBuckets;
+ }
+
+ private void doAfterClear(RegionEventImpl regionEvent) {
+ if (partitionedRegion.hasAnyClientsInterested()) {
+ notifyClients(regionEvent);
+ }
+
+ if (partitionedRegion.hasListener()) {
+ partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
+ }
+ }
+
+ void notifyClients(RegionEventImpl event) {
+ // Set client routing information into the event
+ // The clear operation in case of PR is distributed differently
+ // hence the FilterRoutingInfo is set here instead of
+ // DistributedCacheOperation.distribute().
+ event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
+ if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion
+ .isUsedForPartitionedRegionAdmin()
+ && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion
+ .isUsedForParallelGatewaySenderQueue()) {
+
+ FilterRoutingInfo localCqFrInfo =
+ partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event,
+ FilterProfile.NO_PROFILES, Collections.emptySet());
+
+ FilterRoutingInfo localCqInterestFrInfo =
+ partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event);
+
+ if (localCqInterestFrInfo != null) {
+ event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo());
+ }
+ }
+ partitionedRegion.notifyBridgeClients(event);
+ }
+
+ protected void obtainClearLockLocal(InternalDistributedMember requester) {
+ synchronized (lockForListenerAndClientNotification) {
+ // Check if the member is still part of the distributed system
+ if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) {
+ return;
+ }
+
+ lockForListenerAndClientNotification.setLocked(requester);
+ if (partitionedRegion.getDataStore() != null) {
+ for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+ .getAllLocalPrimaryBucketRegions()) {
+ try {
+ localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(),
+ partitionedRegion.getMyId(), null);
+ } catch (Exception ex) {
+ partitionedRegion.checkClosed();
+ }
+ }
+ }
+ }
+ }
+
+ protected void releaseClearLockLocal() {
+ synchronized (lockForListenerAndClientNotification) {
+ if (lockForListenerAndClientNotification.getLockRequester() == null) {
+ // The member has left.
+ return;
+ }
+ try {
+ if (partitionedRegion.getDataStore() != null) {
+
+ for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+ .getAllLocalPrimaryBucketRegions()) {
+ try {
+ localPrimaryBucketRegion.releaseLockLocallyForClear(null);
+ } catch (Exception ex) {
+ logger.debug(
+ "Unable to acquire clear lock for bucket region " + localPrimaryBucketRegion
+ .getName(),
+ ex.getMessage());
+ partitionedRegion.checkClosed();
+ }
+ }
+ }
+ } finally {
+ lockForListenerAndClientNotification.setUnLocked();
+ }
+ }
+ }
+
+ private List sendPartitionedRegionClearMessage(RegionEventImpl event,
+ PartitionedRegionClearMessage.OperationType op) {
+ RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone();
+ eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR);
+
+ do {
+ try {
+ return attemptToSendPartitionedRegionClearMessage(event, op);
+ } catch (ForceReattemptException reattemptException) {
+ // retry
+ }
+ } while (true);
+ }
+
+ private List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
+ PartitionedRegionClearMessage.OperationType op)
+ throws ForceReattemptException {
+ List bucketsOperated = null;
+
+ if (partitionedRegion.getPRRoot() == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Partition region {} failed to initialize. Remove its profile from remote members.",
+ this.partitionedRegion);
+ }
+ new UpdateAttributesProcessor(partitionedRegion, true).distribute(false);
+ return bucketsOperated;
+ }
+
+ final HashSet configRecipients =
+ new HashSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes());
+
+ try {
+ final PartitionRegionConfig prConfig =
+ partitionedRegion.getPRRoot().get(partitionedRegion.getRegionIdentifier());
+
+ if (prConfig != null) {
+ Iterator itr = prConfig.getNodes().iterator();
+ while (itr.hasNext()) {
+ InternalDistributedMember idm = ((Node) itr.next()).getMemberId();
+ if (!idm.equals(partitionedRegion.getMyId())) {
+ configRecipients.add(idm);
+ }
+ }
+ }
+ } catch (CancelException ignore) {
+ // ignore
+ }
+
+ try {
+ PartitionedRegionClearMessage.PartitionedRegionClearResponse resp =
+ new PartitionedRegionClearMessage.PartitionedRegionClearResponse(
+ partitionedRegion.getSystem(),
+ configRecipients);
+ PartitionedRegionClearMessage partitionedRegionClearMessage =
+ new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event);
+ partitionedRegionClearMessage.send();
+
+ resp.waitForRepliesUninterruptibly();
+ bucketsOperated = resp.bucketsCleared;
+
+ } catch (ReplyException e) {
+ Throwable t = e.getCause();
+ if (t instanceof ForceReattemptException) {
+ throw (ForceReattemptException) t;
+ }
+ if (t instanceof PartitionedRegionPartialClearException) {
+ throw new PartitionedRegionPartialClearException(t.getMessage(), t);
+ }
+ logger.warn(
+ "PartitionedRegionClear#sendPartitionedRegionClearMessage: Caught exception during ClearRegionMessage send and waiting for response",
+ e);
+ }
+ return bucketsOperated;
+ }
+
+ void doClear(RegionEventImpl regionEvent, boolean cacheWrite,
+ PartitionedRegion partitionedRegion) {
+ String lockName = CLEAR_OPERATION + partitionedRegion.getDisplayName();
+
+ try {
+ // distributed lock to make sure only one clear op is in progress in the cluster.
+ acquireDistributedClearLock(lockName);
+
+ // Force all primary buckets to be created before clear.
+ PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+
+ // do cacheWrite
+ try {
+ partitionedRegion.cacheWriteBeforeRegionClear(regionEvent);
+ } catch (OperationAbortedException operationAbortedException) {
+ throw new CacheWriterException(operationAbortedException);
+ }
+
+ // Check if there are any listeners or clients interested. If so, then clear write
+ // locks needs to be taken on all local and remote primary buckets in order to
+ // preserve the ordering of client events (for concurrent operations on the region).
+ boolean acquireClearLockForClientNotification =
+ (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener());
+ if (acquireClearLockForClientNotification) {
+ obtainLockForClear(regionEvent);
+ }
+ try {
+ List bucketsCleared = clearRegion(regionEvent, cacheWrite, null);
+
+ if (partitionedRegion.getTotalNumberOfBuckets() != bucketsCleared.size()) {
+ String message = "Unable to clear all the buckets from the partitioned region "
+ + partitionedRegion.getName()
+ + ", either data (buckets) moved or member departed.";
+
+ logger.warn(message + " expected to clear number of buckets: "
+ + partitionedRegion.getTotalNumberOfBuckets() +
+ " actual cleared: " + bucketsCleared.size());
+
+ throw new PartitionedRegionPartialClearException(message);
+ }
+ } finally {
+ if (acquireClearLockForClientNotification) {
+ releaseLockForClear(regionEvent);
+ }
+ }
+
+ } finally {
+ releaseDistributedClearLock(lockName);
+ }
+ }
+
+ void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
+ if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) {
+ synchronized (lockForListenerAndClientNotification) {
+ if (lockForListenerAndClientNotification.getLockRequester() != null) {
+ releaseClearLockLocal();
+ }
+ }
+ }
+ }
+
+ class LockForListenerAndClientNotification {
+
+ private boolean locked = false;
+
+ private InternalDistributedMember lockRequester;
+
+ synchronized void setLocked(InternalDistributedMember member) {
+ locked = true;
+ lockRequester = member;
+ }
+
+ synchronized void setUnLocked() {
+ locked = false;
+ lockRequester = null;
+ }
+
+ synchronized boolean isLocked() {
+ return locked;
+ }
+
+ synchronized InternalDistributedMember getLockRequester() {
+ return lockRequester;
+ }
+ }
+
+ protected class PartitionedRegionClearListener implements MembershipListener {
+
+ @Override
+ public synchronized void memberDeparted(DistributionManager distributionManager,
+ InternalDistributedMember id, boolean crashed) {
+ membershipChange = true;
+ handleClearFromDepartedMember(id);
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
new file mode 100755
index 0000000..b66ab44
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.partitioned.PartitionMessage;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class PartitionedRegionClearMessage extends PartitionMessage {
+
+ public enum OperationType {
+ OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR,
+ }
+
+ private Object cbArg;
+
+ private OperationType op;
+
+ private EventID eventID;
+
+ private PartitionedRegion partitionedRegion;
+
+ private ArrayList bucketsCleared;
+
+ @Override
+ public EventID getEventID() {
+ return eventID;
+ }
+
+ public PartitionedRegionClearMessage() {}
+
+ PartitionedRegionClearMessage(Set recipients, PartitionedRegion region,
+ ReplyProcessor21 processor, PartitionedRegionClearMessage.OperationType operationType,
+ final RegionEventImpl event) {
+ super(recipients, region.getPRId(), processor);
+ partitionedRegion = region;
+ op = operationType;
+ cbArg = event.getRawCallbackArgument();
+ eventID = event.getEventId();
+ }
+
+ public OperationType getOp() {
+ return op;
+ }
+
+ public void send() {
+ Assert.assertTrue(getRecipients() != null, "ClearMessage NULL recipients set");
+ setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed());
+ partitionedRegion.getDistributionManager().putOutgoing(this);
+ }
+
+ @Override
+ protected Throwable processCheckForPR(PartitionedRegion pr,
+ DistributionManager distributionManager) {
+ if (pr != null && !pr.getDistributionAdvisor().isInitialized()) {
+ Throwable thr = new ForceReattemptException(
+ String.format("%s : could not find partitioned region with Id %s",
+ distributionManager.getDistributionManagerId(),
+ pr.getRegionIdentifier()));
+ return thr;
+ }
+ return null;
+ }
+
+ @Override
+ protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
+ PartitionedRegion partitionedRegion,
+ long startTime) throws CacheException {
+
+ if (partitionedRegion == null) {
+ return true;
+ }
+
+ if (partitionedRegion.isDestroyed()) {
+ return true;
+ }
+
+ if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender());
+ } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+ partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal();
+ } else {
+ RegionEventImpl event =
+ new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, this.cbArg, true,
+ partitionedRegion.getMyId(),
+ getEventID());
+ bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event);
+ }
+ return true;
+ }
+
+ @Override
+ protected void appendFields(StringBuilder buff) {
+ super.appendFields(buff);
+ buff.append(" cbArg=").append(this.cbArg).append(" op=").append(this.op);
+ }
+
+ @Override
+ public int getDSFID() {
+ return CLEAR_PARTITIONED_REGION_MESSAGE;
+ }
+
+ @Override
+ public void fromData(DataInput in,
+ DeserializationContext context) throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+ this.cbArg = DataSerializer.readObject(in);
+ op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+ eventID = DataSerializer.readObject(in);
+ }
+
+ @Override
+ public void toData(DataOutput out,
+ SerializationContext context) throws IOException {
+ super.toData(out, context);
+ DataSerializer.writeObject(this.cbArg, out);
+ out.writeByte(op.ordinal());
+ DataSerializer.writeObject(eventID, out);
+ }
+
+ /**
+ * The response on which to wait for all the replies. This response ignores any exceptions
+ * received from the "far side"
+ */
+ public static class PartitionedRegionClearResponse extends ReplyProcessor21 {
+ CopyOnWriteArrayList bucketsCleared = new CopyOnWriteArrayList();
+
+ public PartitionedRegionClearResponse(InternalDistributedSystem system, Set initMembers) {
+ super(system, initMembers);
+ }
+
+ @Override
+ public void process(DistributionMessage msg) {
+ if (msg instanceof PartitionedRegionClearReplyMessage) {
+ List buckets = ((PartitionedRegionClearReplyMessage) msg).bucketsCleared;
+ if (buckets != null) {
+ bucketsCleared.addAll(buckets);
+ }
+ }
+ super.process(msg, true);
+ }
+ }
+
+ @Override
+ protected void sendReply(InternalDistributedMember member, int processorId,
+ DistributionManager distributionManager, ReplyException ex,
+ PartitionedRegion partitionedRegion, long startTime) {
+ if (partitionedRegion != null) {
+ if (startTime > 0) {
+ partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+ }
+ }
+ PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
+ .send(member, processorId, getReplySender(distributionManager), op, bucketsCleared,
+ ex);
+ }
+
+ public static class PartitionedRegionClearReplyMessage extends ReplyMessage {
+
+ private ArrayList bucketsCleared;
+
+ private OperationType op;
+
+ @Override
+ public boolean getInlineProcess() {
+ return true;
+ }
+
+ /**
+ * Empty constructor to conform to DataSerializable interface
+ */
+ public PartitionedRegionClearReplyMessage() {}
+
+ private PartitionedRegionClearReplyMessage(int processorId, OperationType op,
+ ArrayList bucketsCleared, ReplyException ex) {
+ super();
+ this.bucketsCleared = bucketsCleared;
+ this.op = op;
+ setProcessorId(processorId);
+ setException(ex);
+ }
+
+ /** Send an ack */
+ public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm,
+ OperationType op, ArrayList bucketsCleared, ReplyException ex) {
+
+ Assert.assertTrue(recipient != null, "partitionedRegionClearReplyMessage NULL reply message");
+
+ PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage m =
+ new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, op,
+ bucketsCleared, ex);
+
+ m.setRecipient(recipient);
+ dm.putOutgoing(m);
+ }
+
+ /**
+ * Processes this message. This method is invoked by the receiver of the message.
+ *
+ * @param dm the distribution manager that is processing the message.
+ */
+ @Override
+ public void process(final DistributionManager dm, final ReplyProcessor21 rp) {
+ final long startTime = getTimestamp();
+
+ if (rp == null) {
+ if (LogService.getLogger().isTraceEnabled(LogMarker.DM_VERBOSE)) {
+ LogService.getLogger().trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
+ }
+ return;
+ }
+
+ rp.process(this);
+
+ dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
+ }
+
+ @Override
+ public int getDSFID() {
+ return CLEAR_PARTITIONED_REGION_REPLY_MESSAGE;
+ }
+
+ @Override
+ public void fromData(DataInput in,
+ DeserializationContext context) throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+ op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+ bucketsCleared = DataSerializer.readArrayList(in);
+ }
+
+ @Override
+ public void toData(DataOutput out,
+ SerializationContext context) throws IOException {
+ super.toData(out, context);
+ out.writeByte(op.ordinal());
+ DataSerializer.writeArrayList(bucketsCleared, out);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("PartitionedRegionClearReplyMessage ")
+ .append("processorId=").append(this.processorId)
+ .append(" sender=").append(sender)
+ .append(" bucketsCleared ").append(this.bucketsCleared)
+ .append(" exception=").append(getException());
+ return sb.toString();
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 23a7487..578ed3e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -980,6 +980,14 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
}
}
+ protected void lockBucketCreationForRegionClear() {
+ bucketCreationLock.writeLock().lock();
+ }
+
+ protected void unlockBucketCreationForRegionClear() {
+ bucketCreationLock.writeLock().unlock();
+ }
+
/**
* Gets the total amount of memory in bytes allocated for all values for this PR in this VM. This
* is the current memory (MB) watermark for data in this PR.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
index 5d2ff24..13ad666 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
@@ -851,10 +851,21 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
&& prof.filterProfile.hasInterest();
};
+ @Immutable
+ private static final Filter prServerWithCqFilter = profile -> {
+ CacheProfile prof = (CacheProfile) profile;
+ return prof.isPartitioned && prof.hasCacheServer && prof.filterProfile != null
+ && prof.filterProfile.hasCQs();
+ };
+
public boolean hasPRServerWithInterest() {
return satisfiesFilter(prServerWithInterestFilter);
}
+ public boolean hasPRServerWithCQs() {
+ return satisfiesFilter(prServerWithCqFilter);
+ }
+
/**
* return the set of all members who must receive operation notifications
*
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 652d1b2..644fbc2 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -79,6 +79,7 @@ org/apache/geode/cache/NoSubscriptionServersAvailableException,true,848408601915
org/apache/geode/cache/Operation,true,-7521751729852504238,ordinal:byte
org/apache/geode/cache/OperationAbortedException,true,-8293166225026556949
org/apache/geode/cache/PartitionedRegionDistributionException,true,-3004093739855972548
+org/apache/geode/cache/PartitionedRegionPartialClearException,false
org/apache/geode/cache/PartitionedRegionStorageException,true,5905463619475329732
org/apache/geode/cache/RegionAccessException,true,3142958723089038406
org/apache/geode/cache/RegionDestroyedException,true,319804842308010754,regionFullPath:java/lang/String
@@ -302,6 +303,7 @@ org/apache/geode/internal/cache/PRContainsValueFunction,false
org/apache/geode/internal/cache/PRHARedundancyProvider$ArrayListWithClearState,true,1,wasCleared:boolean
org/apache/geode/internal/cache/PartitionedRegion$PRIdMap,true,3667357372967498179,cleared:boolean
org/apache/geode/internal/cache/PartitionedRegion$SizeEntry,false,isPrimary:boolean,size:int
+org/apache/geode/internal/cache/PartitionedRegionClearMessage$OperationType,false
org/apache/geode/internal/cache/PartitionedRegionDataStore$CreateBucketResult,false,nowExists:boolean
org/apache/geode/internal/cache/PartitionedRegionException,true,5113786059279106007
org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator$MemberResultsList,false,isLastChunkReceived:boolean
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index c7cf5a6..d3397eb 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -51,7 +51,9 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
when(ba.getPrimaryMoveReadLock()).thenReturn(primaryMoveReadLock);
when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class));
when(ba.isPrimary()).thenReturn(true);
-
+ PartitionedRegionClear clearPR = mock(PartitionedRegionClear.class);
+ when(clearPR.isLockedForListenerAndClientNotification()).thenReturn(true);
+ when(pr.getPartitionedRegionClear()).thenReturn(clearPR);
ira.setPartitionedRegion(pr).setPartitionedRegionBucketRedundancy(1).setBucketAdvisor(ba);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 898c4f7..e02ba2c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -58,7 +58,6 @@ import org.mockito.junit.MockitoRule;
import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.Operation;
@@ -221,22 +220,6 @@ public class PartitionedRegionTest {
spyPartitionedRegion.clear();
}
- @Test(expected = CacheClosedException.class)
- public void clearShouldThrowCacheClosedExceptionIfShutdownAll() {
- PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
- RegionEventImpl regionEvent =
- new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
- spyPartitionedRegion.getMyId(), true);
- when(cache.isCacheAtShutdownAll()).thenReturn(true);
- when(cache.getCacheClosedException("Cache is shutting down"))
- .thenReturn(new CacheClosedException("Cache is shutting down"));
- DistributedLockService lockService = mock(DistributedLockService.class);
- when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
- String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
- when(lockService.lock(lockName, -1, -1)).thenReturn(true);
- spyPartitionedRegion.basicClear(regionEvent, true);
- }
-
@Test
public void createClearPRMessagesShouldCreateMessagePerBucket() {
PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
@@ -249,28 +232,6 @@ public class PartitionedRegionTest {
assertThat(msgs.size()).isEqualTo(3);
}
- @Test
- public void sendEachMessagePerBucket() {
- PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
- RegionEventImpl regionEvent =
- new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
- spyPartitionedRegion.getMyId(), true);
- when(cache.isCacheAtShutdownAll()).thenReturn(false);
- DistributedLockService lockService = mock(DistributedLockService.class);
- when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
- when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
- String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
- when(lockService.lock(lockName, -1, -1)).thenReturn(true);
- when(spyPartitionedRegion.hasListener()).thenReturn(true);
- doNothing().when(spyPartitionedRegion).dispatchListenerEvent(any(), any());
- doNothing().when(spyPartitionedRegion).notifyBridgeClients(eq(regionEvent));
- doNothing().when(spyPartitionedRegion).checkReadiness();
- doNothing().when(lockService).unlock(lockName);
- spyPartitionedRegion.basicClear(regionEvent, true);
- verify(spyPartitionedRegion, times(3)).sendClearMsgByBucket(any(), any());
- verify(spyPartitionedRegion, times(1)).dispatchListenerEvent(any(), any());
- verify(spyPartitionedRegion, times(1)).notifyBridgeClients(eq(regionEvent));
- }
@Test
public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index 3598b5d..481c78c 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -56,6 +56,8 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
// NOTE, codes < -65536 will take 4 bytes to serialize
// NOTE, codes < -128 will take 2 bytes to serialize
+ short CLEAR_PARTITIONED_REGION_REPLY_MESSAGE = -166;
+ short CLEAR_PARTITIONED_REGION_MESSAGE = -165;
short PR_CLEAR_REPLY_MESSAGE = -164;
short PR_CLEAR_MESSAGE = -163;