You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2021/02/16 21:11:05 UTC
[geode] branch feature/GEODE-7665 updated: GEODE-8878: PR clear
should also send a lock message to the secondary members. (#5950)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push:
new 5156246 GEODE-8878: PR clear should also send a lock message to the secondary members. (#5950)
5156246 is described below
commit 5156246e7e9d5f3a00504b59795d1417011faad5
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Tue Feb 16 13:10:00 2021 -0800
GEODE-8878: PR clear should also send a lock message to the secondary members. (#5950)
---
.../partitioned/PRClearCreateIndexDUnitTest.java | 265 +++++++++++++++++++++
.../apache/geode/internal/cache/BucketRegion.java | 13 +-
.../internal/cache/DistributedClearOperation.java | 10 +-
.../geode/internal/cache/DistributedRegion.java | 42 ++--
.../internal/cache/PartitionedRegionClear.java | 28 ++-
.../cache/PartitionedRegionClearMessage.java | 4 +
.../internal/cache/BucketRegionJUnitTest.java | 10 +-
.../geode/test/junit/rules/MemberStarterRule.java | 9 +
8 files changed, 350 insertions(+), 31 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
new file mode 100644
index 0000000..1c94c2d
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.query.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.stream.IntStream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.DistributedClearOperation;
+import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PRClearCreateIndexDUnitTest implements Serializable {
+ @Rule
+ public ClusterStartupRule cluster = new ClusterStartupRule(4, true);
+
+ private MemberVM primary, secondary;
+ private ClientVM client;
+
+ @Before
+ public void before() throws Exception {
+ int locatorPort = ClusterStartupRule.getDUnitLocatorPort();
+ primary = cluster.startServerVM(0, locatorPort);
+ secondary = cluster.startServerVM(1, locatorPort);
+
+ // create region on server1 first, making sure server1 has the primary bucket
+ primary.invoke(() -> {
+ DistributionMessageObserver.setInstance(new MessageObserver());
+ Region<Object, Object> region =
+ ClusterStartupRule.memberStarter.createPartitionRegion("regionA",
+ f -> f.setTotalNumBuckets(1).setRedundantCopies(1));
+ IntStream.range(0, 100).forEach(i -> region.put(i, "value" + i));
+ });
+
+ // server2 has the secondary bucket
+ secondary.invoke(() -> {
+ DistributionMessageObserver.setInstance(new MessageObserver());
+ ClusterStartupRule.memberStarter.createPartitionRegion("regionA",
+ f -> f.setTotalNumBuckets(1).setRedundantCopies(1));
+ });
+ }
+
+ @After
+ public void after() throws Exception {
+ primary.invoke(() -> {
+ DistributionMessageObserver.setInstance(null);
+ });
+ secondary.invoke(() -> {
+ DistributionMessageObserver.setInstance(null);
+ });
+ }
+
+ // All tests create index on secondary members. These tests are making sure we are requesting
+ // locks for clear on secondary members as well. If we create index on the primary, the clear
+ // and createIndex will run sequentially so there would be no error. But if we create index on
+ // the secondary member and if the secondary member will not
+ // request a lock for clear operation, it will result in an EntryDestroyedException when create
+ // index is happening.
+
+ // Note: OP_LOCK_FOR_CLEAR, OP_CLEAR, OP_UNLOCK_FOR_CLEAR are messages for secondary members
+ // OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR can be for anybody
+
+ @Test
+ // all local buckets are primary, so only OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent to the
+ // secondary member
+ // in the end an OP_PR_CLEAR is sent to the secondary for no effect
+ public void clearFromPrimaryMember() throws Exception {
+ AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+ AsyncInvocation clear = primary.invokeAsync(PRClearCreateIndexDUnitTest::clear);
+
+ createIndex.get();
+ clear.get();
+
+ // assert that secondary member received these messages
+ primary.invoke(() -> verifyEvents(false, false, false, false));
+ secondary.invoke(() -> verifyEvents(false, true, true, true));
+ }
+
+ @Test
+ // all local buckets are secondary, so an OP_PR_CLEAR is sent to the primary member, from there
+ // a OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent back to the secondary
+ public void clearFromSecondaryMember() throws Exception {
+ AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+ AsyncInvocation clear = secondary.invokeAsync(PRClearCreateIndexDUnitTest::clear);
+
+ createIndex.get();
+ clear.get();
+
+ // assert that secondary member received these messages
+ primary.invoke(() -> verifyEvents(false, true, false, false));
+ secondary.invoke(() -> verifyEvents(false, false, true, true));
+ }
+
+ /**
+ * For interested client connecting to secondary member
+ * 1. locks all local primary region
+ * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members
+ * 3. send OP_PR_CLEAR to primary to clear
+ * 4. primary will send a OP_CLEAR message back to the secondary to clear
+ */
+ @Test
+ public void clearFromInterestedClientConnectingToSecondaryMember() throws Exception {
+ int port = secondary.getPort();
+ client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true));
+ AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+
+ AsyncInvocation clear = client.invokeAsync(() -> {
+ Thread.sleep(200);
+ ClientCache clientCache = ClusterStartupRule.getClientCache();
+ Region<Object, Object> regionA =
+ clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("regionA");
+ regionA.registerInterestForAllKeys();
+ regionA.clear();
+ });
+
+ createIndex.get();
+ clear.get();
+ primary.invoke(() -> verifyEvents(true, true, false, false));
+ secondary.invoke(() -> verifyEvents(false, false, true, true));
+ }
+
+ @Test
+ /**
+ * For interested client connecting to primary member, behaves like starting from primary member
+ * except it locks first
+ * 1. locks local primary regions
+ * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members' primary buckets
+ * 3. send a OP_LOCK_FOR_CLEAR message to lock all secondary buckets
+ * 4. send OP_CLEAR to clear all secondary buckets
+ */
+ public void clearFromInterestedClientConnectingToPrimaryMember() throws Exception {
+ int port = primary.getPort();
+ client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true));
+ AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+
+ AsyncInvocation clear = client.invokeAsync(() -> {
+ Thread.sleep(200);
+ ClientCache clientCache = ClusterStartupRule.getClientCache();
+ Region<Object, Object> regionA =
+ clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("regionA");
+ regionA.registerInterestForAllKeys();
+ regionA.clear();
+ });
+
+ createIndex.get();
+ clear.get();
+ primary.invoke(() -> verifyEvents(false, false, false, false));
+ secondary.invoke(() -> verifyEvents(true, true, true, true));
+ }
+
+ private static void clear() throws InterruptedException {
+ // start the clear a bit later that the createIndex operation, to reveal the race condition
+ // comment it out since the test does not need the race condition to happen anymore
+ // Thread.sleep(200);
+ Region region = ClusterStartupRule.getCache().getRegion("/regionA");
+ region.clear();
+ }
+
+ private static void createIndex() {
+ QueryService queryService = ClusterStartupRule.getCache().getQueryService();
+ // run create index multiple times to make sure the clear operation fall inside a
+ // createIndex Operation
+ IntStream.range(0, 10).forEach(i -> {
+ try {
+ queryService.createIndex("index" + i, "name" + i, "/regionA");
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ });
+ }
+
+ private static void verifyEvents(boolean lockOthers, boolean clearOthers, boolean lockSecondary,
+ boolean clearSecondary) {
+ MessageObserver observer = (MessageObserver) DistributionMessageObserver.getInstance();
+ assertThat(observer.isLock_others())
+ .describedAs("OP_LOCK_FOR_PR_CLEAR received: %s", observer.isLock_others())
+ .isEqualTo(lockOthers);
+ assertThat(observer.isClear_others())
+ .describedAs("OP_PR_CLEAR received: %s", observer.isClear_others()).isEqualTo(clearOthers);
+ assertThat(observer.isLock_secondary())
+ .describedAs("OP_LOCK_FOR_CLEAR received: %s", observer.isLock_secondary())
+ .isEqualTo(lockSecondary);
+ assertThat(observer.isClear_secondary())
+ .describedAs("OP_CLEAR received: %s", observer.isClear_secondary())
+ .isEqualTo(clearSecondary);
+ }
+
+ private static class MessageObserver extends DistributionMessageObserver {
+ private volatile boolean lock_secondary = false;
+ private volatile boolean clear_secondary = false;
+ private volatile boolean clear_others = false;
+ private volatile boolean lock_others = false;
+
+ @Override
+ public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+ if (message instanceof ClearRegionMessage) {
+ ClearRegionMessage clearMessage = (ClearRegionMessage) message;
+ if (clearMessage
+ .getOperationType() == DistributedClearOperation.OperationType.OP_LOCK_FOR_CLEAR) {
+ lock_secondary = true;
+ }
+ if (clearMessage.getOperationType() == DistributedClearOperation.OperationType.OP_CLEAR) {
+ clear_secondary = true;
+ }
+ }
+ if (message instanceof PartitionedRegionClearMessage) {
+ PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
+ if (clearMessage
+ .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ lock_others = true;
+ }
+ if (clearMessage.getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+ clear_others = true;
+ }
+ }
+ }
+
+ public boolean isLock_secondary() {
+ return lock_secondary;
+ }
+
+ public boolean isClear_secondary() {
+ return clear_secondary;
+ }
+
+ public boolean isClear_others() {
+ return clear_others;
+ }
+
+ public boolean isLock_others() {
+ return lock_others;
+ }
+ }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index e0b895f..cb3c548 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -560,6 +560,11 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
}
+ /**
+ * this starts with a primary bucket, clears it, and distribute a DistributedClearOperation
+ * .OperationType.OP_CLEAR operation to other members.
+ * If this member is not locked yet, lock it and send OP_LOCK_FOR_CLEAR to others first.
+ */
@Override
public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
if (!getBucketAdvisor().isPrimary()) {
@@ -576,9 +581,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
.isLockedForListenerAndClientNotification();
try {
- if (!isLockedAlready) {
- obtainWriteLocksForClear(regionEvent, participants);
- }
+ obtainWriteLocksForClear(regionEvent, participants, isLockedAlready);
// no need to dominate my own rvv.
// Clear is on going here, there won't be GII for this member
clearRegionLocally(regionEvent, cacheWrite, null);
@@ -586,9 +589,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// TODO: call reindexUserDataRegion if there're lucene indexes
} finally {
- if (!isLockedAlready) {
- releaseWriteLocksForClear(regionEvent, participants);
- }
+ releaseWriteLocksForClear(regionEvent, participants, isLockedAlready);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
index 25cc2f5..4809291 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
@@ -163,6 +163,10 @@ public class DistributedClearOperation extends DistributedCacheOperation {
}
+ /**
+ * this message is to operate on the BucketRegion level, used by the primary member to distribute
+ * clear message to secondary buckets
+ */
public static class ClearRegionMessage extends CacheOperationMessage {
protected EventID eventID;
@@ -186,6 +190,10 @@ public class DistributedClearOperation extends DistributedCacheOperation {
return OperationExecutors.HIGH_PRIORITY_EXECUTOR;
}
+ public OperationType getOperationType() {
+ return clearOp;
+ }
+
@Override
protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException {
RegionEventImpl event = createRegionEvent(rgn);
@@ -211,7 +219,7 @@ public class DistributedClearOperation extends DistributedCacheOperation {
switch (this.clearOp) {
case OP_CLEAR:
region.clearRegionLocally((RegionEventImpl) event, false, this.rvv);
- region.notifyBridgeClients((RegionEventImpl) event);
+ region.notifyBridgeClients(event);
this.appliedOperation = true;
break;
case OP_LOCK_FOR_CLEAR:
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d0035fa..bd3ad48 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2027,13 +2027,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
getCacheDistributionAdvisor().adviseInvalidateRegion();
// pause all generation of versions and flush from the other members to this one
try {
- obtainWriteLocksForClear(regionEvent, participants);
+ obtainWriteLocksForClear(regionEvent, participants, false);
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
distributeClearOperation(regionEvent, null, participants);
}
} finally {
- releaseWriteLocksForClear(regionEvent, participants);
+ releaseWriteLocksForClear(regionEvent, participants, false);
}
} finally {
distributedUnlockForClear();
@@ -2043,7 +2043,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
getCacheDistributionAdvisor().adviseInvalidateRegion();
clearRegionLocally(regionEvent, cacheWrite, null);
if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
- DistributedClearOperation.clear(regionEvent, null, participants);
+ distributeClearOperation(regionEvent, null, participants);
}
}
}
@@ -2087,11 +2087,28 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
* obtain locks preventing generation of new versions in other members
*/
protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
- Set<InternalDistributedMember> participants) {
- lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
- if (!isUsedForPartitionedRegionBucket()) {
- DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+ Set<InternalDistributedMember> participants, boolean localLockedAlready) {
+ if (!localLockedAlready) {
+ lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
}
+ lockAndFlushClearToOthers(regionEvent, participants);
+ }
+
+ /**
+ * releases the locks obtained in obtainWriteLocksForClear
+ */
+ protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+ Set<InternalDistributedMember> participants,
+ boolean localLockedAlready) {
+ if (!localLockedAlready) {
+ releaseLockLocallyForClear(regionEvent);
+ }
+ DistributedClearOperation.releaseLocks(regionEvent, participants);
+ }
+
+ void lockAndFlushClearToOthers(RegionEventImpl regionEvent,
+ Set<InternalDistributedMember> participants) {
+ DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
}
/**
@@ -2125,17 +2142,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
}
}
- /**
- * releases the locks obtained in obtainWriteLocksForClear
- */
- protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
- Set<InternalDistributedMember> participants) {
- releaseLockLocallyForClear(regionEvent);
- if (!isUsedForPartitionedRegionBucket()) {
- DistributedClearOperation.releaseLocks(regionEvent, participants);
- }
- }
-
protected void releaseLockLocallyForClear(RegionEventImpl regionEvent) {
ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
if (armLockTestHook != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 2bec6f2..539f682 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -86,22 +86,37 @@ public class PartitionedRegionClear {
return partitionedRegionClearListener;
}
+ /**
+ * only called if there are any listeners or clients interested.
+ */
void obtainLockForClear(RegionEventImpl event) {
obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
sendPartitionedRegionClearMessage(event,
PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
}
+ /**
+ * only called if there are any listeners or clients interested.
+ */
void releaseLockForClear(RegionEventImpl event) {
releaseClearLockLocal();
sendPartitionedRegionClearMessage(event,
PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
}
+ /**
+ * clears local primaries and send message to remote primaries to clear
+ */
Set<Integer> clearRegion(RegionEventImpl regionEvent) {
- Set<Integer> allBucketsCleared = new HashSet<>(clearRegionLocal(regionEvent));
- allBucketsCleared.addAll(sendPartitionedRegionClearMessage(regionEvent,
- PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR));
+ // this includes all local primary buckets and their remote secondaries
+ Set<Integer> localPrimaryBuckets = clearRegionLocal(regionEvent);
+ // this includes all remote primary buckets and their secondaries
+ Set<Integer> remotePrimaryBuckets = sendPartitionedRegionClearMessage(regionEvent,
+ PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+
+ Set<Integer> allBucketsCleared = new HashSet<>();
+ allBucketsCleared.addAll(localPrimaryBuckets);
+ allBucketsCleared.addAll(remotePrimaryBuckets);
return allBucketsCleared;
}
@@ -124,6 +139,10 @@ public class PartitionedRegionClear {
} while (retry);
}
+ /**
+ * this clears all local primary buckets (each will distribute the clear operation to its
+ * secondary members) and all of their remote secondaries
+ */
public Set<Integer> clearRegionLocal(RegionEventImpl regionEvent) {
Set<Integer> clearedBuckets = new HashSet<>();
long clearStartTime = System.nanoTime();
@@ -209,6 +228,9 @@ public class PartitionedRegionClear {
partitionedRegion.notifyBridgeClients(event);
}
+ /**
+ * obtain locks for all local buckets
+ */
protected void obtainClearLockLocal(InternalDistributedMember requester) {
synchronized (lockForListenerAndClientNotification) {
// Check if the member is still part of the distributed system
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
index b48c9ee..724256b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -41,6 +41,10 @@ import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
+/**
+ * this message is for operations no the partition region level, could be sent by any originating
+ * member to the other members hosting this partition region
+ */
public class PartitionedRegionClearMessage extends PartitionMessage {
public enum OperationType {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index d3397eb..0d1cc87 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -159,6 +159,7 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
doReturn(ba).when(region).getBucketAdvisor();
doNothing().when(region).distributeClearOperation(any(), any(), any());
doNothing().when(region).lockLocallyForClear(any(), any(), any());
+ doNothing().when(region).lockAndFlushClearToOthers(any(), any());
doNothing().when(region).clearRegionLocally(event, true, null);
when(ba.isPrimary()).thenReturn(true);
region.cmnClearRegion(event, true, true);
@@ -174,6 +175,7 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
doReturn(ba).when(region).getBucketAdvisor();
doNothing().when(region).distributeClearOperation(any(), any(), any());
doNothing().when(region).lockLocallyForClear(any(), any(), any());
+ doNothing().when(region).lockAndFlushClearToOthers(any(), any());
doNothing().when(region).clearRegionLocally(event, true, null);
when(ba.isPrimary()).thenReturn(true);
region.cmnClearRegion(event, true, true);
@@ -181,12 +183,14 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
}
@Test
- public void obtainWriteLocksForClearInBRShouldNotDistribute() {
+ public void obtainWriteLocksForClearInBRShouldDistribute() {
RegionEventImpl event = createClearRegionEvent();
BucketRegion region = (BucketRegion) event.getRegion();
doNothing().when(region).lockLocallyForClear(any(), any(), any());
- region.obtainWriteLocksForClear(event, null);
- assertTrue(region.isUsedForPartitionedRegionBucket());
+ doNothing().when(region).lockAndFlushClearToOthers(any(), any());
+ region.obtainWriteLocksForClear(event, null, false);
+ verify(region).lockLocallyForClear(any(), any(), eq(event));
+ verify(region).lockAndFlushClearToOthers(eq(event), eq(null));
}
@Test
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index ff0d7f2..58aa3fb 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -440,6 +440,15 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
});
}
+ public Region createPartitionRegion(String name,
+ Consumer<PartitionAttributesFactory> attributesFactoryConsumer) {
+ return createRegion(RegionShortcut.PARTITION, name, rf -> {
+ PartitionAttributesFactory attributeFactory = new PartitionAttributesFactory();
+ attributesFactoryConsumer.accept(attributeFactory);
+ rf.setPartitionAttributes(attributeFactory.create());
+ });
+ }
+
public void waitTillCacheClientProxyHasBeenPaused() {
await().until(() -> {
CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();