You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/14 18:22:32 UTC
[geode] 17/22: GEODE-7680: PR.clear must be successful when
interacting with rebalance (#5095)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 02336cdcd9cfb734c7f3abff97a65f51bec0e727
Author: Donal Evans <do...@pivotal.io>
AuthorDate: Thu Jul 23 08:14:17 2020 -0700
GEODE-7680: PR.clear must be successful when interacting with rebalance (#5095)
- Added DUnit tests to confirm that clear does not interfere with
rebalance or vice versa
- Test when member departs during clear/rebalance
- Test when member joins during clear/rebalance
- Fixed typo in PartitionedRegionClearWithExpirationDUnitTest
- Fixed typo in PartitionedRegion
- Call assignBucketsToPartitions() on leader colocated region during clear
instead of target region
Authored-by: Donal Evans <do...@pivotal.io>
---
...rtitionedRegionClearWithRebalanceDUnitTest.java | 578 +++++++++++++++++++++
.../geode/internal/cache/ColocationHelper.java | 10 +-
.../internal/cache/PartitionedRegionClear.java | 3 +-
3 files changed, 583 insertions(+), 8 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithRebalanceDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithRebalanceDUnitTest.java
new file mode 100644
index 0000000..f53fab7
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithRebalanceDUnitTest.java
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.cache.PartitionAttributesFactory.GLOBAL_MAX_BUCKETS_DEFAULT;
+import static org.apache.geode.cache.RegionShortcut.PARTITION;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
+import static org.apache.geode.internal.util.ArrayUtils.asList;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.RebalanceOperation;
+import org.apache.geode.cache.control.RebalanceResults;
+import org.apache.geode.cache.util.CacheWriterAdapter;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+@RunWith(JUnitParamsRunner.class)
+public class PartitionedRegionClearWithRebalanceDUnitTest implements Serializable {
+ private static final long serialVersionUID = -7183993832801073933L;
+
+ private static final Integer BUCKETS = GLOBAL_MAX_BUCKETS_DEFAULT;
+ private static final String REGION_NAME = "testRegion";
+ private static final String COLOCATED_REGION = "childColocatedRegion";
+ private static final int ENTRIES = 10000;
+ private static final String DISK_STORE_SUFFIX = "DiskStore";
+ private static final String REBALANCE_HAS_BEGUN = "rebalance-begun";
+ private static final String CLEAR_HAS_BEGUN = "clear-begun";
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule(4);
+
+ @Rule
+ public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+
+ @Rule
+ public DistributedDiskDirRule distributedDiskDirRule = new DistributedDiskDirRule();
+
+ private static transient DUnitBlackboard blackboard;
+
+ private VM accessor;
+ private VM server1;
+ private VM server2;
+ private VM server3;
+
+ /**
+ * Names the four DUnit VMs used by these tests and records, for each one, the VM number that is
+ * passed to {@code getVM(int)} to look it up.
+ */
+ private enum TestVM {
+ ACCESSOR(0), SERVER1(1), SERVER2(2), SERVER3(3);
+
+ // VM number handed to getVM(int) for this member
+ final int vmNumber;
+
+ TestVM(int vmNumber) {
+ this.vmNumber = vmNumber;
+ }
+ }
+
+ /**
+ * Parameter source for the tests that set the regions up with an accessor. It is referenced by
+ * name from {@code @Parameters}, which is why it is marked as unused. Each row pairs the VM that
+ * coordinates the clear (server1 or the accessor) with the data store region shortcut.
+ */
+ @SuppressWarnings("unused")
+ static Object[] coordinatorVMsAndRegionTypes() {
+ return new Object[] {
+ // {ClearCoordinatorVM, regionShortcut}
+ new Object[] {TestVM.SERVER1, PARTITION_REDUNDANT},
+ new Object[] {TestVM.ACCESSOR, PARTITION_REDUNDANT},
+ new Object[] {TestVM.SERVER1, PARTITION_REDUNDANT_PERSISTENT},
+ new Object[] {TestVM.ACCESSOR, PARTITION_REDUNDANT_PERSISTENT}
+ };
+ }
+
+ /**
+ * Parameter source for the tests that do not create an accessor. It is referenced by name from
+ * {@code @Parameters}, which is why it is marked as unused. Each row pairs the VM that
+ * coordinates the clear (server1 or server2) with the data store region shortcut.
+ */
+ @SuppressWarnings("unused")
+ static Object[] coordinatorVMsAndRegionTypesNoAccessor() {
+ return new Object[] {
+ // {ClearCoordinatorVM, regionShortcut}
+ new Object[] {TestVM.SERVER1, PARTITION_REDUNDANT},
+ new Object[] {TestVM.SERVER2, PARTITION_REDUNDANT},
+ new Object[] {TestVM.SERVER1, PARTITION_REDUNDANT_PERSISTENT},
+ new Object[] {TestVM.SERVER2, PARTITION_REDUNDANT_PERSISTENT}
+ };
+ }
+
+ /**
+  * Initialises the blackboard and looks up the four VMs used by the tests.
+  */
+ @Before
+ public void setUp() throws Exception {
+   getBlackboard().initBlackboard();
+
+   accessor = getVM(TestVM.ACCESSOR.vmNumber);
+   server1 = getVM(TestVM.SERVER1.vmNumber);
+   server2 = getVM(TestVM.SERVER2.vmNumber);
+   server3 = getVM(TestVM.SERVER3.vmNumber);
+ }
+
+ /**
+  * Returns the blackboard held in the static field, creating it on first use.
+  */
+ private static DUnitBlackboard getBlackboard() {
+   if (blackboard != null) {
+     return blackboard;
+   }
+   blackboard = new DUnitBlackboard();
+   return blackboard;
+ }
+
+ /**
+  * Maps a data store region shortcut to the shortcut used when creating the same region on the
+  * accessor. Non-persistent shortcuts are returned unchanged; the two supported persistent
+  * shortcuts are mapped to their non-persistent counterparts.
+  *
+  * @param dataStoreRegionShortcut the shortcut used on the data stores
+  * @return the shortcut to use on the accessor
+  * @throws IllegalArgumentException for any other persistent shortcut
+  */
+ private RegionShortcut getRegionAccessorShortcut(RegionShortcut dataStoreRegionShortcut) {
+   if (!dataStoreRegionShortcut.isPersistent()) {
+     return dataStoreRegionShortcut;
+   }
+
+   switch (dataStoreRegionShortcut) {
+     case PARTITION_PERSISTENT:
+       return PARTITION;
+     case PARTITION_REDUNDANT_PERSISTENT:
+       return PARTITION_REDUNDANT;
+     default:
+       throw new IllegalArgumentException(
+           "Invalid RegionShortcut specified: " + dataStoreRegionShortcut);
+   }
+ }
+
+ /**
+  * Creates each named region in this member as an accessor, i.e. with a local max memory of 0.
+  * The region named COLOCATED_REGION is colocated with REGION_NAME.
+  *
+  * @param regionShortcut the shortcut used on the data stores; it is converted to the accessor
+  *        shortcut by getRegionAccessorShortcut()
+  * @param regionNames the names of the regions to create
+  */
+ private void initAccessor(RegionShortcut regionShortcut, Collection<String> regionNames) {
+   RegionShortcut accessorShortcut = getRegionAccessorShortcut(regionShortcut);
+
+   for (String regionName : regionNames) {
+     // StartupRecoveryDelay is set to infinite to prevent automatic rebalancing when creating
+     // the region on other members
+     PartitionAttributesFactory<String, String> partitionAttributes =
+         new PartitionAttributesFactory<String, String>()
+             .setTotalNumBuckets(BUCKETS)
+             .setStartupRecoveryDelay(-1)
+             .setLocalMaxMemory(0);
+
+     if (regionName.equals(COLOCATED_REGION)) {
+       partitionAttributes.setColocatedWith(REGION_NAME);
+     }
+
+     RegionFactory<String, String> regionFactory = cacheRule.getCache()
+         .<String, String>createRegionFactory(accessorShortcut)
+         .setPartitionAttributes(partitionAttributes.create());
+     regionFactory.create(regionName);
+   }
+ }
+
+ /**
+  * Creates each named region in this member as a data store with a BlackboardSignaller cache
+  * writer. The region named COLOCATED_REGION is colocated with REGION_NAME, and persistent
+  * regions get their own disk store named after the region.
+  *
+  * @param regionShortcut the shortcut to create the regions with
+  * @param regionNames the names of the regions to create
+  */
+ private void initDataStore(RegionShortcut regionShortcut, Collection<String> regionNames) {
+   for (String regionName : regionNames) {
+     // StartupRecoveryDelay is set to infinite to prevent automatic rebalancing when creating
+     // the region on other members
+     PartitionAttributesFactory<String, String> partitionAttributes =
+         new PartitionAttributesFactory<String, String>()
+             .setTotalNumBuckets(BUCKETS)
+             .setStartupRecoveryDelay(-1);
+
+     if (regionName.equals(COLOCATED_REGION)) {
+       partitionAttributes.setColocatedWith(REGION_NAME);
+     }
+
+     RegionFactory<String, String> regionFactory = cacheRule.getCache()
+         .<String, String>createRegionFactory(regionShortcut)
+         .setPartitionAttributes(partitionAttributes.create())
+         .setCacheWriter(new BlackboardSignaller());
+
+     // Set up the disk store if the region is persistent
+     if (regionShortcut.isPersistent()) {
+       String diskStoreName = cacheRule.getCache()
+           .createDiskStoreFactory()
+           .create(regionName + DISK_STORE_SUFFIX)
+           .getName();
+       regionFactory.setDiskStoreName(diskStoreName);
+     }
+
+     regionFactory.create(regionName);
+   }
+ }
+
+ /**
+ * Creates the given regions on the members and loads them so that the data is unevenly
+ * distributed. server1 creates the regions and puts ENTRIES entries into each one before any
+ * other member has created them; server2 then creates them as a data store. Finally either the
+ * accessor (useAccessor is true) or server3 as a third data store (useAccessor is false)
+ * creates them.
+ *
+ * @param regionShortcut the shortcut used for the data store regions
+ * @param regionNames the names of the regions to create
+ * @param useAccessor whether the third member is the accessor rather than server3
+ */
+ private void parametrizedSetup(RegionShortcut regionShortcut, Collection<String> regionNames,
+ boolean useAccessor) {
+ // Create and populate the region on server1 first, to create an unbalanced distribution of data
+ server1.invoke(() -> {
+ initDataStore(regionShortcut, regionNames);
+ regionNames.forEach(regionName -> {
+ Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+ IntStream.range(0, ENTRIES).forEach(i -> region.put("key" + i, "value" + i));
+ });
+ });
+ server2.invoke(() -> initDataStore(regionShortcut, regionNames));
+ if (useAccessor) {
+ accessor.invoke(() -> initAccessor(regionShortcut, regionNames));
+ } else {
+ server3.invoke(() -> initDataStore(regionShortcut, regionNames));
+ }
+ }
+
+ /**
+  * Installs a new BlackboardSignaller as the cache writer of the named region in this member.
+  *
+  * @param regionName the name of the region whose cache writer is replaced
+  */
+ private void setBlackboardSignallerCacheWriter(String regionName) {
+   Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+   region.getAttributesMutator().setCacheWriter(new BlackboardSignaller());
+ }
+
+ /**
+ * Asynchronously clears the named region from the given coordinator VM.
+ *
+ * @param clearCoordinatorVM the VM in which {@code region.clear()} is called
+ * @param regionName the name of the region to clear
+ * @param waitForRebalance if true, the VM waits on the REBALANCE_HAS_BEGUN gate (for up to the
+ * GeodeAwaitility timeout) before calling clear
+ * @return the invocation, which the caller must await
+ */
+ private AsyncInvocation<?> startClearAsync(TestVM clearCoordinatorVM, String regionName,
+ boolean waitForRebalance) {
+ return getVM(clearCoordinatorVM.vmNumber).invokeAsync(() -> {
+ Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+ if (waitForRebalance) {
+ // Wait for the signal from the blackboard before triggering the clear to start
+ getBlackboard().waitForGate(REBALANCE_HAS_BEGUN, GeodeAwaitility.getTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+ region.clear();
+ });
+ }
+
+ // Trigger a rebalance and wait until it has started restoring redundancy before signalling the
+ // blackboard
+ //
+ // The rebalance is always started from server1. When waitForClear is true, server1 first waits
+ // on the CLEAR_HAS_BEGUN gate (for up to the GeodeAwaitility timeout). The REBALANCE_HAS_BEGUN
+ // gate is signalled only once the resource manager stats report at least one completed
+ // rebalance bucket create; the invocation then asks the operation for its results.
+ private AsyncInvocation<?> startRebalanceAsyncAndSignalBlackboard(boolean waitForClear) {
+ return server1.invokeAsync(() -> {
+ RebalanceFactory rebalance =
+ cacheRule.getCache().getResourceManager().createRebalanceFactory();
+ if (waitForClear) {
+ // Wait for the signal from the blackboard before triggering the rebalance to start
+ getBlackboard().waitForGate(CLEAR_HAS_BEGUN, GeodeAwaitility.getTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+ RebalanceOperation op = rebalance.start();
+ await().untilAsserted(() -> assertThat(cacheRule.getCache().getInternalResourceManager()
+ .getStats().getRebalanceBucketCreatesCompleted()).isGreaterThan(0));
+ getBlackboard().signalGate(REBALANCE_HAS_BEGUN);
+ op.getResults();
+ });
+ }
+
+ /**
+ * Runs a clear and a rebalance concurrently and waits for both to finish. A BlackboardSignaller
+ * cache writer is first installed on the region in the coordinator VM. Whichever operation is
+ * meant to go second waits on the blackboard gate signalled by the first.
+ *
+ * @param clearCoordinatorVM the VM from which the clear is started
+ * @param regionToClear the name of the region to clear
+ * @param rebalanceFirst if true, the clear waits for the rebalance to begin; otherwise the
+ * rebalance waits for the clear to begin
+ */
+ private void executeClearAndRebalanceAsyncInvocations(TestVM clearCoordinatorVM,
+ String regionToClear, boolean rebalanceFirst) throws InterruptedException {
+ getVM(clearCoordinatorVM.vmNumber)
+ .invoke(() -> setBlackboardSignallerCacheWriter(regionToClear));
+
+ AsyncInvocation<?> clearInvocation = startClearAsync(clearCoordinatorVM, regionToClear,
+ rebalanceFirst);
+
+ AsyncInvocation<?> rebalanceInvocation =
+ startRebalanceAsyncAndSignalBlackboard(!rebalanceFirst);
+
+ clearInvocation.await();
+ rebalanceInvocation.await();
+ }
+
+ /**
+ * Blocks until the CLEAR_HAS_BEGUN gate is signalled (or the GeodeAwaitility timeout elapses),
+ * then stops reconnect attempts, crashes this member's distributed system and waits until no
+ * connected instance remains.
+ */
+ private void prepareMemberToShutdownOnClear() throws TimeoutException, InterruptedException {
+ getBlackboard().waitForGate(CLEAR_HAS_BEGUN, GeodeAwaitility.getTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
+ InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
+ MembershipManagerHelper.crashDistributedSystem(
+ InternalDistributedSystem.getConnectedInstance());
+ await().untilAsserted(
+ () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNull());
+ }
+
+ /**
+ * Waits until this member reports no reply waits in progress and the named partitioned region
+ * reports no volunteering, bucket creates, primary transfers, rebalance bucket creates or
+ * rebalance primary transfers in progress.
+ *
+ * @param regionName the name of the partitioned region whose stats are checked
+ */
+ private void waitForSilenceOnRegion(String regionName) {
+ DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats();
+ PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(regionName);
+ PartitionedRegionStats partitionedRegionStats = region.getPrStats();
+ await().untilAsserted(() -> {
+ assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
+ assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
+ });
+ }
+
+ /**
+  * In each of the given VMs, waits for activity on the named region to settle and then asserts
+  * that the region's local size is 0.
+  *
+  * @param vms the VMs to check, in order
+  * @param regionName the name of the partitioned region to check
+  */
+ private void assertRegionIsEmpty(List<VM> vms, String regionName) {
+   String description = "Region local size should be 0 for region " + regionName;
+
+   for (VM vm : vms) {
+     vm.invoke(() -> {
+       waitForSilenceOnRegion(regionName);
+       PartitionedRegion partitionedRegion =
+           (PartitionedRegion) cacheRule.getCache().getRegion(regionName);
+
+       assertThat(partitionedRegion.getLocalSize()).as(description).isEqualTo(0);
+     });
+   }
+ }
+
+ /**
+  * In each of the given VMs, waits for activity on the named region to settle and then asserts
+  * that the region's size is exactly ENTRIES. Despite the method name, this is stricter than a
+  * plain non-empty check: it verifies that no entries were lost.
+  *
+  * @param vms the VMs to check, in order
+  * @param regionName the name of the partitioned region to check
+  */
+ private void assertRegionIsNotEmpty(List<VM> vms, String regionName) {
+   String description = "Region size should be " + ENTRIES + " for region " + regionName;
+
+   for (VM vm : vms) {
+     vm.invoke(() -> {
+       waitForSilenceOnRegion(regionName);
+       PartitionedRegion partitionedRegion =
+           (PartitionedRegion) cacheRule.getCache().getRegion(regionName);
+
+       assertThat(partitionedRegion.size()).as(description).isEqualTo(ENTRIES);
+     });
+   }
+ }
+
+ /**
+ * Runs a further rebalance from server1 and asserts that it completed no bucket transfers, no
+ * bucket creates and no primary transfers. The tests use this to show that the preceding
+ * rebalance left nothing to do.
+ */
+ private void assertRebalanceDoesNoWork() {
+ server1.invoke(() -> {
+ RebalanceResults results =
+ cacheRule.getCache().getResourceManager().createRebalanceFactory().start().getResults();
+
+ assertThat(results.getTotalBucketTransfersCompleted())
+ .as("Expected bucket transfers to be zero").isEqualTo(0);
+ assertThat(results.getTotalBucketCreatesCompleted()).as("Expected bucket creates to be zero")
+ .isEqualTo(0);
+ assertThat(results.getTotalPrimaryTransfersCompleted())
+ .as("Expected primary transfers to be zero").isEqualTo(0);
+ });
+ }
+
+ /**
+ * Starts a rebalance of a single region, starts a clear once the rebalance has begun, and
+ * verifies that the region ends up empty on every member and fully rebalanced.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypes")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearRegionStartedAfterRebalanceClearsRegion(TestVM clearCoordinatorVM,
+ RegionShortcut regionType) throws InterruptedException {
+ parametrizedSetup(regionType, Collections.singleton(REGION_NAME), true);
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, REGION_NAME, true);
+
+ // Assert that the region is empty
+ assertRegionIsEmpty(asList(accessor, server1, server2), REGION_NAME);
+
+ // Assert that the region was successfully rebalanced (a second rebalance should do no work)
+ assertRebalanceDoesNoWork();
+ }
+
+ /**
+ * Starts a clear of a single region, starts a rebalance once the clear has begun, and verifies
+ * that the region ends up empty on every member and fully rebalanced.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypes")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearRegionStartedBeforeRebalanceClearsRegion(TestVM clearCoordinatorVM,
+ RegionShortcut regionType) throws InterruptedException {
+ parametrizedSetup(regionType, Collections.singleton(REGION_NAME), true);
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, REGION_NAME, false);
+
+ // Assert that the region is empty
+ assertRegionIsEmpty(asList(accessor, server1, server2), REGION_NAME);
+
+ // Assert that the region was successfully rebalanced (a second rebalance should do no work)
+ assertRebalanceDoesNoWork();
+ }
+
+ /**
+ * With a parent and a child colocated region, starts a rebalance and then clears the parent
+ * region once the rebalance has begun. Verifies that the parent is empty, the child still holds
+ * all of its entries, and the regions are fully rebalanced.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypes")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearParentColocatedRegionStartedAfterRebalanceOfColocatedRegionsClearsRegionAndDoesNotInterfereWithRebalance(
+ TestVM clearCoordinatorVM, RegionShortcut regionType)
+ throws InterruptedException {
+ parametrizedSetup(regionType, asList(REGION_NAME, COLOCATED_REGION), true);
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, REGION_NAME, true);
+
+ // Assert that the parent region is empty
+ assertRegionIsEmpty(asList(accessor, server1, server2), REGION_NAME);
+
+ // Assert that the colocated region is the correct size
+ assertRegionIsNotEmpty(asList(accessor, server1, server2), COLOCATED_REGION);
+
+ // Assert that the regions were successfully rebalanced (a second rebalance should do no work)
+ assertRebalanceDoesNoWork();
+ }
+
+ /**
+ * With a parent and a child colocated region, clears the parent region and then starts a
+ * rebalance once the clear has begun. Verifies that the parent is empty, the child still holds
+ * all of its entries, and the regions are fully rebalanced.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypes")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearParentColocatedRegionStartedBeforeRebalanceOfColocatedRegionsClearsRegionAndDoesNotInterfereWithRebalance(
+ TestVM clearCoordinatorVM, RegionShortcut regionType)
+ throws InterruptedException {
+ parametrizedSetup(regionType, asList(REGION_NAME, COLOCATED_REGION), true);
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, REGION_NAME, false);
+
+ // Assert that the parent region is empty
+ assertRegionIsEmpty(asList(accessor, server1, server2), REGION_NAME);
+
+ // Assert that the colocated region is the correct size
+ assertRegionIsNotEmpty(asList(accessor, server1, server2), COLOCATED_REGION);
+
+ // Assert that the regions were successfully rebalanced (a second rebalance should do no work)
+ assertRebalanceDoesNoWork();
+ }
+
+ /**
+ * With a parent and a child colocated region, starts a rebalance and then clears the child
+ * region once the rebalance has begun. Verifies that the child is empty, the parent still holds
+ * all of its entries, and the regions are fully rebalanced.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypes")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearChildColocatedRegionStartedAfterRebalanceOfColocatedRegionsClearsRegionAndDoesNotInterfereWithRebalance(
+ TestVM clearCoordinatorVM, RegionShortcut regionType)
+ throws InterruptedException {
+ parametrizedSetup(regionType, asList(REGION_NAME, COLOCATED_REGION), true);
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, COLOCATED_REGION, true);
+
+ // Assert that the colocated region is empty
+ assertRegionIsEmpty(asList(accessor, server1, server2), COLOCATED_REGION);
+
+ // Assert that the parent region is the correct size
+ assertRegionIsNotEmpty(asList(accessor, server1, server2), REGION_NAME);
+
+ // Assert that the regions were successfully rebalanced (a second rebalance should do no work)
+ assertRebalanceDoesNoWork();
+ }
+
+ /**
+ * With a parent and a child colocated region, clears the child region and then starts a
+ * rebalance once the clear has begun. Verifies that the child is empty, the parent still holds
+ * all of its entries, and the regions are fully rebalanced.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypes")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearChildColocatedRegionStartedBeforeRebalanceOfColocatedRegionsClearsRegionAndDoesNotInterfereWithRebalance(
+ TestVM clearCoordinatorVM, RegionShortcut regionType)
+ throws InterruptedException {
+ parametrizedSetup(regionType, asList(REGION_NAME, COLOCATED_REGION), true);
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, COLOCATED_REGION, false);
+
+ // Assert that the colocated region is empty
+ assertRegionIsEmpty(asList(accessor, server1, server2), COLOCATED_REGION);
+
+ // Assert that the parent region is the correct size
+ assertRegionIsNotEmpty(asList(accessor, server1, server2), REGION_NAME);
+
+ // Assert that the regions were successfully rebalanced (a second rebalance should do no work)
+ assertRebalanceDoesNoWork();
+ }
+
+ /**
+  * Starts a clear followed by a rebalance while server3, which is never the clear coordinator
+  * in this test's parameters, crashes as soon as the clear has begun. Verifies that the region
+  * is empty on the two surviving servers.
+  */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypesNoAccessor")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearStartedBeforeRebalanceClearsRegionWhenNonCoordinatorMemberIsKilled(
+     TestVM clearCoordinatorVM, RegionShortcut regionType)
+     throws InterruptedException {
+   parametrizedSetup(regionType, Collections.singleton(REGION_NAME), false);
+
+   // The BlackboardSignaller cache writer is not installed here:
+   // executeClearAndRebalanceAsyncInvocations() installs it on the coordinator before it starts
+   // the clear, so an extra call at this point would be redundant.
+
+   // Make server3 shut down when it receives the signal from the blackboard that clear has
+   // started
+   AsyncInvocation<?> shutdownInvocation =
+       server3.invokeAsync(this::prepareMemberToShutdownOnClear);
+
+   executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, REGION_NAME, false);
+
+   shutdownInvocation.await();
+
+   // Assert that the region is empty
+   assertRegionIsEmpty(asList(server1, server2), REGION_NAME);
+ }
+
+ /**
+ * Starts a rebalance followed by a clear while server3 creates the region once the rebalance has
+ * begun. Verifies that the region is empty on all three servers afterwards.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypesNoAccessor")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearStartedAfterRebalanceClearsRegionWhenNewMemberJoins(TestVM clearCoordinatorVM,
+ RegionShortcut regionType) throws InterruptedException {
+
+ // Load the data on server1 before creating the region on other servers, to create an imbalanced
+ // system
+ server1.invoke(() -> {
+ initDataStore(regionType, Collections.singleton(REGION_NAME));
+ Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+ IntStream.range(0, ENTRIES).forEach(i -> region.put("key" + i, "value" + i));
+ });
+ server2.invoke(() -> initDataStore(regionType, Collections.singleton(REGION_NAME)));
+
+ // Wait for rebalance to start, then create the region on server3
+ AsyncInvocation<?> createRegion = server3.invokeAsync(() -> {
+ cacheRule.createCache();
+
+ PartitionAttributesFactory<String, String> attributesFactory =
+ new PartitionAttributesFactory<String, String>()
+ .setTotalNumBuckets(BUCKETS)
+ .setStartupRecoveryDelay(-1);
+
+ RegionFactory<String, String> factory = cacheRule.getCache()
+ .<String, String>createRegionFactory(regionType)
+ .setPartitionAttributes(attributesFactory.create())
+ .setCacheWriter(new BlackboardSignaller());
+
+ if (regionType.isPersistent()) {
+ factory.setDiskStoreName(cacheRule.getCache()
+ .createDiskStoreFactory()
+ .create(REGION_NAME + DISK_STORE_SUFFIX)
+ .getName());
+ }
+
+ // Everything above only prepares the factory; the region itself is not created until the
+ // rebalance has signalled that it has begun
+ getBlackboard().waitForGate(REBALANCE_HAS_BEGUN, GeodeAwaitility.getTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ factory.create(REGION_NAME);
+ });
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, REGION_NAME, true);
+
+ createRegion.await();
+
+ // Assert that the region is empty
+ assertRegionIsEmpty(asList(server1, server2, server3), REGION_NAME);
+ }
+
+
+ /**
+ * Starts a clear followed by a rebalance while server3 creates the region once the clear has
+ * begun. Verifies that the region is empty on all three servers afterwards.
+ */
+ @Test
+ @Parameters(method = "coordinatorVMsAndRegionTypesNoAccessor")
+ @TestCaseName("[{index}] {method}(ClearCoordinator:{0}, RegionType:{1})")
+ public void clearStartedBeforeRebalanceClearsRegionWhenNewMemberJoins(TestVM clearCoordinatorVM,
+ RegionShortcut regionType) throws InterruptedException {
+
+ // Load the data on server1 before creating the region on other servers, to create an imbalanced
+ // system
+ server1.invoke(() -> {
+ initDataStore(regionType, Collections.singleton(REGION_NAME));
+ Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+ IntStream.range(0, ENTRIES).forEach(i -> region.put("key" + i, "value" + i));
+ });
+
+ server2.invoke(() -> initDataStore(regionType, Collections.singleton(REGION_NAME)));
+
+ // Wait for clear to start, then create the region on server3
+ AsyncInvocation<?> createRegion = server3.invokeAsync(() -> {
+ cacheRule.createCache();
+
+ PartitionAttributesFactory<String, String> attributesFactory =
+ new PartitionAttributesFactory<String, String>()
+ .setTotalNumBuckets(BUCKETS)
+ .setStartupRecoveryDelay(-1);
+
+ RegionFactory<String, String> factory = cacheRule.getCache()
+ .<String, String>createRegionFactory(regionType)
+ .setPartitionAttributes(attributesFactory.create())
+ .setCacheWriter(new BlackboardSignaller());
+
+ if (regionType.isPersistent()) {
+ factory.setDiskStoreName(cacheRule.getCache()
+ .createDiskStoreFactory()
+ .create(REGION_NAME + DISK_STORE_SUFFIX)
+ .getName());
+ }
+
+ // Everything above only prepares the factory; the region itself is not created until the
+ // clear has signalled that it has begun
+ getBlackboard().waitForGate(CLEAR_HAS_BEGUN, GeodeAwaitility.getTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ factory.create(REGION_NAME);
+ });
+
+ executeClearAndRebalanceAsyncInvocations(clearCoordinatorVM, REGION_NAME, false);
+
+ createRegion.await();
+
+ // Assert that the region is empty
+ assertRegionIsEmpty(asList(server1, server2, server3), REGION_NAME);
+ }
+
+ /**
+ * Cache writer that signals the CLEAR_HAS_BEGUN gate on the blackboard from its
+ * {@code beforeRegionClear} callback, so that other VMs waiting on that gate can proceed.
+ */
+ public static class BlackboardSignaller extends CacheWriterAdapter<String, String> {
+ @Override
+ public synchronized void beforeRegionClear(RegionEvent<String, String> event)
+ throws CacheWriterException {
+ getBlackboard().signalGate(CLEAR_HAS_BEGUN);
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
index 4e30d64..f7c5c7f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
@@ -279,15 +279,11 @@ public class ColocationHelper {
}
/**
- * An utility method to retrieve all partitioned regions(excluding self) in a colocation chain<br>
+ * A utility method to retrieve all partitioned regions (excluding self) in a colocation chain.
* <p>
- * For example, shipmentPR is colocated with orderPR and orderPR is colocated with customerPR <br>
- * <br>
- * getAllColocationRegions(customerPR) --> List{orderPR, shipmentPR}<br>
- * getAllColocationRegions(orderPR) --> List{customerPR, shipmentPR}<br>
- * getAllColocationRegions(shipmentPR) --> List{customerPR, orderPR}<br>
*
- * @return List of all partitioned regions (excluding self) in a colocated chain
+ * @return {@code Map<String, PartitionedRegion>} of all partitioned regions (excluding self) in a
+ * colocated chain. Keys are the full paths of the PartitionedRegion values.
* @since GemFire 5.8Beta
*/
public static Map<String, PartitionedRegion> getAllColocationRegions(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 5a0621d..1c9d5b2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -382,7 +382,8 @@ public class PartitionedRegionClear {
}
protected void assignAllPrimaryBuckets() {
- PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+ PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion);
+ PartitionRegionHelper.assignBucketsToPartitions(leader);
}
protected void handleClearFromDepartedMember(InternalDistributedMember departedMember) {