You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:28:59 UTC

[geode] 13/19: GEODE-8173: Add unit test (coverage) for PartitionedRegionClear class. (#5208)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 271054561dd476578e69e2514fd204b132edb236
Author: agingade <ag...@pivotal.io>
AuthorDate: Mon Jun 8 10:23:50 2020 -0700

    GEODE-8173: Add unit test (coverage) for PartitionedRegionClear class. (#5208)
    
    * GEODE-8173: Add unit test (coverage) for PartitionedRegionClear class.
    Co-authored-by: anilkumar gingade <an...@anilg.local>
---
 .../cache/PRCacheListenerDistributedTest.java      | 337 +++++++++++-
 .../ReplicateCacheListenerDistributedTest.java     |   4 +-
 .../geode/internal/cache/PartitionedRegion.java    |   2 +-
 .../internal/cache/PartitionedRegionClear.java     |  83 ++-
 .../internal/cache/PartitionedRegionClearTest.java | 611 +++++++++++++++++++++
 5 files changed, 999 insertions(+), 38 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
index f4a9ac9..7d95473 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
@@ -17,10 +17,18 @@ package org.apache.geode.cache;
 import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.dunit.VM.getVMCount;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -28,7 +36,13 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.junit.runners.Parameterized.UseParametersRunnerFactory;
 
+import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.dunit.rules.SharedErrorCollector;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
@@ -43,7 +57,28 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor
 @RunWith(Parameterized.class)
 @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
 @SuppressWarnings("serial")
-public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest {
+public class PRCacheListenerDistributedTest implements Serializable {
+
+  protected static final String CLEAR = "CLEAR";
+  protected static final String REGION_DESTROY = "REGION_DESTROY";
+  private static final String CREATES = "CREATES";
+  private static final String UPDATES = "UPDATES";
+  private static final String INVALIDATES = "INVALIDATES";
+  private static final String DESTROYS = "DESTROYS";
+  private static final int ENTRY_VALUE = 0;
+  private static final int UPDATED_ENTRY_VALUE = 1;
+  private static final String KEY = "key-1";
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+  @Rule
+  public SharedCountersRule sharedCountersRule = new SharedCountersRule();
+  @Rule
+  public SharedErrorCollector errorCollector = new SharedErrorCollector();
+  protected String regionName;
 
   @Parameters
   public static Collection<Object[]> data() {
@@ -59,7 +94,6 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
   @Parameter(1)
   public Boolean withData;
 
-  @Override
   protected Region<String, Integer> createRegion(final String name,
       final CacheListener<String, Integer> listener) {
     return createPartitionedRegion(name, listener, false);
@@ -99,22 +133,18 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
     }
   }
 
-  @Override
   protected int expectedCreates() {
     return 1;
   }
 
-  @Override
   protected int expectedUpdates() {
     return 1;
   }
 
-  @Override
   protected int expectedInvalidates() {
     return 1;
   }
 
-  @Override
   protected int expectedDestroys() {
     return 1;
   }
@@ -132,7 +162,8 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
 
     region.destroyRegion();
 
-    assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys());
+    assertThat(sharedCountersRule.getTotal(REGION_DESTROY))
+        .isGreaterThanOrEqualTo(expectedRegionDestroys());
   }
 
   @Test
@@ -321,4 +352,296 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
     assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
   }
 
+  @Before
+  public void setUp() {
+    regionName = getClass().getSimpleName();
+
+    sharedCountersRule.initialize(CREATES);
+    sharedCountersRule.initialize(DESTROYS);
+    sharedCountersRule.initialize(INVALIDATES);
+    sharedCountersRule.initialize(UPDATES);
+    sharedCountersRule.initialize(CLEAR);
+    sharedCountersRule.initialize(REGION_DESTROY);
+  }
+
+  @Test
+  public void afterCreateIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new CreateCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        createRegion(regionName, listener);
+      });
+    }
+
+    region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+
+    assertThat(sharedCountersRule.getTotal(CREATES)).isEqualTo(expectedCreates());
+  }
+
+  @Test
+  public void afterUpdateIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new UpdateCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        createRegion(regionName, listener);
+      });
+    }
+
+    region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+    region.put(KEY, UPDATED_ENTRY_VALUE, cacheRule.getSystem().getDistributedMember());
+
+    assertThat(sharedCountersRule.getTotal(UPDATES)).isEqualTo(expectedUpdates());
+  }
+
+  @Test
+  public void afterInvalidateIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new InvalidateCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        createRegion(regionName, listener);
+      });
+    }
+
+    region.put(KEY, 0, cacheRule.getSystem().getDistributedMember());
+    region.invalidate(KEY);
+
+    assertThat(sharedCountersRule.getTotal(INVALIDATES)).isEqualTo(expectedInvalidates());
+    assertThat(region.get(KEY)).isNull();
+  }
+
+  @Test
+  public void afterDestroyIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new DestroyCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        createRegion(regionName, listener);
+      });
+    }
+
+    region.put(KEY, 0, cacheRule.getSystem().getDistributedMember());
+    region.destroy(KEY);
+
+    assertThat(sharedCountersRule.getTotal(DESTROYS)).isEqualTo(expectedDestroys());
+  }
+
+  @Test
+  public void afterClearIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        createRegion(regionName, listener);
+      });
+    }
+
+    region.clear();
+
+    assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears());
+  }
+
+  protected int expectedClears() {
+    return getVMCount() + 1;
+  }
+
+  protected int expectedRegionDestroys() {
+    return getVMCount() + 1;
+  }
+
+  /**
+   * Overridden within tests to increment shared counters.
+   */
+  private abstract static class BaseCacheListener extends CacheListenerAdapter<String, Integer>
+      implements Serializable {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      fail("Unexpected listener callback: afterCreate");
+    }
+
+    @Override
+    public void afterInvalidate(final EntryEvent<String, Integer> event) {
+      fail("Unexpected listener callback: afterInvalidate");
+    }
+
+    @Override
+    public void afterDestroy(final EntryEvent<String, Integer> event) {
+      fail("Unexpected listener callback: afterDestroy");
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      fail("Unexpected listener callback: afterUpdate");
+    }
+
+    @Override
+    public void afterRegionInvalidate(final RegionEvent<String, Integer> event) {
+      fail("Unexpected listener callback: afterRegionInvalidate");
+    }
+  }
+
+  private class CreateCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(CREATES);
+
+      errorCollector.checkThat(event.getDistributedMember(), equalTo(event.getCallbackArgument()));
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.CREATE));
+      errorCollector.checkThat(event.getOldValue(), nullValue());
+      errorCollector.checkThat(event.getNewValue(), equalTo(ENTRY_VALUE));
+
+      if (event.getSerializedOldValue() != null) {
+        errorCollector.checkThat(event.getSerializedOldValue().getDeserializedValue(),
+            equalTo(event.getOldValue()));
+      }
+      if (event.getSerializedNewValue() != null) {
+        errorCollector.checkThat(event.getSerializedNewValue().getDeserializedValue(),
+            equalTo(event.getNewValue()));
+      }
+    }
+  }
+
+  private class UpdateCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      // nothing
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(UPDATES);
+
+      errorCollector.checkThat(event.getDistributedMember(), equalTo(event.getCallbackArgument()));
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.UPDATE));
+      errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue()));
+      errorCollector.checkThat(event.getNewValue(), equalTo(UPDATED_ENTRY_VALUE));
+
+      if (event.getSerializedOldValue() != null) {
+        errorCollector.checkThat(event.getSerializedOldValue().getDeserializedValue(),
+            equalTo(event.getOldValue()));
+      }
+      if (event.getSerializedNewValue() != null) {
+        errorCollector.checkThat(event.getSerializedNewValue().getDeserializedValue(),
+            equalTo(event.getNewValue()));
+      }
+    }
+  }
+
+  private class InvalidateCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      // ignore
+    }
+
+    @Override
+    public void afterInvalidate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(INVALIDATES);
+
+      if (event.isOriginRemote()) {
+        errorCollector.checkThat(event.getDistributedMember(),
+            not(cacheRule.getSystem().getDistributedMember()));
+      } else {
+        errorCollector.checkThat(event.getDistributedMember(),
+            equalTo(cacheRule.getSystem().getDistributedMember()));
+      }
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.INVALIDATE));
+      errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue()));
+      errorCollector.checkThat(event.getNewValue(), nullValue());
+    }
+  }
+
+  private class DestroyCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(UPDATES);
+    }
+
+    @Override
+    public void afterDestroy(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(DESTROYS);
+
+      if (event.isOriginRemote()) {
+        errorCollector.checkThat(event.getDistributedMember(),
+            not(cacheRule.getSystem().getDistributedMember()));
+      } else {
+        errorCollector.checkThat(event.getDistributedMember(),
+            equalTo(cacheRule.getSystem().getDistributedMember()));
+      }
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.DESTROY));
+      errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue()));
+      errorCollector.checkThat(event.getNewValue(), nullValue());
+    }
+  }
+
+  protected class ClearCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(UPDATES);
+    }
+
+    @Override
+    public void afterRegionClear(RegionEvent<String, Integer> event) {
+
+      sharedCountersRule.increment(CLEAR);
+      if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+        if (event.isOriginRemote()) {
+          errorCollector.checkThat(event.getDistributedMember(),
+              not(cacheRule.getSystem().getDistributedMember()));
+        } else {
+          errorCollector.checkThat(event.getDistributedMember(),
+              equalTo(cacheRule.getSystem().getDistributedMember()));
+        }
+      }
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR));
+      errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+    }
+  }
+
+  protected class RegionDestroyCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      sharedCountersRule.increment(UPDATES);
+    }
+
+    @Override
+    public void afterRegionDestroy(final RegionEvent<String, Integer> event) {
+      sharedCountersRule.increment(REGION_DESTROY);
+
+      if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+        if (event.isOriginRemote()) {
+          errorCollector.checkThat(event.getDistributedMember(),
+              not(cacheRule.getSystem().getDistributedMember()));
+        } else {
+          errorCollector.checkThat(event.getDistributedMember(),
+              equalTo(cacheRule.getSystem().getDistributedMember()));
+        }
+      }
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_DESTROY));
+      errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+    }
+  }
 }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
index 6612833..dd229de 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
@@ -51,8 +51,8 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
   private static final String UPDATES = "UPDATES";
   private static final String INVALIDATES = "INVALIDATES";
   private static final String DESTROYS = "DESTROYS";
-  protected static final String CLEAR = "CLEAR";
-  protected static final String REGION_DESTROY = "REGION_DESTROY";
+  private static final String CLEAR = "CLEAR";
+  private static final String REGION_DESTROY = "REGION_DESTROY";
 
   private static final int ENTRY_VALUE = 0;
   private static final int UPDATED_ENTRY_VALUE = 1;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 2da4973..51eabe9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -10256,7 +10256,7 @@ public class PartitionedRegion extends LocalRegion
   void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
     // Synchronized to avoid other threads invoking clear on this vm/node.
     synchronized (clearLock) {
-      partitionedRegionClear.doClear(regionEvent, cacheWrite, this);
+      partitionedRegionClear.doClear(regionEvent, cacheWrite);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 69277ef..030b36e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -39,21 +39,24 @@ public class PartitionedRegionClear {
 
   private static final Logger logger = LogService.getLogger();
 
-  private static final String CLEAR_OPERATION = "_clearOperation";
+  protected static final String CLEAR_OPERATION = "_clearOperation";
 
   private final int retryTime = 2 * 60 * 1000;
 
   private final PartitionedRegion partitionedRegion;
 
-  private final LockForListenerAndClientNotification lockForListenerAndClientNotification =
+  protected final LockForListenerAndClientNotification lockForListenerAndClientNotification =
       new LockForListenerAndClientNotification();
 
   private volatile boolean membershipChange = false;
 
+  protected final PartitionedRegionClearListener partitionedRegionClearListener =
+      new PartitionedRegionClearListener();
+
   public PartitionedRegionClear(PartitionedRegion partitionedRegion) {
     this.partitionedRegion = partitionedRegion;
     partitionedRegion.getDistributionManager()
-        .addMembershipListener(new PartitionedRegionClearListener());
+        .addMembershipListener(partitionedRegionClearListener);
   }
 
   public boolean isLockedForListenerAndClientNotification() {
@@ -79,6 +82,10 @@ public class PartitionedRegionClear {
     }
   }
 
+  protected PartitionedRegionClearListener getPartitionedRegionClearListener() {
+    return partitionedRegionClearListener;
+  }
+
   void obtainLockForClear(RegionEventImpl event) {
     obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
     sendPartitionedRegionClearMessage(event,
@@ -100,9 +107,8 @@ public class PartitionedRegionClear {
     return allBucketsCleared;
   }
 
-  private void waitForPrimary() {
+  protected void waitForPrimary(PartitionedRegion.RetryTimeKeeper retryTimer) {
     boolean retry;
-    PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime);
     do {
       retry = false;
       for (BucketRegion bucketRegion : partitionedRegion.getDataStore()
@@ -122,7 +128,7 @@ public class PartitionedRegionClear {
 
   public ArrayList clearRegionLocal(RegionEventImpl regionEvent) {
     ArrayList clearedBuckets = new ArrayList();
-    membershipChange = false;
+    setMembershipChange(false);
     // Synchronized to handle the requester departure.
     synchronized (lockForListenerAndClientNotification) {
       if (partitionedRegion.getDataStore() != null) {
@@ -130,18 +136,22 @@ public class PartitionedRegionClear {
         try {
           boolean retry;
           do {
-            waitForPrimary();
-
+            waitForPrimary(new PartitionedRegion.RetryTimeKeeper(retryTime));
+            RegionEventImpl bucketRegionEvent;
             for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
                 .getAllLocalPrimaryBucketRegions()) {
               if (localPrimaryBucketRegion.size() > 0) {
-                localPrimaryBucketRegion.clear();
+                bucketRegionEvent =
+                    new RegionEventImpl(localPrimaryBucketRegion, Operation.REGION_CLEAR, null,
+                        false, partitionedRegion.getMyId(), regionEvent.getEventId());
+                localPrimaryBucketRegion.cmnClearRegion(bucketRegionEvent, false, true);
               }
               clearedBuckets.add(localPrimaryBucketRegion.getId());
             }
 
-            if (membershipChange) {
-              membershipChange = false;
+            if (getMembershipChange()) {
+              // Retry and reset the membership change status.
+              setMembershipChange(false);
               retry = true;
             } else {
               retry = false;
@@ -160,7 +170,7 @@ public class PartitionedRegionClear {
     return clearedBuckets;
   }
 
-  private void doAfterClear(RegionEventImpl regionEvent) {
+  protected void doAfterClear(RegionEventImpl regionEvent) {
     if (partitionedRegion.hasAnyClientsInterested()) {
       notifyClients(regionEvent);
     }
@@ -245,7 +255,7 @@ public class PartitionedRegionClear {
     }
   }
 
-  private List sendPartitionedRegionClearMessage(RegionEventImpl event,
+  protected List sendPartitionedRegionClearMessage(RegionEventImpl event,
       PartitionedRegionClearMessage.OperationType op) {
     RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone();
     eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR);
@@ -259,7 +269,7 @@ public class PartitionedRegionClear {
     } while (true);
   }
 
-  private List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
+  protected List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
       PartitionedRegionClearMessage.OperationType op)
       throws ForceReattemptException {
     List bucketsOperated = null;
@@ -321,30 +331,27 @@ public class PartitionedRegionClear {
     return bucketsOperated;
   }
 
-  void doClear(RegionEventImpl regionEvent, boolean cacheWrite,
-      PartitionedRegion partitionedRegion) {
-    String lockName = CLEAR_OPERATION + partitionedRegion.getDisplayName();
+  void doClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+    String lockName = CLEAR_OPERATION + partitionedRegion.getName();
 
     try {
       // distributed lock to make sure only one clear op is in progress in the cluster.
       acquireDistributedClearLock(lockName);
 
       // Force all primary buckets to be created before clear.
-      PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+      assignAllPrimaryBuckets();
 
       // do cacheWrite
-      try {
-        partitionedRegion.cacheWriteBeforeRegionClear(regionEvent);
-      } catch (OperationAbortedException operationAbortedException) {
-        throw new CacheWriterException(operationAbortedException);
+      if (cacheWrite) {
+        invokeCacheWriter(regionEvent);
       }
 
       // Check if there are any listeners or clients interested. If so, then clear write
       // locks needs to be taken on all local and remote primary buckets in order to
       // preserve the ordering of client events (for concurrent operations on the region).
-      boolean acquireClearLockForClientNotification =
-          (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener());
-      if (acquireClearLockForClientNotification) {
+      boolean acquireClearLockForNotification =
+          (partitionedRegion.hasAnyClientsInterested() || partitionedRegion.hasListener());
+      if (acquireClearLockForNotification) {
         obtainLockForClear(regionEvent);
       }
       try {
@@ -362,7 +369,7 @@ public class PartitionedRegionClear {
           throw new PartitionedRegionPartialClearException(message);
         }
       } finally {
-        if (acquireClearLockForClientNotification) {
+        if (acquireClearLockForNotification) {
           releaseLockForClear(regionEvent);
         }
       }
@@ -372,7 +379,19 @@ public class PartitionedRegionClear {
     }
   }
 
-  void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
+  protected void invokeCacheWriter(RegionEventImpl regionEvent) {
+    try {
+      partitionedRegion.cacheWriteBeforeRegionClear(regionEvent);
+    } catch (OperationAbortedException operationAbortedException) {
+      throw new CacheWriterException(operationAbortedException);
+    }
+  }
+
+  protected void assignAllPrimaryBuckets() {
+    PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+  }
+
+  protected void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
     if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) {
       synchronized (lockForListenerAndClientNotification) {
         if (lockForListenerAndClientNotification.getLockRequester() != null) {
@@ -407,12 +426,20 @@ public class PartitionedRegionClear {
     }
   }
 
+  protected void setMembershipChange(boolean membershipChange) {
+    this.membershipChange = membershipChange;
+  }
+
+  protected boolean getMembershipChange() {
+    return membershipChange;
+  }
+
   protected class PartitionedRegionClearListener implements MembershipListener {
 
     @Override
     public synchronized void memberDeparted(DistributionManager distributionManager,
         InternalDistributedMember id, boolean crashed) {
-      membershipChange = true;
+      setMembershipChange(true);
       handleClearFromDepartedMember(id);
     }
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
new file mode 100644
index 0000000..d8c42af
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+
+public class PartitionedRegionClearTest {
+
+
+  private PartitionedRegionClear partitionedRegionClear;
+  private DistributionManager distributionManager;
+  private PartitionedRegion partitionedRegion;
+
+  @Before
+  public void setUp() {
+
+    partitionedRegion = mock(PartitionedRegion.class);
+    distributionManager = mock(DistributionManager.class);
+
+    when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager);
+    when(partitionedRegion.getName()).thenReturn("prRegion");
+
+    partitionedRegionClear = new PartitionedRegionClear(partitionedRegion);
+  }
+
+  private Set<BucketRegion> setupBucketRegions(
+      PartitionedRegionDataStore partitionedRegionDataStore,
+      BucketAdvisor bucketAdvisor) {
+    final int numBuckets = 2;
+    Set<BucketRegion> bucketRegions = new HashSet<>();
+    for (int i = 0; i < numBuckets; i++) {
+      BucketRegion bucketRegion = mock(BucketRegion.class);
+      when(bucketRegion.getBucketAdvisor()).thenReturn(bucketAdvisor);
+      when(bucketRegion.size()).thenReturn(1);
+      when(bucketRegion.getId()).thenReturn(i);
+      bucketRegions.add(bucketRegion);
+    }
+
+    when(partitionedRegionDataStore.getAllLocalBucketRegions()).thenReturn(bucketRegions);
+    when(partitionedRegionDataStore.getAllLocalPrimaryBucketRegions()).thenReturn(bucketRegions);
+
+    return bucketRegions;
+  }
+
+  @Test
+  public void isLockedForListenerAndClientNotificationReturnsTrueWhenLocked() {
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true);
+    partitionedRegionClear.obtainClearLockLocal(internalDistributedMember);
+
+    assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isTrue();
+  }
+
+  @Test
+  public void isLockedForListenerAndClientNotificationReturnsFalseWhenMemberNotInTheSystemRequestsLock() {
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(false);
+
+    assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isFalse();
+  }
+
+  @Test
+  public void acquireDistributedClearLockGetsDistributedLock() {
+    DistributedLockService distributedLockService = mock(DistributedLockService.class);
+    String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
+    when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
+
+    partitionedRegionClear.acquireDistributedClearLock(lockName);
+
+    verify(distributedLockService, times(1)).lock(lockName, -1, -1);
+  }
+
+  @Test
+  public void releaseDistributedClearLockReleasesDistributedLock() {
+    DistributedLockService distributedLockService = mock(DistributedLockService.class);
+    String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName();
+    when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService);
+
+    partitionedRegionClear.releaseDistributedClearLock(lockName);
+
+    verify(distributedLockService, times(1)).unlock(lockName);
+  }
+
+  @Test
+  public void obtainLockForClearGetsLocalLockAndSendsMessageForRemote() throws Exception {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+    Region<String, PartitionRegionConfig> region = mock(Region.class);
+    when(partitionedRegion.getPRRoot()).thenReturn(region);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear)
+        .attemptToSendPartitionedRegionClearMessage(regionEvent,
+            PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    when(distributionManager.getId()).thenReturn(internalDistributedMember);
+
+    spyPartitionedRegionClear.obtainLockForClear(regionEvent);
+
+    verify(spyPartitionedRegionClear, times(1)).obtainClearLockLocal(internalDistributedMember);
+    verify(spyPartitionedRegionClear, times(1)).sendPartitionedRegionClearMessage(regionEvent,
+        PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+  }
+
+  @Test
+  public void releaseLockForClearReleasesLocalLockAndSendsMessageForRemote() throws Exception {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+    Region<String, PartitionRegionConfig> region = mock(Region.class);
+    when(partitionedRegion.getPRRoot()).thenReturn(region);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear)
+        .attemptToSendPartitionedRegionClearMessage(regionEvent,
+            PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    when(distributionManager.getId()).thenReturn(internalDistributedMember);
+
+    spyPartitionedRegionClear.releaseLockForClear(regionEvent);
+
+    verify(spyPartitionedRegionClear, times(1)).releaseClearLockLocal();
+    verify(spyPartitionedRegionClear, times(1)).sendPartitionedRegionClearMessage(regionEvent,
+        PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+  }
+
+  @Test
+  public void clearRegionClearsLocalAndSendsMessageForRemote() throws Exception {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+    Region<String, PartitionRegionConfig> region = mock(Region.class);
+    when(partitionedRegion.getPRRoot()).thenReturn(region);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear)
+        .attemptToSendPartitionedRegionClearMessage(regionEvent,
+            PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    when(distributionManager.getId()).thenReturn(internalDistributedMember);
+    RegionVersionVector regionVersionVector = mock(RegionVersionVector.class);
+
+    spyPartitionedRegionClear.clearRegion(regionEvent, false, regionVersionVector);
+
+    verify(spyPartitionedRegionClear, times(1)).clearRegionLocal(regionEvent);
+    verify(spyPartitionedRegionClear, times(1)).sendPartitionedRegionClearMessage(regionEvent,
+        PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+  }
+
+  @Test
+  public void waitForPrimaryReturnsAfterFindingAllPrimary() {
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(true);
+    setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    PartitionedRegion.RetryTimeKeeper retryTimer = mock(PartitionedRegion.RetryTimeKeeper.class);
+
+    partitionedRegionClear.waitForPrimary(retryTimer);
+
+    verify(retryTimer, times(0)).waitForBucketsRecovery();
+  }
+
+  @Test
+  public void waitForPrimaryReturnsAfterRetryForPrimary() {
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(false).thenReturn(true);
+    setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    PartitionedRegion.RetryTimeKeeper retryTimer = mock(PartitionedRegion.RetryTimeKeeper.class);
+
+    partitionedRegionClear.waitForPrimary(retryTimer);
+
+    verify(retryTimer, times(1)).waitForBucketsRecovery();
+  }
+
+  @Test
+  public void waitForPrimaryThrowsPartitionedRegionPartialClearException() {
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    PartitionedRegion.RetryTimeKeeper retryTimer = mock(PartitionedRegion.RetryTimeKeeper.class);
+    when(retryTimer.overMaximum()).thenReturn(true);
+
+    Throwable thrown = catchThrowable(() -> partitionedRegionClear.waitForPrimary(retryTimer));
+
+    assertThat(thrown)
+        .isInstanceOf(PartitionedRegionPartialClearException.class)
+        .hasMessage(
+            "Unable to find primary bucket region during clear operation for region: prRegion");
+    verify(retryTimer, times(0)).waitForBucketsRecovery();
+  }
+
+  @Test
+  public void clearRegionLocalCallsClearOnLocalPrimaryBucketRegions() {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(true);
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    doNothing().when(partitionedRegionDataStore).lockBucketCreationForRegionClear();
+    Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+
+    List bucketsCleared = partitionedRegionClear.clearRegionLocal(regionEvent);
+
+    assertThat(bucketsCleared).hasSize(buckets.size());
+
+    ArgumentCaptor<RegionEventImpl> argument = ArgumentCaptor.forClass(RegionEventImpl.class);
+    for (BucketRegion bucketRegion : buckets) {
+      verify(bucketRegion, times(1)).cmnClearRegion(argument.capture(), eq(false), eq(true));
+      RegionEventImpl bucketRegionEvent = argument.getValue();
+      assertThat(bucketRegionEvent.getRegion()).isEqualTo(bucketRegion);
+    }
+  }
+
+  @Test
+  public void clearRegionLocalRetriesClearOnLocalPrimaryBucketRegions() {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(true);
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    doNothing().when(partitionedRegionDataStore).lockBucketCreationForRegionClear();
+    Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    when(spyPartitionedRegionClear.getMembershipChange()).thenReturn(true).thenReturn(false);
+
+    List bucketsCleared = spyPartitionedRegionClear.clearRegionLocal(regionEvent);
+
+    int expectedClears = buckets.size() * 2; /* clear is called twice on each bucket */
+    assertThat(bucketsCleared).hasSize(expectedClears);
+
+    ArgumentCaptor<RegionEventImpl> argument = ArgumentCaptor.forClass(RegionEventImpl.class);
+    for (BucketRegion bucketRegion : buckets) {
+      verify(bucketRegion, times(2)).cmnClearRegion(argument.capture(), eq(false), eq(true));
+      RegionEventImpl bucketRegionEvent = argument.getValue();
+      assertThat(bucketRegionEvent.getRegion()).isEqualTo(bucketRegion);
+    }
+  }
+
+  @Test
+  public void doAfterClearCallsNotifyClientsWhenClientHaveInterests() {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true);
+    FilterProfile filterProfile = mock(FilterProfile.class);
+    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
+    when(filterProfile.getFilterRoutingInfoPart1(regionEvent, FilterProfile.NO_PROFILES,
+        Collections.emptySet())).thenReturn(filterRoutingInfo);
+    when(filterProfile.getFilterRoutingInfoPart2(filterRoutingInfo, regionEvent)).thenReturn(
+        filterRoutingInfo);
+    when(partitionedRegion.getFilterProfile()).thenReturn(filterProfile);
+
+    partitionedRegionClear.doAfterClear(regionEvent);
+
+    verify(regionEvent, times(1)).setLocalFilterInfo(any());
+    verify(partitionedRegion, times(1)).notifyBridgeClients(regionEvent);
+  }
+
+  @Test
+  public void doAfterClearDispatchesListenerEvents() {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasListener()).thenReturn(true);
+
+    partitionedRegionClear.doAfterClear(regionEvent);
+
+    verify(partitionedRegion, times(1)).dispatchListenerEvent(
+        EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
+  }
+
+  @Test
+  public void obtainClearLockLocalGetsLockOnPrimaryBuckets() {
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(true);
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(distributionManager.isCurrentMember(member)).thenReturn(true);
+
+    partitionedRegionClear.obtainClearLockLocal(member);
+
+    assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+        .isSameAs(member);
+    for (BucketRegion bucketRegion : buckets) {
+      verify(bucketRegion, times(1)).lockLocallyForClear(partitionedRegion.getDistributionManager(),
+          partitionedRegion.getMyId(), null);
+    }
+  }
+
+  @Test
+  public void obtainClearLockLocalDoesNotGetLocksOnPrimaryBucketsWhenMemberIsNotCurrent() {
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(true);
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(distributionManager.isCurrentMember(member)).thenReturn(false);
+
+    partitionedRegionClear.obtainClearLockLocal(member);
+
+    assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+        .isNull();
+    for (BucketRegion bucketRegion : buckets) {
+      verify(bucketRegion, times(0)).lockLocallyForClear(partitionedRegion.getDistributionManager(),
+          partitionedRegion.getMyId(), null);
+    }
+  }
+
+  @Test
+  public void releaseClearLockLocalReleasesLockOnPrimaryBuckets() {
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(true);
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(distributionManager.isCurrentMember(member)).thenReturn(true);
+    partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member);
+
+    partitionedRegionClear.releaseClearLockLocal();
+
+    for (BucketRegion bucketRegion : buckets) {
+      verify(bucketRegion, times(1)).releaseLockLocallyForClear(null);
+    }
+  }
+
+  @Test
+  public void releaseClearLockLocalDoesNotReleaseLocksOnPrimaryBucketsWhenMemberIsNotCurrent() {
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    when(bucketAdvisor.hasPrimary()).thenReturn(true);
+    PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
+    Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
+    when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+
+    partitionedRegionClear.releaseClearLockLocal();
+
+    assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+        .isNull();
+    for (BucketRegion bucketRegion : buckets) {
+      verify(bucketRegion, times(0)).releaseLockLocallyForClear(null);
+    }
+  }
+
+  @Test
+  public void sendPartitionedRegionClearMessageSendsClearMessageToPRNodes() {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+    Region<String, PartitionRegionConfig> prRoot = mock(Region.class);
+    when(partitionedRegion.getPRRoot()).thenReturn(prRoot);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Set<InternalDistributedMember> prNodes = Collections.singleton(member);
+    Node node = mock(Node.class);
+    when(node.getMemberId()).thenReturn(member);
+    Set<Node> configNodes = Collections.singleton(node);
+    when(regionAdvisor.adviseAllPRNodes()).thenReturn(prNodes);
+    when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+    PartitionRegionConfig partitionRegionConfig = mock(PartitionRegionConfig.class);
+    when(partitionRegionConfig.getNodes()).thenReturn(configNodes);
+    when(prRoot.get(any())).thenReturn(partitionRegionConfig);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    when(internalDistributedSystem.getDistributionManager()).thenReturn(distributionManager);
+    when(partitionedRegion.getSystem()).thenReturn(internalDistributedSystem);
+    InternalCache internalCache = mock(InternalCache.class);
+    TXManagerImpl txManager = mock(TXManagerImpl.class);
+    when(txManager.isDistributed()).thenReturn(false);
+    when(internalCache.getTxManager()).thenReturn(txManager);
+    when(partitionedRegion.getCache()).thenReturn(internalCache);
+
+    when(distributionManager.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+    when(distributionManager.getStats()).thenReturn(mock(DMStats.class));
+
+    partitionedRegionClear.sendPartitionedRegionClearMessage(regionEvent,
+        PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
+
+    verify(distributionManager, times(1)).putOutgoing(any());
+  }
+
+  @Test
+  public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() {
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+    doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, false,
+        null);
+
+    spyPartitionedRegionClear.doClear(regionEvent, false);
+
+    verify(spyPartitionedRegionClear, times(1)).acquireDistributedClearLock(any());
+    verify(spyPartitionedRegionClear, times(1)).releaseDistributedClearLock(any());
+    verify(spyPartitionedRegionClear, times(1)).assignAllPrimaryBuckets();
+  }
+
+  @Test
+  public void doClearInvokesCacheWriterWhenCacheWriteIsSet() {
+    boolean cacheWrite = true;
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+    doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent,
+        cacheWrite, null);
+
+    spyPartitionedRegionClear.doClear(regionEvent, cacheWrite);
+
+    verify(spyPartitionedRegionClear, times(1)).invokeCacheWriter(regionEvent);
+  }
+
+  @Test
+  public void doClearDoesNotInvokesCacheWriterWhenCacheWriteIsNotSet() {
+    boolean cacheWrite = false;
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+    doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent,
+        cacheWrite, null);
+
+    spyPartitionedRegionClear.doClear(regionEvent, cacheWrite);
+
+    verify(spyPartitionedRegionClear, times(0)).invokeCacheWriter(regionEvent);
+  }
+
+  @Test
+  public void doClearObtainsAndReleasesLockForClearWhenRegionHasListener() {
+    boolean cacheWrite = false;
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasListener()).thenReturn(true);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+    doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+    doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent);
+    doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent);
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent,
+        cacheWrite, null);
+
+    spyPartitionedRegionClear.doClear(regionEvent, cacheWrite);
+
+    verify(spyPartitionedRegionClear, times(1)).obtainLockForClear(regionEvent);
+    verify(spyPartitionedRegionClear, times(1)).releaseLockForClear(regionEvent);
+  }
+
+  @Test
+  public void doClearObtainsAndReleasesLockForClearWhenRegionHasClientInterest() {
+    boolean cacheWrite = false;
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasListener()).thenReturn(false);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+    doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+    doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent);
+    doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent);
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent,
+        cacheWrite, null);
+
+    spyPartitionedRegionClear.doClear(regionEvent, cacheWrite);
+
+    verify(spyPartitionedRegionClear, times(1)).obtainLockForClear(regionEvent);
+    verify(spyPartitionedRegionClear, times(1)).releaseLockForClear(regionEvent);
+  }
+
+  @Test
+  public void doClearDoesNotObtainLockForClearWhenRegionHasNoListenerAndNoClientInterest() {
+    boolean cacheWrite = false;
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasListener()).thenReturn(false);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+    doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+    doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent);
+    doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent);
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent,
+        cacheWrite, null);
+
+    spyPartitionedRegionClear.doClear(regionEvent, cacheWrite);
+
+    verify(spyPartitionedRegionClear, times(0)).obtainLockForClear(regionEvent);
+    verify(spyPartitionedRegionClear, times(0)).releaseLockForClear(regionEvent);
+  }
+
+  @Test
+  public void doClearThrowsPartitionedRegionPartialClearException() {
+    boolean cacheWrite = false;
+    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+    when(partitionedRegion.hasListener()).thenReturn(false);
+    when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+    when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(1);
+    when(partitionedRegion.getName()).thenReturn("prRegion");
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+    doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+    doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+    doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent);
+    doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent);
+    doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent,
+        cacheWrite, null);
+
+    Throwable thrown =
+        catchThrowable(() -> spyPartitionedRegionClear.doClear(regionEvent, cacheWrite));
+
+    assertThat(thrown)
+        .isInstanceOf(PartitionedRegionPartialClearException.class)
+        .hasMessage(
+            "Unable to clear all the buckets from the partitioned region prRegion, either data (buckets) moved or member departed.");
+  }
+
+  @Test
+  public void handleClearFromDepartedMemberReleasesTheLockForRequesterDeparture() {
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+
+    spyPartitionedRegionClear.handleClearFromDepartedMember(member);
+
+    verify(spyPartitionedRegionClear, times(1)).releaseClearLockLocal();
+  }
+
+  @Test
+  public void handleClearFromDepartedMemberDoesNotReleasesTheLockForNonRequesterDeparture() {
+    InternalDistributedMember requesterMember = mock(InternalDistributedMember.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    partitionedRegionClear.lockForListenerAndClientNotification.setLocked(requesterMember);
+    PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+
+    spyPartitionedRegionClear.handleClearFromDepartedMember(member);
+
+    verify(spyPartitionedRegionClear, times(0)).releaseClearLockLocal();
+  }
+
+  @Test
+  public void partitionedRegionClearRegistersMembershipListener() {
+    MembershipListener membershipListener =
+        partitionedRegionClear.getPartitionedRegionClearListener();
+
+    verify(distributionManager, times(1)).addMembershipListener(membershipListener);
+  }
+
+  @Test
+  public void lockRequesterDepartureReleasesTheLock() {
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member);
+    PartitionedRegionClear.PartitionedRegionClearListener partitionedRegionClearListener =
+        partitionedRegionClear.getPartitionedRegionClearListener();
+
+    partitionedRegionClearListener.memberDeparted(distributionManager, member, true);
+
+    assertThat(partitionedRegionClear.getMembershipChange()).isTrue();
+    assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+        .isNull();
+  }
+
+  @Test
+  public void nonLockRequesterDepartureDoesNotReleasesTheLock() {
+    InternalDistributedMember requesterMember = mock(InternalDistributedMember.class);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    partitionedRegionClear.lockForListenerAndClientNotification.setLocked(requesterMember);
+    PartitionedRegionClear.PartitionedRegionClearListener partitionedRegionClearListener =
+        partitionedRegionClear.getPartitionedRegionClearListener();
+
+    partitionedRegionClearListener.memberDeparted(distributionManager, member, true);
+
+    assertThat(partitionedRegionClear.getMembershipChange()).isTrue();
+    assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
+        .isNotNull();
+  }
+}