You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/05 23:54:25 UTC
[geode] 25/28: GEODE-7679 Partitioned Region clear is successful
while region is being altered (#5516)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit fdefc1a7aca1503fc6b542729f50f0268981caf0
Author: Jianxia Chen <11...@users.noreply.github.com>
AuthorDate: Tue Nov 3 09:57:49 2020 -0800
GEODE-7679 Partitioned Region clear is successful while region is being altered (#5516)
---
...itionedRegionClearWithAlterRegionDUnitTest.java | 803 +++++++++++++++++++++
1 file changed, 803 insertions(+)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
new file mode 100644
index 0000000..fb74eb3
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.ExpirationAction;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class PartitionedRegionClearWithAlterRegionDUnitTest implements Serializable {
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ private VM server1;
+
+ private VM server2;
+
+ private VM server3;
+
+ private static volatile DUnitBlackboard blackboard;
+
+ private static final String REGION_NAME = "testRegion";
+
+ private static final int NUM_ENTRIES = 1000000;
+
+ private static final String GATE_NAME = "ALLOW_ALTER_REGION";
+
+ private void initialize() {
+ server1 = VM.getVM(0);
+ server2 = VM.getVM(1);
+
+ server1.invoke(() -> {
+ cacheRule.createCache();
+ cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION).setStatisticsEnabled(true)
+ .create(REGION_NAME);
+ });
+
+ server2.invoke(() -> {
+ cacheRule.createCache();
+ cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION).setStatisticsEnabled(true)
+ .create(REGION_NAME);
+ });
+
+ server1.invoke(() -> {
+ populateRegion();
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+ });
+
+ server2.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+ });
+ }
+
+ @Test
+ public void testClearRegionWhileAddingCacheLoaderBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ alterRegionSetCacheLoader();
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileAddingCacheLoaderAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ alterRegionSetCacheLoader();
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileAddingCacheWriterBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ alterRegionSetCacheWriter();
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileAddingCacheWriterAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ alterRegionSetCacheWriter();
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileAddingCacheListenerBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ alterRegionSetCacheListener();
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileAddingCacheListenerAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ alterRegionSetCacheListener();
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileChangingEvictionBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.getEvictionAttributesMutator().setMaximum(1);
+ assertThat(region.getAttributes().getEvictionAttributes().getMaximum()).isEqualTo(1);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileChangingEvictionAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.getEvictionAttributesMutator().setMaximum(1);
+ assertThat(region.getAttributes().getEvictionAttributes().getMaximum()).isEqualTo(1);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileChangingRegionTTLExpirationBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes = new ExpirationAttributes();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setRegionTimeToLive(expirationAttributes);
+ assertThat(region.getAttributes().getRegionTimeToLive()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileChangingRegionTTLExpirationAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes = new ExpirationAttributes();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setRegionTimeToLive(expirationAttributes);
+ assertThat(region.getAttributes().getRegionTimeToLive()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileChangingEntryTTLExpirationBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes = new ExpirationAttributes();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setEntryTimeToLive(expirationAttributes);
+ assertThat(region.getAttributes().getEntryTimeToLive()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+
+ @Test
+ public void testClearRegionWhileChangingEntryTTLExpirationAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes = new ExpirationAttributes();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setEntryTimeToLive(expirationAttributes);
+ assertThat(region.getAttributes().getEntryTimeToLive()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileChangingRegionIdleExpirationBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes = new ExpirationAttributes();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setRegionIdleTimeout(expirationAttributes);
+ assertThat(region.getAttributes().getRegionIdleTimeout()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testClearRegionWhileChangingRegionIdleExpirationAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes = new ExpirationAttributes();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setRegionIdleTimeout(expirationAttributes);
+ assertThat(region.getAttributes().getRegionIdleTimeout()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ @Ignore // See GEODE-8680
+ public void testClearRegionWhileChangingEntryIdleExpirationBeforeProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes =
+ new ExpirationAttributes(1, ExpirationAction.DESTROY);
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setEntryIdleTimeout(expirationAttributes);
+ assertThat(region.getAttributes().getEntryIdleTimeout()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ @Ignore // See GEODE-8680
+ public void testClearRegionWhileChangingEntryIdleExpirationAfterProcessMessage()
+ throws InterruptedException {
+ initialize();
+
+ server1.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ ExpirationAttributes expirationAttributes =
+ new ExpirationAttributes(1, ExpirationAction.DESTROY);
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setEntryIdleTimeout(expirationAttributes);
+ assertThat(region.getAttributes().getEntryIdleTimeout()).isEqualTo(expirationAttributes);
+ });
+
+ AsyncInvocation asyncInvocation2 = server2.invokeAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testMemberLeaveBeforeProcessMessage() throws InterruptedException {
+ initialize();
+
+ server3 = VM.getVM(2);
+
+ server3.invoke(() -> {
+ cacheRule.createCache();
+ cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION).setStatisticsEnabled(true)
+ .create(REGION_NAME);
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+ });
+
+ server2.invoke(() -> {
+ DistributionMessageObserver
+ .setInstance(
+ new MemberKiller(false));
+ });
+
+ server3.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverBeforeProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+ .isInstanceOf(PartitionedRegionPartialClearException.class);
+ });
+
+ AsyncInvocation asyncInvocation2 = server3.invokeAsync(() -> {
+ alterRegionSetCacheWriter();
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testMemberLeaveAfterProcessMessage() throws InterruptedException {
+ initialize();
+
+ server3 = VM.getVM(2);
+
+ server3.invoke(() -> {
+ cacheRule.createCache();
+ cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION).setStatisticsEnabled(true)
+ .create(REGION_NAME);
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+ });
+
+ server2.invoke(() -> {
+ DistributionMessageObserver
+ .setInstance(
+ new MemberKiller(false));
+ });
+
+ server3.invoke(() -> DistributionMessageObserver.setInstance(
+ getDistributionMessageObserverAfterProcessMessage()));
+
+ AsyncInvocation asyncInvocation1 = server1.invokeAsync(() -> {
+ assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())
+ .isInstanceOf(PartitionedRegionPartialClearException.class);
+ });
+
+ AsyncInvocation asyncInvocation2 = server3.invokeAsync(() -> {
+ alterRegionSetCacheWriter();
+ });
+
+ asyncInvocation1.await();
+ asyncInvocation2.await();
+ }
+
+ @Test
+ public void testSingleServer() throws InterruptedException, ExecutionException {
+ cacheRule.createCache();
+ cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION).setStatisticsEnabled(true)
+ .create(REGION_NAME);
+ populateRegion();
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+
+ Future future1 = executorServiceRule.runAsync(() -> {
+ cacheRule.getCache().getRegion(REGION_NAME).clear();
+ assertThat(cacheRule.getCache().getRegion(REGION_NAME).size()).isEqualTo(0);
+ });
+
+ Future future2 = executorServiceRule.runAsync(() -> {
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ TestCacheLoader testCacheLoader = new TestCacheLoader();
+ attributesMutator.setCacheLoader(testCacheLoader);
+ assertThat(region.getAttributes().getCacheLoader()).isEqualTo(testCacheLoader);
+ });
+
+ future1.get();
+ future2.get();
+ }
+
+ private void populateRegion() {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, i));
+ }
+
+ private void alterRegionSetCacheLoader() throws TimeoutException, InterruptedException {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ TestCacheLoader testCacheLoader = new TestCacheLoader();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setCacheLoader(testCacheLoader);
+ assertThat(region.getAttributes().getCacheLoader()).isEqualTo(testCacheLoader);
+ }
+
+ private void alterRegionSetCacheWriter() throws TimeoutException, InterruptedException {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ TestCacheWriter testCacheWriter = new TestCacheWriter();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.setCacheWriter(testCacheWriter);
+ assertThat(region.getAttributes().getCacheWriter()).isEqualTo(testCacheWriter);
+ }
+
+ private void alterRegionSetCacheListener() throws TimeoutException, InterruptedException {
+ Region region = cacheRule.getCache().getRegion(REGION_NAME);
+ AttributesMutator attributesMutator = region.getAttributesMutator();
+ TestCacheListener testCacheListener = new TestCacheListener();
+ getBlackboard().waitForGate(GATE_NAME);
+ attributesMutator.addCacheListener(testCacheListener);
+ assertThat(region.getAttributes().getCacheListeners()).contains(testCacheListener);
+ }
+
+ private class TestCacheLoader implements CacheLoader {
+
+ @Override
+ public Object load(LoaderHelper helper) throws CacheLoaderException {
+ return new Integer(NUM_ENTRIES);
+ }
+ }
+
+ private class TestCacheWriter implements CacheWriter {
+
+ @Override
+ public void beforeUpdate(EntryEvent event) throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeCreate(EntryEvent event) throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeDestroy(EntryEvent event) throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionClear(RegionEvent event) throws CacheWriterException {
+ System.out.println("beforeRegionClear");
+ }
+ }
+
+ private class TestCacheListener implements CacheListener {
+
+ @Override
+ public void afterCreate(EntryEvent event) {
+
+ }
+
+ @Override
+ public void afterUpdate(EntryEvent event) {
+
+ }
+
+ @Override
+ public void afterInvalidate(EntryEvent event) {
+
+ }
+
+ @Override
+ public void afterDestroy(EntryEvent event) {
+
+ }
+
+ @Override
+ public void afterRegionInvalidate(RegionEvent event) {
+
+ }
+
+ @Override
+ public void afterRegionDestroy(RegionEvent event) {
+
+ }
+
+ @Override
+ public void afterRegionClear(RegionEvent event) {
+ System.out.println("afterRegionClear");
+ }
+
+ @Override
+ public void afterRegionCreate(RegionEvent event) {
+
+ }
+
+ @Override
+ public void afterRegionLive(RegionEvent event) {
+
+ }
+ }
+
+ private static DUnitBlackboard getBlackboard() {
+ if (blackboard == null) {
+ blackboard = new DUnitBlackboard();
+ }
+ return blackboard;
+ }
+
+ private DistributionMessageObserver getDistributionMessageObserverBeforeProcessMessage() {
+ return new DistributionMessageObserver() {
+ @Override
+ public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ super.beforeProcessMessage(dm, message);
+ if (message instanceof PartitionedRegionClearMessage) {
+ DistributionMessageObserver.setInstance(null);
+ getBlackboard().signalGate(GATE_NAME);
+ }
+ }
+ };
+ }
+
+ private DistributionMessageObserver getDistributionMessageObserverAfterProcessMessage() {
+ return new DistributionMessageObserver() {
+ @Override
+ public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ super.afterProcessMessage(dm, message);
+ if (message instanceof PartitionedRegionClearMessage) {
+ DistributionMessageObserver.setInstance(null);
+ getBlackboard().signalGate(GATE_NAME);
+ }
+ }
+ };
+ }
+
+ /**
+ * Shutdowns a coordinator member while the clear operation is in progress.
+ */
+ public static class MemberKiller extends DistributionMessageObserver {
+ private final boolean coordinator;
+
+ public MemberKiller(boolean coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ /**
+ * Shutdowns the VM whenever the message is an instance of
+ * {@link PartitionedRegionClearMessage}.
+ */
+ private void shutdownMember(DistributionMessage message) {
+ if (message instanceof PartitionedRegionClearMessage) {
+ if (((PartitionedRegionClearMessage) message)
+ .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+ DistributionMessageObserver.setInstance(null);
+ InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+ MembershipManagerHelper
+ .crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
+ await().untilAsserted(
+ () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+ }
+ }
+ }
+
+ /**
+ * Invoked only on clear coordinator VM.
+ *
+ * @param dm the distribution manager that received the message
+ * @param message The message itself
+ */
+ @Override
+ public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ if (coordinator) {
+ shutdownMember(message);
+ } else {
+ super.beforeSendMessage(dm, message);
+ }
+ }
+
+ /**
+ * Invoked only on non clear coordinator VM.
+ *
+ * @param dm the distribution manager that received the message
+ * @param message The message itself
+ */
+ @Override
+ public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ if (!coordinator) {
+ shutdownMember(message);
+ } else {
+ super.beforeProcessMessage(dm, message);
+ }
+ }
+ }
+}