You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2021/02/16 21:11:05 UTC

[geode] branch feature/GEODE-7665 updated: GEODE-8878: PR clear should also send a lock message to the secondary members. (#5950)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push:
     new 5156246  GEODE-8878: PR clear should also send a lock message to the secondary members. (#5950)
5156246 is described below

commit 5156246e7e9d5f3a00504b59795d1417011faad5
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Tue Feb 16 13:10:00 2021 -0800

    GEODE-8878: PR clear should also send a lock message to the secondary members. (#5950)
---
 .../partitioned/PRClearCreateIndexDUnitTest.java   | 265 +++++++++++++++++++++
 .../apache/geode/internal/cache/BucketRegion.java  |  13 +-
 .../internal/cache/DistributedClearOperation.java  |  10 +-
 .../geode/internal/cache/DistributedRegion.java    |  42 ++--
 .../internal/cache/PartitionedRegionClear.java     |  28 ++-
 .../cache/PartitionedRegionClearMessage.java       |   4 +
 .../internal/cache/BucketRegionJUnitTest.java      |  10 +-
 .../geode/test/junit/rules/MemberStarterRule.java  |   9 +
 8 files changed, 350 insertions(+), 31 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
new file mode 100644
index 0000000..1c94c2d
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.query.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.stream.IntStream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.DistributedClearOperation;
+import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PRClearCreateIndexDUnitTest implements Serializable {
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(4, true);
+
+  private MemberVM primary, secondary;
+  private ClientVM client;
+
+  @Before
+  public void before() throws Exception {
+    int locatorPort = ClusterStartupRule.getDUnitLocatorPort();
+    primary = cluster.startServerVM(0, locatorPort);
+    secondary = cluster.startServerVM(1, locatorPort);
+
+    // create region on server1 first, making sure server1 has the primary bucket
+    primary.invoke(() -> {
+      DistributionMessageObserver.setInstance(new MessageObserver());
+      Region<Object, Object> region =
+          ClusterStartupRule.memberStarter.createPartitionRegion("regionA",
+              f -> f.setTotalNumBuckets(1).setRedundantCopies(1));
+      IntStream.range(0, 100).forEach(i -> region.put(i, "value" + i));
+    });
+
+    // server2 has the secondary bucket
+    secondary.invoke(() -> {
+      DistributionMessageObserver.setInstance(new MessageObserver());
+      ClusterStartupRule.memberStarter.createPartitionRegion("regionA",
+          f -> f.setTotalNumBuckets(1).setRedundantCopies(1));
+    });
+  }
+
+  @After
+  public void after() throws Exception {
+    primary.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+    secondary.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+  }
+
+  // All tests create index on secondary members. These tests are making sure we are requesting
+  // locks for clear on secondary members as well. If we create index on the primary, the clear
+  // and createIndex will run sequentially so there would be no error. But if we create index on
+  // the secondary member and if the secondary member will not
+  // request a lock for clear operation, it will result in an EntryDestroyedException when create
+  // index is happening.
+
+  // Note: OP_LOCK_FOR_CLEAR, OP_CLEAR, OP_UNLOCK_FOR_CLEAR are messages for secondary members
+  // OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR can be for anybody
+
+  @Test
+  // all local buckets are primary, so only OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent to the
+  // secondary member
+  // in the end an OP_PR_CLEAR is sent to the secondary for no effect
+  public void clearFromPrimaryMember() throws Exception {
+    AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+    AsyncInvocation clear = primary.invokeAsync(PRClearCreateIndexDUnitTest::clear);
+
+    createIndex.get();
+    clear.get();
+
+    // assert that secondary member received these messages
+    primary.invoke(() -> verifyEvents(false, false, false, false));
+    secondary.invoke(() -> verifyEvents(false, true, true, true));
+  }
+
+  @Test
+  // all local buckets are secondary, so an OP_PR_CLEAR is sent to the primary member, from there
+  // a OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent back to the secondary
+  public void clearFromSecondaryMember() throws Exception {
+    AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+    AsyncInvocation clear = secondary.invokeAsync(PRClearCreateIndexDUnitTest::clear);
+
+    createIndex.get();
+    clear.get();
+
+    // assert that secondary member received these messages
+    primary.invoke(() -> verifyEvents(false, true, false, false));
+    secondary.invoke(() -> verifyEvents(false, false, true, true));
+  }
+
+  /**
+   * For interested client connecting to secondary member
+   * 1. locks all local primary region
+   * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members
+   * 3. send OP_PR_CLEAR to primary to clear
+   * 4. primary will send a OP_CLEAR message back to the secondary to clear
+   */
+  @Test
+  public void clearFromInterestedClientConnectingToSecondaryMember() throws Exception {
+    int port = secondary.getPort();
+    client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true));
+    AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+
+    AsyncInvocation clear = client.invokeAsync(() -> {
+      Thread.sleep(200);
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      Region<Object, Object> regionA =
+          clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("regionA");
+      regionA.registerInterestForAllKeys();
+      regionA.clear();
+    });
+
+    createIndex.get();
+    clear.get();
+    primary.invoke(() -> verifyEvents(true, true, false, false));
+    secondary.invoke(() -> verifyEvents(false, false, true, true));
+  }
+
+  @Test
+  /**
+   * For interested client connecting to primary member, behaves like starting from primary member
+   * except it locks first
+   * 1. locks local primary regions
+   * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members' primary buckets
+   * 3. send a OP_LOCK_FOR_CLEAR message to lock all secondary buckets
+   * 4. send OP_CLEAR to clear all secondary buckets
+   */
+  public void clearFromInterestedClientConnectingToPrimaryMember() throws Exception {
+    int port = primary.getPort();
+    client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true));
+    AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
+
+    AsyncInvocation clear = client.invokeAsync(() -> {
+      Thread.sleep(200);
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      Region<Object, Object> regionA =
+          clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("regionA");
+      regionA.registerInterestForAllKeys();
+      regionA.clear();
+    });
+
+    createIndex.get();
+    clear.get();
+    primary.invoke(() -> verifyEvents(false, false, false, false));
+    secondary.invoke(() -> verifyEvents(true, true, true, true));
+  }
+
+  private static void clear() throws InterruptedException {
+    // start the clear a bit later that the createIndex operation, to reveal the race condition
+    // comment it out since the test does not need the race condition to happen anymore
+    // Thread.sleep(200);
+    Region region = ClusterStartupRule.getCache().getRegion("/regionA");
+    region.clear();
+  }
+
+  private static void createIndex() {
+    QueryService queryService = ClusterStartupRule.getCache().getQueryService();
+    // run create index multiple times to make sure the clear operation fall inside a
+    // createIndex Operation
+    IntStream.range(0, 10).forEach(i -> {
+      try {
+        queryService.createIndex("index" + i, "name" + i, "/regionA");
+      } catch (Exception e) {
+        throw new RuntimeException(e.getMessage(), e);
+      }
+    });
+  }
+
+  private static void verifyEvents(boolean lockOthers, boolean clearOthers, boolean lockSecondary,
+      boolean clearSecondary) {
+    MessageObserver observer = (MessageObserver) DistributionMessageObserver.getInstance();
+    assertThat(observer.isLock_others())
+        .describedAs("OP_LOCK_FOR_PR_CLEAR received: %s", observer.isLock_others())
+        .isEqualTo(lockOthers);
+    assertThat(observer.isClear_others())
+        .describedAs("OP_PR_CLEAR received: %s", observer.isClear_others()).isEqualTo(clearOthers);
+    assertThat(observer.isLock_secondary())
+        .describedAs("OP_LOCK_FOR_CLEAR received: %s", observer.isLock_secondary())
+        .isEqualTo(lockSecondary);
+    assertThat(observer.isClear_secondary())
+        .describedAs("OP_CLEAR received: %s", observer.isClear_secondary())
+        .isEqualTo(clearSecondary);
+  }
+
+  private static class MessageObserver extends DistributionMessageObserver {
+    private volatile boolean lock_secondary = false;
+    private volatile boolean clear_secondary = false;
+    private volatile boolean clear_others = false;
+    private volatile boolean lock_others = false;
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (message instanceof ClearRegionMessage) {
+        ClearRegionMessage clearMessage = (ClearRegionMessage) message;
+        if (clearMessage
+            .getOperationType() == DistributedClearOperation.OperationType.OP_LOCK_FOR_CLEAR) {
+          lock_secondary = true;
+        }
+        if (clearMessage.getOperationType() == DistributedClearOperation.OperationType.OP_CLEAR) {
+          clear_secondary = true;
+        }
+      }
+      if (message instanceof PartitionedRegionClearMessage) {
+        PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
+        if (clearMessage
+            .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+          lock_others = true;
+        }
+        if (clearMessage.getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+          clear_others = true;
+        }
+      }
+    }
+
+    public boolean isLock_secondary() {
+      return lock_secondary;
+    }
+
+    public boolean isClear_secondary() {
+      return clear_secondary;
+    }
+
+    public boolean isClear_others() {
+      return clear_others;
+    }
+
+    public boolean isLock_others() {
+      return lock_others;
+    }
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index e0b895f..cb3c548 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -560,6 +560,11 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     }
   }
 
+  /**
+   * this starts with a primary bucket, clears it, and distribute a DistributedClearOperation
+   * .OperationType.OP_CLEAR operation to other members.
+   * If this member is not locked yet, lock it and send OP_LOCK_FOR_CLEAR to others first.
+   */
   @Override
   public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
     if (!getBucketAdvisor().isPrimary()) {
@@ -576,9 +581,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
         .isLockedForListenerAndClientNotification();
 
     try {
-      if (!isLockedAlready) {
-        obtainWriteLocksForClear(regionEvent, participants);
-      }
+      obtainWriteLocksForClear(regionEvent, participants, isLockedAlready);
       // no need to dominate my own rvv.
       // Clear is on going here, there won't be GII for this member
       clearRegionLocally(regionEvent, cacheWrite, null);
@@ -586,9 +589,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
 
       // TODO: call reindexUserDataRegion if there're lucene indexes
     } finally {
-      if (!isLockedAlready) {
-        releaseWriteLocksForClear(regionEvent, participants);
-      }
+      releaseWriteLocksForClear(regionEvent, participants, isLockedAlready);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
index 25cc2f5..4809291 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
@@ -163,6 +163,10 @@ public class DistributedClearOperation extends DistributedCacheOperation {
   }
 
 
+  /**
+   * this message is to operate on the BucketRegion level, used by the primary member to distribute
+   * clear message to secondary buckets
+   */
   public static class ClearRegionMessage extends CacheOperationMessage {
 
     protected EventID eventID;
@@ -186,6 +190,10 @@ public class DistributedClearOperation extends DistributedCacheOperation {
       return OperationExecutors.HIGH_PRIORITY_EXECUTOR;
     }
 
+    public OperationType getOperationType() {
+      return clearOp;
+    }
+
     @Override
     protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException {
       RegionEventImpl event = createRegionEvent(rgn);
@@ -211,7 +219,7 @@ public class DistributedClearOperation extends DistributedCacheOperation {
       switch (this.clearOp) {
         case OP_CLEAR:
           region.clearRegionLocally((RegionEventImpl) event, false, this.rvv);
-          region.notifyBridgeClients((RegionEventImpl) event);
+          region.notifyBridgeClients(event);
           this.appliedOperation = true;
           break;
         case OP_LOCK_FOR_CLEAR:
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d0035fa..bd3ad48 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2027,13 +2027,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
               getCacheDistributionAdvisor().adviseInvalidateRegion();
           // pause all generation of versions and flush from the other members to this one
           try {
-            obtainWriteLocksForClear(regionEvent, participants);
+            obtainWriteLocksForClear(regionEvent, participants, false);
             clearRegionLocally(regionEvent, cacheWrite, null);
             if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
               distributeClearOperation(regionEvent, null, participants);
             }
           } finally {
-            releaseWriteLocksForClear(regionEvent, participants);
+            releaseWriteLocksForClear(regionEvent, participants, false);
           }
         } finally {
           distributedUnlockForClear();
@@ -2043,7 +2043,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
             getCacheDistributionAdvisor().adviseInvalidateRegion();
         clearRegionLocally(regionEvent, cacheWrite, null);
         if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
-          DistributedClearOperation.clear(regionEvent, null, participants);
+          distributeClearOperation(regionEvent, null, participants);
         }
       }
     }
@@ -2087,11 +2087,28 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
    * obtain locks preventing generation of new versions in other members
    */
   protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
-      Set<InternalDistributedMember> participants) {
-    lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
-    if (!isUsedForPartitionedRegionBucket()) {
-      DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+      Set<InternalDistributedMember> participants, boolean localLockedAlready) {
+    if (!localLockedAlready) {
+      lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
     }
+    lockAndFlushClearToOthers(regionEvent, participants);
+  }
+
+  /**
+   * releases the locks obtained in obtainWriteLocksForClear
+   */
+  protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+      Set<InternalDistributedMember> participants,
+      boolean localLockedAlready) {
+    if (!localLockedAlready) {
+      releaseLockLocallyForClear(regionEvent);
+    }
+    DistributedClearOperation.releaseLocks(regionEvent, participants);
+  }
+
+  void lockAndFlushClearToOthers(RegionEventImpl regionEvent,
+      Set<InternalDistributedMember> participants) {
+    DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
   }
 
   /**
@@ -2125,17 +2142,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
   }
 
-  /**
-   * releases the locks obtained in obtainWriteLocksForClear
-   */
-  protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
-      Set<InternalDistributedMember> participants) {
-    releaseLockLocallyForClear(regionEvent);
-    if (!isUsedForPartitionedRegionBucket()) {
-      DistributedClearOperation.releaseLocks(regionEvent, participants);
-    }
-  }
-
   protected void releaseLockLocallyForClear(RegionEventImpl regionEvent) {
     ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
     if (armLockTestHook != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 2bec6f2..539f682 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -86,22 +86,37 @@ public class PartitionedRegionClear {
     return partitionedRegionClearListener;
   }
 
+  /**
+   * only called if there are any listeners or clients interested.
+   */
   void obtainLockForClear(RegionEventImpl event) {
     obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
     sendPartitionedRegionClearMessage(event,
         PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
   }
 
+  /**
+   * only called if there are any listeners or clients interested.
+   */
   void releaseLockForClear(RegionEventImpl event) {
     releaseClearLockLocal();
     sendPartitionedRegionClearMessage(event,
         PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
   }
 
+  /**
+   * clears local primaries and send message to remote primaries to clear
+   */
   Set<Integer> clearRegion(RegionEventImpl regionEvent) {
-    Set<Integer> allBucketsCleared = new HashSet<>(clearRegionLocal(regionEvent));
-    allBucketsCleared.addAll(sendPartitionedRegionClearMessage(regionEvent,
-        PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR));
+    // this includes all local primary buckets and their remote secondaries
+    Set<Integer> localPrimaryBuckets = clearRegionLocal(regionEvent);
+    // this includes all remote primary buckets and their secondaries
+    Set<Integer> remotePrimaryBuckets = sendPartitionedRegionClearMessage(regionEvent,
+        PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+
+    Set<Integer> allBucketsCleared = new HashSet<>();
+    allBucketsCleared.addAll(localPrimaryBuckets);
+    allBucketsCleared.addAll(remotePrimaryBuckets);
     return allBucketsCleared;
   }
 
@@ -124,6 +139,10 @@ public class PartitionedRegionClear {
     } while (retry);
   }
 
+  /**
+   * this clears all local primary buckets (each will distribute the clear operation to its
+   * secondary members) and all of their remote secondaries
+   */
   public Set<Integer> clearRegionLocal(RegionEventImpl regionEvent) {
     Set<Integer> clearedBuckets = new HashSet<>();
     long clearStartTime = System.nanoTime();
@@ -209,6 +228,9 @@ public class PartitionedRegionClear {
     partitionedRegion.notifyBridgeClients(event);
   }
 
+  /**
+   * obtain locks for all local buckets
+   */
   protected void obtainClearLockLocal(InternalDistributedMember requester) {
     synchronized (lockForListenerAndClientNotification) {
       // Check if the member is still part of the distributed system
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
index b48c9ee..724256b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -41,6 +41,10 @@ import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
+/**
+ * this message is for operations no the partition region level, could be sent by any originating
+ * member to the other members hosting this partition region
+ */
 public class PartitionedRegionClearMessage extends PartitionMessage {
 
   public enum OperationType {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index d3397eb..0d1cc87 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -159,6 +159,7 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
     doReturn(ba).when(region).getBucketAdvisor();
     doNothing().when(region).distributeClearOperation(any(), any(), any());
     doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).lockAndFlushClearToOthers(any(), any());
     doNothing().when(region).clearRegionLocally(event, true, null);
     when(ba.isPrimary()).thenReturn(true);
     region.cmnClearRegion(event, true, true);
@@ -174,6 +175,7 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
     doReturn(ba).when(region).getBucketAdvisor();
     doNothing().when(region).distributeClearOperation(any(), any(), any());
     doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).lockAndFlushClearToOthers(any(), any());
     doNothing().when(region).clearRegionLocally(event, true, null);
     when(ba.isPrimary()).thenReturn(true);
     region.cmnClearRegion(event, true, true);
@@ -181,12 +183,14 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
   }
 
   @Test
-  public void obtainWriteLocksForClearInBRShouldNotDistribute() {
+  public void obtainWriteLocksForClearInBRShouldDistribute() {
     RegionEventImpl event = createClearRegionEvent();
     BucketRegion region = (BucketRegion) event.getRegion();
     doNothing().when(region).lockLocallyForClear(any(), any(), any());
-    region.obtainWriteLocksForClear(event, null);
-    assertTrue(region.isUsedForPartitionedRegionBucket());
+    doNothing().when(region).lockAndFlushClearToOthers(any(), any());
+    region.obtainWriteLocksForClear(event, null, false);
+    verify(region).lockLocallyForClear(any(), any(), eq(event));
+    verify(region).lockAndFlushClearToOthers(eq(event), eq(null));
   }
 
   @Test
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index ff0d7f2..58aa3fb 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -440,6 +440,15 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
     });
   }
 
+  public Region createPartitionRegion(String name,
+      Consumer<PartitionAttributesFactory> attributesFactoryConsumer) {
+    return createRegion(RegionShortcut.PARTITION, name, rf -> {
+      PartitionAttributesFactory attributeFactory = new PartitionAttributesFactory();
+      attributesFactoryConsumer.accept(attributeFactory);
+      rf.setPartitionAttributes(attributeFactory.create());
+    });
+  }
+
   public void waitTillCacheClientProxyHasBeenPaused() {
     await().until(() -> {
       CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();