You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/28 19:14:38 UTC

[geode] 11/23: GEODE-7678 (2nd PR) - Support for cache-listener and client-notification for Partitioned Region Clear operation (#5124)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 10480f49e9da2074841c31def8b8b7796fd2807b
Author: agingade <ag...@pivotal.io>
AuthorDate: Wed May 20 16:08:07 2020 -0700

    GEODE-7678 (2nd PR) - Support for cache-listener and client-notification for Partitioned Region Clear operation  (#5124)
    
    * GEODE-7678: Add support for cache listener and client notification for PR clear
    
    The changes are made to PR clear messaging and locking mechanism to preserve
    cache-listener and client-events ordering during concurrent cache operation
    while clear in progress.
---
 .../integrationTest/resources/assembly_content.txt |   1 +
 .../cache/PRCacheListenerDistributedTest.java      | 250 +++++++++++-
 .../ReplicateCacheListenerDistributedTest.java     | 111 +++++-
 ...ionedRegionAfterClearNotificationDUnitTest.java | 372 ++++++++++++++++++
 .../cache/PartitionedRegionClearDUnitTest.java     |   1 -
 ...titionedRegionClearWithExpirationDUnitTest.java |  69 ++--
 ...itionedRegionClearWithExpirationDUnitTest.java} |  58 +--
 .../cache/PartitionedRegionIntegrationTest.java    |  45 +++
 .../codeAnalysis/sanctionedDataSerializables.txt   |   8 +
 .../PartitionedRegionPartialClearException.java    |  37 ++
 .../main/java/org/apache/geode/cache/Region.java   |   4 +-
 .../org/apache/geode/internal/DSFIDFactory.java    |   5 +
 .../apache/geode/internal/cache/BucketAdvisor.java |   2 +-
 .../apache/geode/internal/cache/BucketRegion.java  |  17 +-
 .../internal/cache/DistributedClearOperation.java  |  10 +-
 .../geode/internal/cache/DistributedRegion.java    |   9 +-
 .../geode/internal/cache/InternalRegion.java       |   3 +
 .../apache/geode/internal/cache/LocalRegion.java   |   3 +-
 .../geode/internal/cache/PartitionedRegion.java    | 217 ++---------
 .../internal/cache/PartitionedRegionClear.java     | 419 +++++++++++++++++++++
 .../cache/PartitionedRegionClearMessage.java       | 287 ++++++++++++++
 .../internal/cache/PartitionedRegionDataStore.java |   8 +
 .../internal/cache/partitioned/RegionAdvisor.java  |  11 +
 .../sanctioned-geode-core-serializables.txt        |   2 +
 .../internal/cache/BucketRegionJUnitTest.java      |   4 +-
 .../internal/cache/PartitionedRegionTest.java      |  39 --
 .../serialization/DataSerializableFixedID.java     |   2 +
 27 files changed, 1679 insertions(+), 315 deletions(-)

diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt
index 35ee867..d1673b3 100644
--- a/geode-assembly/src/integrationTest/resources/assembly_content.txt
+++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt
@@ -221,6 +221,7 @@ javadoc/org/apache/geode/cache/PartitionAttributes.html
 javadoc/org/apache/geode/cache/PartitionAttributesFactory.html
 javadoc/org/apache/geode/cache/PartitionResolver.html
 javadoc/org/apache/geode/cache/PartitionedRegionDistributionException.html
+javadoc/org/apache/geode/cache/PartitionedRegionPartialClearException.html
 javadoc/org/apache/geode/cache/PartitionedRegionStorageException.html
 javadoc/org/apache/geode/cache/Region.Entry.html
 javadoc/org/apache/geode/cache/Region.html
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
index 559def7..f4a9ac9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java
@@ -14,14 +14,21 @@
  */
 package org.apache.geode.cache;
 
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.getVMCount;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.util.Arrays;
+import java.util.Collection;
 
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.junit.runners.Parameterized.UseParametersRunnerFactory;
 
+import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 
 /**
@@ -38,28 +45,60 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor
 @SuppressWarnings("serial")
 public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest {
 
-  @Parameters(name = "{index}: redundancy={0}")
-  public static Iterable<Integer> data() {
-    return Arrays.asList(0, 3);
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {1, Boolean.FALSE},
+        {3, Boolean.TRUE},
+    });
   }
 
   @Parameter
   public int redundancy;
 
+  @Parameter(1)
+  public Boolean withData;
+
   @Override
   protected Region<String, Integer> createRegion(final String name,
       final CacheListener<String, Integer> listener) {
+    return createPartitionedRegion(name, listener, false);
+  }
+
+  protected Region<String, Integer> createAccessorRegion(final String name,
+      final CacheListener<String, Integer> listener) {
+    return createPartitionedRegion(name, listener, true);
+  }
+
+  private Region<String, Integer> createPartitionedRegion(String name,
+      CacheListener<String, Integer> listener, boolean accessor) {
+    LogService.getLogger()
+        .info("Params [Redundancy: " + redundancy + " withData:" + withData + "]");
     PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>();
     paf.setRedundantCopies(redundancy);
 
+    if (accessor) {
+      paf.setLocalMaxMemory(0);
+    }
     RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory();
-    regionFactory.addCacheListener(listener);
+    if (listener != null) {
+      regionFactory.addCacheListener(listener);
+    }
     regionFactory.setDataPolicy(DataPolicy.PARTITION);
     regionFactory.setPartitionAttributes(paf.create());
 
     return regionFactory.create(name);
   }
 
+  private void withData(Region region) {
+    if (withData) {
+      // Fewer buckets.
+      // Covers case where node doesn't have any buckets depending on redundancy.
+      region.put("key1", "value1");
+      region.put("key2", "value2");
+    }
+  }
+
   @Override
   protected int expectedCreates() {
     return 1;
@@ -79,4 +118,207 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri
   protected int expectedDestroys() {
     return 1;
   }
+
+  @Test
+  public void afterRegionDestroyIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, listener));
+      });
+    }
+
+    region.destroyRegion();
+
+    assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys());
+  }
+
+  @Test
+  public void afterRegionDestroyIsInvokedOnNodeWithListener() {
+    CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.destroyRegion();
+
+    assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+  }
+
+  @Test
+  public void afterRegionDestroyIsInvokedOnRemoteNodeWithListener() {
+    CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, null);
+
+    getVM(0).invoke(() -> {
+      createRegion(regionName, listener);
+    });
+
+    for (int i = 1; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.destroyRegion();
+
+    assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+  }
+
+  @Test
+  public void afterRegionDestroyIsInvokedOnAccessorAndDataMembers() {
+    CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+    Region<String, Integer> region = createAccessorRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, listener));
+      });
+    }
+
+    region.destroyRegion();
+
+    assertThat(sharedCountersRule.getTotal(REGION_DESTROY))
+        .isGreaterThanOrEqualTo(expectedRegionDestroys());
+  }
+
+  @Test
+  public void afterRegionDestroyIsInvokedOnAccessor() {
+    CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+    Region<String, Integer> region = createAccessorRegion(regionName, listener);
+
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.destroyRegion();
+
+    assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+  }
+
+  @Test
+  public void afterRegionDestroyIsInvokedOnNonAccessor() {
+    CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+    Region<String, Integer> region = createAccessorRegion(regionName, null);
+    getVM(0).invoke(() -> {
+      createRegion(regionName, listener);
+    });
+    for (int i = 1; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.destroyRegion();
+
+    assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(1);
+  }
+
+  @Test
+  public void afterRegionClearIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, listener));
+      });
+    }
+
+    region.clear();
+
+    assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears());
+  }
+
+  @Test
+  public void afterClearIsInvokedOnNodeWithListener() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.clear();
+
+    assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+  }
+
+  @Test
+  public void afterRegionClearIsInvokedOnRemoteNodeWithListener() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, null);
+    getVM(0).invoke(() -> {
+      createRegion(regionName, listener);
+    });
+    for (int i = 1; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.clear();
+
+    assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+  }
+
+  @Test
+  public void afterRegionClearIsInvokedOnAccessorAndDataMembers() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createAccessorRegion(regionName, listener);
+
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, listener));
+      });
+    }
+
+    region.clear();
+
+    assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears());
+  }
+
+  @Test
+  public void afterRegionClearIsInvokedOnAccessor() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createAccessorRegion(regionName, listener);
+
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.clear();
+
+    assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+  }
+
+  @Test
+  public void afterRegionClearIsInvokedOnNonAccessor() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createAccessorRegion(regionName, null);
+
+    getVM(0).invoke(() -> {
+      createRegion(regionName, listener);
+    });
+    for (int i = 1; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        withData(createRegion(regionName, null));
+      });
+    }
+
+    region.clear();
+
+    assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1);
+  }
+
 }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
index 3eedcef..6612833 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java
@@ -51,13 +51,15 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
   private static final String UPDATES = "UPDATES";
   private static final String INVALIDATES = "INVALIDATES";
   private static final String DESTROYS = "DESTROYS";
+  protected static final String CLEAR = "CLEAR";
+  protected static final String REGION_DESTROY = "REGION_DESTROY";
 
   private static final int ENTRY_VALUE = 0;
   private static final int UPDATED_ENTRY_VALUE = 1;
 
   private static final String KEY = "key-1";
 
-  private String regionName;
+  protected String regionName;
 
   @Rule
   public DistributedRule distributedRule = new DistributedRule();
@@ -82,6 +84,8 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
     distributedCounters.initialize(DESTROYS);
     distributedCounters.initialize(INVALIDATES);
     distributedCounters.initialize(UPDATES);
+    distributedCounters.initialize(CLEAR);
+    distributedCounters.initialize(REGION_DESTROY);
   }
 
   @Test
@@ -148,6 +152,36 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
     assertThat(distributedCounters.getTotal(DESTROYS)).isEqualTo(expectedDestroys());
   }
 
+  @Test
+  public void afterClearIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new ClearCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        createRegion(regionName, listener);
+      });
+    }
+
+    region.clear();
+
+    assertThat(distributedCounters.getTotal(CLEAR)).isEqualTo(expectedClears());
+  }
+
+  @Test
+  public void afterRegionDestroyIsInvokedInEveryMember() {
+    CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener();
+    Region<String, Integer> region = createRegion(regionName, listener);
+    for (int i = 0; i < getVMCount(); i++) {
+      getVM(i).invoke(() -> {
+        createRegion(regionName, listener);
+      });
+    }
+
+    region.destroyRegion();
+
+    assertThat(distributedCounters.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys());
+  }
+
   protected Region<String, Integer> createRegion(final String name,
       final CacheListener<String, Integer> listener) {
     RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory();
@@ -174,6 +208,14 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
     return getVMCount() + 1;
   }
 
+  protected int expectedClears() {
+    return getVMCount() + 1;
+  }
+
+  protected int expectedRegionDestroys() {
+    return getVMCount() + 1;
+  }
+
   /**
    * Overridden within tests to increment shared counters.
    */
@@ -283,7 +325,12 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
 
     @Override
     public void afterCreate(final EntryEvent<String, Integer> event) {
-      // ignore
+      distributedCounters.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      distributedCounters.increment(UPDATES);
     }
 
     @Override
@@ -302,4 +349,64 @@ public class ReplicateCacheListenerDistributedTest implements Serializable {
       errorCollector.checkThat(event.getNewValue(), nullValue());
     }
   }
+
+  protected class ClearCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      distributedCounters.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      distributedCounters.increment(UPDATES);
+    }
+
+    @Override
+    public void afterRegionClear(RegionEvent<String, Integer> event) {
+
+      distributedCounters.increment(CLEAR);
+      if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+        if (event.isOriginRemote()) {
+          errorCollector.checkThat(event.getDistributedMember(),
+              not(cacheRule.getSystem().getDistributedMember()));
+        } else {
+          errorCollector.checkThat(event.getDistributedMember(),
+              equalTo(cacheRule.getSystem().getDistributedMember()));
+        }
+      }
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR));
+      errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+    }
+  }
+
+  protected class RegionDestroyCountingCacheListener extends BaseCacheListener {
+
+    @Override
+    public void afterCreate(final EntryEvent<String, Integer> event) {
+      distributedCounters.increment(CREATES);
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent<String, Integer> event) {
+      distributedCounters.increment(UPDATES);
+    }
+
+    @Override
+    public void afterRegionDestroy(final RegionEvent<String, Integer> event) {
+      distributedCounters.increment(REGION_DESTROY);
+
+      if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) {
+        if (event.isOriginRemote()) {
+          errorCollector.checkThat(event.getDistributedMember(),
+              not(cacheRule.getSystem().getDistributedMember()));
+        } else {
+          errorCollector.checkThat(event.getDistributedMember(),
+              equalTo(cacheRule.getSystem().getDistributedMember()));
+        }
+      }
+      errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_DESTROY));
+      errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName));
+    }
+  }
 }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
new file mode 100644
index 0000000..237b6a8
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 100;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1;
+  protected MemberVM dataStore2;
+  protected MemberVM dataStore3;
+  protected MemberVM accessor;
+
+  protected ClientVM client1;
+  protected ClientVM client2;
+
+  private static volatile DUnitBlackboard blackboard;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
+
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    getBlackboard().initBlackboard();
+  }
+
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    return properties;
+  }
+
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    GeodeAwaitility.await()
+        .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum));
+  }
+
+  private void initClientCache() {
+    Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  private void stopServers() {
+    List<CacheServer> cacheServers = getCache().getCacheServers();
+    for (CacheServer server : cacheServers) {
+      server.stop();
+    }
+  }
+
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    getCache().createRegionFactory(shortcut)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  private void feed(boolean isClient) {
+    Region region = getRegion(isClient);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+  }
+
+  private void verifyServerRegionSize(int expectedNum) {
+    accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+  }
+
+  private void verifyClientRegionSize(int expectedNum) {
+    client1.invoke(() -> verifyRegionSize(true, expectedNum));
+    client2.invoke(() -> verifyRegionSize(true, expectedNum));
+  }
+
+  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+      CountingCacheListener countingCacheListener =
+          (CountingCacheListener) getRegion(false).getAttributes()
+              .getCacheListeners()[0];
+      return countingCacheListener.getClears();
+    };
+
+    int count = accessor.invoke(getListenerTriggerCount)
+        + dataStore1.invoke(getListenerTriggerCount)
+        + dataStore2.invoke(getListenerTriggerCount)
+        + dataStore3.invoke(getListenerTriggerCount);
+    assertThat(count).isEqualTo(4);
+
+    if (serverVM != null) {
+      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void invokeClearOnDataStoreAndVerifyListenerCount() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test
+  public void invokeClearOnAccessorAndVerifyListenerCount() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(accessor);
+  }
+
+  @Test
+  public void invokeClearFromClientAndVerifyListenerCount() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void invokeClearFromClientWithAccessorAsServer() {
+    dataStore1.invoke(this::stopServers);
+    dataStore2.invoke(this::stopServers);
+    dataStore3.invoke(this::stopServers);
+
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  @Test
+  public void invokeClearFromDataStoreWithClientInterest() {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore1.invoke(() -> getRegion(false).clear());
+
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers()
+      throws Exception {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore2.invoke(() -> DistributionMessageObserver.setInstance(
+        testHookToKillMemberCallingClearBeforeMessageProcessed()));
+
+    AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear());
+
+    getBlackboard().waitForGate("CLOSE_CACHE", 30, SECONDS);
+
+    dataStore1.invoke(() -> getCache().close());
+    getBlackboard().signalGate("CACHE_CLOSED");
+
+    // This should not be blocked.
+    dataStore2.invoke(() -> feed(false));
+    dataStore3.invoke(() -> feed(false));
+
+    dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+    dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+
+    ds1ClearAsync.await();
+  }
+
+  @Test
+  public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembersAfterMessageProcessed()
+      throws Exception {
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    dataStore2.invoke(() -> DistributionMessageObserver.setInstance(
+        testHookToKillMemberCallingClearAfterMessageProcessed()));
+
+    AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear());
+
+    getBlackboard().waitForGate("CLOSE_CACHE", 30, SECONDS);
+
+    dataStore1.invoke(() -> getCache().close());
+    getBlackboard().signalGate("CACHE_CLOSED");
+
+    // This should not be blocked.
+    dataStore2.invoke(() -> feed(false));
+    dataStore3.invoke(() -> feed(false));
+
+    dataStore2.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+    dataStore3.invoke(() -> verifyRegionSize(false, NUM_ENTRIES));
+
+    ds1ClearAsync.await();
+  }
+
+
+  private static class CountingCacheListener extends CacheListenerAdapter {
+    private final AtomicInteger clears = new AtomicInteger();
+
+    @Override
+    public void afterRegionClear(RegionEvent event) {
+      clears.incrementAndGet();
+    }
+
+    int getClears() {
+      return clears.get();
+
+    }
+  }
+
+  private DistributionMessageObserver testHookToKillMemberCallingClearBeforeMessageProcessed() {
+    return new DistributionMessageObserver() {
+
+      @Override
+      public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+        if (message instanceof PartitionedRegionClearMessage) {
+          if (((PartitionedRegionClearMessage) message)
+              .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+            DistributionMessageObserver.setInstance(null);
+            getBlackboard().signalGate("CLOSE_CACHE");
+            try {
+              getBlackboard().waitForGate("CACHE_CLOSED", 30, SECONDS);
+              GeodeAwaitility.await().untilAsserted(
+                  () -> assertThat(dm.isCurrentMember(message.getSender())).isFalse());
+            } catch (TimeoutException | InterruptedException e) {
+              throw new RuntimeException("Failed waiting for signal.");
+            }
+          }
+        }
+      }
+    };
+  }
+
+  private DistributionMessageObserver testHookToKillMemberCallingClearAfterMessageProcessed() {
+    return new DistributionMessageObserver() {
+      @Override
+      public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+        if (message instanceof PartitionedRegionClearMessage) {
+          if (((PartitionedRegionClearMessage) message)
+              .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+            DistributionMessageObserver.setInstance(null);
+            getBlackboard().signalGate("CLOSE_CACHE");
+            try {
+              getBlackboard().waitForGate("CACHE_CLOSED", 30, SECONDS);
+            } catch (TimeoutException | InterruptedException e) {
+              throw new RuntimeException("Failed waiting for signal.");
+            }
+          }
+        }
+      }
+    };
+  }
+
+  private static DUnitBlackboard getBlackboard() {
+    if (blackboard == null) {
+      blackboard = new DUnitBlackboard();
+    }
+    return blackboard;
+  }
+
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
index e2e04eb..a3b311c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -80,7 +80,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
 
   protected Properties getProperties() {
     Properties properties = new Properties();
-    properties.setProperty("log-level", "info");
     return properties;
   }
 
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
index 7f3dff9..dfc9470 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
@@ -17,12 +17,8 @@ package org.apache.geode.internal.cache;
 import static org.apache.geode.cache.ExpirationAction.DESTROY;
 import static org.apache.geode.cache.RegionShortcut.PARTITION;
 import static org.apache.geode.cache.RegionShortcut.PARTITION_OVERFLOW;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT_OVERFLOW;
 import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
 import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
-import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
 import static org.apache.geode.internal.util.ArrayUtils.asList;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.dunit.VM.getVM;
@@ -53,6 +49,7 @@ import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.cache.RegionShortcut;
@@ -75,7 +72,8 @@ import org.apache.geode.test.dunit.rules.DistributedRule;
 @RunWith(JUnitParamsRunner.class)
 public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable {
   private static final Integer BUCKETS = 13;
-  private static final Integer EXPIRATION_TIME = 30;
+  private static final Integer EXPIRATION_TIME = 5 * 60;
+  private static final Integer SMALL_EXPIRATION_TIME = 10;
   private static final String REGION_NAME = "PartitionedRegion";
 
   @Rule
@@ -106,11 +104,6 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
         PARTITION_OVERFLOW,
         PARTITION_REDUNDANT,
         PARTITION_REDUNDANT_OVERFLOW,
-
-        PARTITION_PERSISTENT,
-        PARTITION_PERSISTENT_OVERFLOW,
-        PARTITION_REDUNDANT_PERSISTENT,
-        PARTITION_REDUNDANT_PERSISTENT_OVERFLOW
     };
   }
 
@@ -134,26 +127,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
     accessor = getVM(TestVM.ACCESSOR.vmNumber);
   }
 
-  private RegionShortcut getRegionAccessorShortcut(RegionShortcut dataStoreRegionShortcut) {
-    if (dataStoreRegionShortcut.isPersistent()) {
-      switch (dataStoreRegionShortcut) {
-        case PARTITION_PERSISTENT:
-          return PARTITION;
-        case PARTITION_PERSISTENT_OVERFLOW:
-          return PARTITION_OVERFLOW;
-        case PARTITION_REDUNDANT_PERSISTENT:
-          return PARTITION_REDUNDANT;
-        case PARTITION_REDUNDANT_PERSISTENT_OVERFLOW:
-          return PARTITION_REDUNDANT_OVERFLOW;
-      }
-    }
-
-    return dataStoreRegionShortcut;
-  }
-
   private void initAccessor(RegionShortcut regionShortcut,
       ExpirationAttributes expirationAttributes) {
-    RegionShortcut accessorShortcut = getRegionAccessorShortcut(regionShortcut);
     PartitionAttributes<String, String> attributes =
         new PartitionAttributesFactory<String, String>()
             .setTotalNumBuckets(BUCKETS)
@@ -161,7 +136,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
             .create();
 
     cacheRule.getCache()
-        .<String, String>createRegionFactory(accessorShortcut)
+        .<String, String>createRegionFactory(regionShortcut)
         .setPartitionAttributes(attributes)
         .setEntryTimeToLive(expirationAttributes)
         .setEntryIdleTimeout(expirationAttributes)
@@ -281,6 +256,19 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
     }));
   }
 
+  private void doClear() {
+    Cache cache = cacheRule.getCache();
+    boolean retry;
+    do {
+      retry = false;
+      try {
+        cache.getRegion(REGION_NAME).clear();
+      } catch (PartitionedRegionPartialClearException | CacheWriterException ex) {
+        retry = true;
+      }
+    } while (retry);
+  }
+
   /**
    * The test does the following (clear coordinator and region type are parametrized):
    * - Populates the Partition Region (entries have expiration).
@@ -303,10 +291,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
     populateRegion(accessor, entries, asList(accessor, server1, server2));
 
     // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
-      Cache cache = cacheRule.getCache();
-      cache.getRegion(REGION_NAME).clear();
-    });
+    getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
 
     // Assert all expiration tasks were cancelled and none were executed.
     asList(server1, server2).forEach(vm -> vm.invoke(() -> {
@@ -323,7 +308,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
 
     // Assert Region Buckets are consistent and region is empty,
     accessor.invoke(this::assertRegionBucketsConsistency);
-    assertRegionIsEmpty(asList(accessor, server1, server1));
+    assertRegionIsEmpty(asList(accessor, server1, server2));
   }
 
   /**
@@ -344,7 +329,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
   public void clearShouldFailWhenCoordinatorMemberIsBouncedAndExpirationTasksShouldSurvive(
       RegionShortcut regionShortcut) {
     final int entries = 1000;
-    ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
+    ExpirationAttributes expirationAttributes =
+        new ExpirationAttributes(SMALL_EXPIRATION_TIME, DESTROY);
     parametrizedSetup(regionShortcut, expirationAttributes);
     populateRegion(accessor, entries, asList(accessor, server1, server2));
     registerVMKillerAsCacheWriter(Collections.singletonList(server1));
@@ -408,22 +394,21 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
   @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
   public void clearShouldSucceedAndRemoveRegisteredExpirationTasksWhenNonCoordinatorMemberIsBounced(
       TestVM coordinatorVM, RegionShortcut regionShortcut) {
-    final int entries = 1500;
+    final int entries = 500;
+
+    RegionShortcut rs = regionShortcut;
     ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
     parametrizedSetup(regionShortcut, expirationAttributes);
     registerVMKillerAsCacheWriter(Collections.singletonList(server2));
     populateRegion(accessor, entries, asList(accessor, server1, server2));
 
     // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
-      Cache cache = cacheRule.getCache();
-      cache.getRegion(REGION_NAME).clear();
-    });
+    getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
 
     // Wait for member to get back online and assign buckets.
     server2.invoke(() -> {
       cacheRule.createCache();
-      initDataStore(regionShortcut, expirationAttributes);
+      initDataStore(rs, expirationAttributes);
       await().untilAsserted(
           () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
       PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
@@ -460,7 +445,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
 
     // Assert Region Buckets are consistent and region is empty,
     accessor.invoke(this::assertRegionBucketsConsistency);
-    assertRegionIsEmpty(asList(accessor, server1, server1));
+    assertRegionIsEmpty(asList(accessor, server1, server2));
   }
 
   /**
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java
similarity index 93%
copy from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
copy to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java
index 7f3dff9..f6f25bd 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithExpirationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PersistentPartitionedRegionClearWithExpirationDUnitTest.java
@@ -53,6 +53,7 @@ import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.cache.RegionShortcut;
@@ -73,9 +74,10 @@ import org.apache.geode.test.dunit.rules.DistributedRule;
  * on the {@link PartitionedRegion} once the operation is executed.
  */
 @RunWith(JUnitParamsRunner.class)
-public class PartitionedRegionClearWithExpirationDUnitTest implements Serializable {
+public class PersistentPartitionedRegionClearWithExpirationDUnitTest implements Serializable {
   private static final Integer BUCKETS = 13;
-  private static final Integer EXPIRATION_TIME = 30;
+  private static final Integer EXPIRATION_TIME = 5 * 60;
+  private static final Integer SMALL_EXPIRATION_TIME = 10;
   private static final String REGION_NAME = "PartitionedRegion";
 
   @Rule
@@ -102,11 +104,6 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
   @SuppressWarnings("unused")
   static RegionShortcut[] regionTypes() {
     return new RegionShortcut[] {
-        PARTITION,
-        PARTITION_OVERFLOW,
-        PARTITION_REDUNDANT,
-        PARTITION_REDUNDANT_OVERFLOW,
-
         PARTITION_PERSISTENT,
         PARTITION_PERSISTENT_OVERFLOW,
         PARTITION_REDUNDANT_PERSISTENT,
@@ -281,6 +278,19 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
     }));
   }
 
+  private void doClear() {
+    Cache cache = cacheRule.getCache();
+    boolean retry;
+    do {
+      retry = false;
+      try {
+        cache.getRegion(REGION_NAME).clear();
+      } catch (PartitionedRegionPartialClearException | CacheWriterException ex) {
+        retry = true;
+      }
+    } while (retry);
+  }
+
   /**
    * The test does the following (clear coordinator and region type are parametrized):
    * - Populates the Partition Region (entries have expiration).
@@ -303,10 +313,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
     populateRegion(accessor, entries, asList(accessor, server1, server2));
 
     // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
-      Cache cache = cacheRule.getCache();
-      cache.getRegion(REGION_NAME).clear();
-    });
+    getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
 
     // Assert all expiration tasks were cancelled and none were executed.
     asList(server1, server2).forEach(vm -> vm.invoke(() -> {
@@ -323,7 +330,7 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
 
     // Assert Region Buckets are consistent and region is empty,
     accessor.invoke(this::assertRegionBucketsConsistency);
-    assertRegionIsEmpty(asList(accessor, server1, server1));
+    assertRegionIsEmpty(asList(accessor, server1, server2));
   }
 
   /**
@@ -344,7 +351,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
   public void clearShouldFailWhenCoordinatorMemberIsBouncedAndExpirationTasksShouldSurvive(
       RegionShortcut regionShortcut) {
     final int entries = 1000;
-    ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
+    ExpirationAttributes expirationAttributes =
+        new ExpirationAttributes(SMALL_EXPIRATION_TIME, DESTROY);
     parametrizedSetup(regionShortcut, expirationAttributes);
     populateRegion(accessor, entries, asList(accessor, server1, server2));
     registerVMKillerAsCacheWriter(Collections.singletonList(server1));
@@ -407,23 +415,29 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
   @Parameters(method = "vmsAndRegionTypes")
   @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})")
   public void clearShouldSucceedAndRemoveRegisteredExpirationTasksWhenNonCoordinatorMemberIsBounced(
-      TestVM coordinatorVM, RegionShortcut regionShortcut) {
-    final int entries = 1500;
+      TestVM coordinatorVM, RegionShortcut regionShortcut) throws Exception {
+    final int entries = 500;
+    // To avoid partition offline exception without redundancy.
+
+    if (regionShortcut == PARTITION_PERSISTENT) {
+      regionShortcut = PARTITION_REDUNDANT_PERSISTENT;
+    } else if (regionShortcut == PARTITION_PERSISTENT_OVERFLOW) {
+      regionShortcut = PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+    }
+
+    final RegionShortcut rs = regionShortcut;
     ExpirationAttributes expirationAttributes = new ExpirationAttributes(EXPIRATION_TIME, DESTROY);
     parametrizedSetup(regionShortcut, expirationAttributes);
     registerVMKillerAsCacheWriter(Collections.singletonList(server2));
     populateRegion(accessor, entries, asList(accessor, server1, server2));
 
     // Clear the region.
-    getVM(coordinatorVM.vmNumber).invoke(() -> {
-      Cache cache = cacheRule.getCache();
-      cache.getRegion(REGION_NAME).clear();
-    });
+    getVM(coordinatorVM.vmNumber).invoke(() -> doClear());
 
     // Wait for member to get back online and assign buckets.
     server2.invoke(() -> {
       cacheRule.createCache();
-      initDataStore(regionShortcut, expirationAttributes);
+      initDataStore(rs, expirationAttributes);
       await().untilAsserted(
           () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
       PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
@@ -459,8 +473,8 @@ public class PartitionedRegionClearWithExpirationDUnitTest implements Serializab
     });
 
     // Assert Region Buckets are consistent and region is empty,
-    accessor.invoke(this::assertRegionBucketsConsistency);
-    assertRegionIsEmpty(asList(accessor, server1, server1));
+    // accessor.invoke(this::assertRegionBucketsConsistency);
+    assertRegionIsEmpty(asList(accessor, server1, server2));
   }
 
   /**
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
index 818a855..933bc39 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
@@ -16,15 +16,24 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache30.TestCacheListener;
 import org.apache.geode.test.junit.rules.ServerStarterRule;
 
 public class PartitionedRegionIntegrationTest {
@@ -55,4 +64,40 @@ public class PartitionedRegionIntegrationTest {
     ScheduledExecutorService bucketSorter = region.getBucketSorter();
     assertThat(bucketSorter).isNull();
   }
+
+  @Test
+  public void prClearWithDataInvokesCacheListenerAfterClear() {
+    TestCacheListener prCacheListener = new TestCacheListener() {};
+    TestCacheListener spyPRCacheListener = spy(prCacheListener);
+
+    Region region = server.createPartitionRegion("PR1",
+        f -> f.addCacheListener(spyPRCacheListener), f -> f.setTotalNumBuckets(2));
+    region.put("key1", "value2");
+    region.put("key2", "value2");
+    spyPRCacheListener.enableEventHistory();
+
+    region.clear();
+
+    verify(spyPRCacheListener, times(1)).afterRegionClear(any());
+    List cacheEvents = spyPRCacheListener.getEventHistory();
+    assertThat(cacheEvents.size()).isEqualTo(1);
+    assertThat(((CacheEvent) cacheEvents.get(0)).getOperation()).isEqualTo(Operation.REGION_CLEAR);
+  }
+
+  @Test
+  public void prClearWithoutDataInvokesCacheListenerAfterClear() {
+    TestCacheListener prCacheListener = new TestCacheListener() {};
+    TestCacheListener spyPRCacheListener = spy(prCacheListener);
+
+    Region region = server.createPartitionRegion("PR1",
+        f -> f.addCacheListener(spyPRCacheListener), f -> f.setTotalNumBuckets(2));
+    spyPRCacheListener.enableEventHistory();
+
+    region.clear();
+
+    verify(spyPRCacheListener, times(1)).afterRegionClear(any());
+    List cacheEvents = spyPRCacheListener.getEventHistory();
+    assertThat(cacheEvents.size()).isEqualTo(1);
+    assertThat(((CacheEvent) cacheEvents.get(0)).getOperation()).isEqualTo(Operation.REGION_CLEAR);
+  }
 }
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 8e522a2..e56247d 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1075,6 +1075,14 @@ org/apache/geode/internal/cache/PartitionRegionConfig,2
 fromData,207
 toData,178
 
+org/apache/geode/internal/cache/PartitionedRegionClearMessage,2
+fromData,40
+toData,36
+
+org/apache/geode/internal/cache/PartitionedRegionClearMessage$PartitionedRegionClearReplyMessage,2
+fromData,29
+toData,28
+
 org/apache/geode/internal/cache/PoolFactoryImpl$PoolAttributes,2
 fromData,161
 toData,161
diff --git a/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java b/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java
new file mode 100644
index 0000000..1ddb301
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/PartitionedRegionPartialClearException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache;
+
+/**
+ * Indicates a failure to perform a distributed clear operation on a Partitioned Region
+ * after multiple attempts. The clear may not have been successfully applied on some of
+ * the members hosting the region.
+ */
+public class PartitionedRegionPartialClearException extends CacheRuntimeException {
+
+  public PartitionedRegionPartialClearException() {}
+
+  public PartitionedRegionPartialClearException(String msg) {
+    super(msg);
+  }
+
+  public PartitionedRegionPartialClearException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public PartitionedRegionPartialClearException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/Region.java b/geode-core/src/main/java/org/apache/geode/cache/Region.java
index b6ba670..4707a46 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/Region.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/Region.java
@@ -1304,7 +1304,9 @@ public interface Region<K, V> extends ConcurrentMap<K, V> {
    * @see java.util.Map#clear()
    * @see CacheListener#afterRegionClear
    * @see CacheWriter#beforeRegionClear
-   * @throws UnsupportedOperationException If the region is a partitioned region
+   * @throws PartitionedRegionPartialClearException when data is partially cleared on partitioned
+   *         region. It is caller responsibility to handle the partial data clear either by retrying
+   *         the clear operation or continue working with the partially cleared partitioned region.
    */
   @Override
   void clear();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 26d92c9..f0658a6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -235,6 +235,7 @@ import org.apache.geode.internal.cache.MemberFunctionStreamingMessage;
 import org.apache.geode.internal.cache.Node;
 import org.apache.geode.internal.cache.PRQueryProcessor;
 import org.apache.geode.internal.cache.PartitionRegionConfig;
+import org.apache.geode.internal.cache.PartitionedRegionClearMessage;
 import org.apache.geode.internal.cache.PreferBytesCachedDeserializable;
 import org.apache.geode.internal.cache.RegionEventImpl;
 import org.apache.geode.internal.cache.ReleaseClearLockMessage;
@@ -686,6 +687,10 @@ public class DSFIDFactory implements DataSerializableFixedID {
     serializer.registerDSFID(PR_DUMP_B2N_REPLY_MESSAGE, DumpB2NReplyMessage.class);
     serializer.registerDSFID(DESTROY_PARTITIONED_REGION_MESSAGE,
         DestroyPartitionedRegionMessage.class);
+    serializer.registerDSFID(CLEAR_PARTITIONED_REGION_MESSAGE,
+        PartitionedRegionClearMessage.class);
+    serializer.registerDSFID(CLEAR_PARTITIONED_REGION_REPLY_MESSAGE,
+        PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage.class);
     serializer.registerDSFID(INVALIDATE_PARTITIONED_REGION_MESSAGE,
         InvalidatePartitionedRegionMessage.class);
     serializer.registerDSFID(COMMIT_PROCESS_QUERY_MESSAGE, CommitProcessQueryMessage.class);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index e4045c3..6cba754 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -1622,7 +1622,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
   /**
    * Returns true if the a primary is known.
    */
-  private boolean hasPrimary() {
+  protected boolean hasPrimary() {
     final byte primaryState = this.primaryState;
     return primaryState == OTHER_PRIMARY_NOT_HOSTING || primaryState == OTHER_PRIMARY_HOSTING
         || primaryState == IS_PRIMARY_HOSTING;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 416c058..3008cb3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -578,8 +578,13 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     // get rvvLock
     Set<InternalDistributedMember> participants =
         getCacheDistributionAdvisor().adviseInvalidateRegion();
+    boolean isLockedAlready = this.partitionedRegion.getPartitionedRegionClear()
+        .isLockedForListenerAndClientNotification();
+
     try {
-      obtainWriteLocksForClear(regionEvent, participants);
+      if (!isLockedAlready) {
+        obtainWriteLocksForClear(regionEvent, participants);
+      }
       // no need to dominate my own rvv.
       // Clear is on going here, there won't be GII for this member
       clearRegionLocally(regionEvent, cacheWrite, null);
@@ -587,7 +592,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
 
       // TODO: call reindexUserDataRegion if there're lucene indexes
     } finally {
-      releaseWriteLocksForClear(regionEvent, participants);
+      if (!isLockedAlready) {
+        releaseWriteLocksForClear(regionEvent, participants);
+      }
     }
   }
 
@@ -2512,4 +2519,10 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   void checkSameSenderIdsAvailableOnAllNodes() {
     // nothing needed on a bucket region
   }
+
+  @Override
+  protected void basicClear(RegionEventImpl regionEvent) {
+    basicClear(regionEvent, false);
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
index 4396581..25cc2f5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
@@ -207,7 +207,7 @@ public class DistributedClearOperation extends DistributedCacheOperation {
     protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm)
         throws EntryNotFoundException {
 
-      DistributedRegion region = (DistributedRegion) event.getRegion();
+      LocalRegion region = (LocalRegion) event.getRegion();
       switch (this.clearOp) {
         case OP_CLEAR:
           region.clearRegionLocally((RegionEventImpl) event, false, this.rvv);
@@ -215,9 +215,11 @@ public class DistributedClearOperation extends DistributedCacheOperation {
           this.appliedOperation = true;
           break;
         case OP_LOCK_FOR_CLEAR:
-          if (region.getDataPolicy().withStorage()) {
-            DistributedClearOperation.regionLocked(this.getSender(), region.getFullPath(), region);
-            region.lockLocallyForClear(dm, this.getSender(), event);
+          if (region.getDataPolicy().withStorage() && region instanceof DistributedRegion) {
+            DistributedRegion distributedRegion = (DistributedRegion) region;
+            DistributedClearOperation.regionLocked(this.getSender(), region.getFullPath(),
+                distributedRegion);
+            distributedRegion.lockLocallyForClear(dm, this.getSender(), event);
           }
           this.appliedOperation = true;
           break;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 84b5a3b..d0035fa 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2130,7 +2130,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
    */
   protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
+    releaseLockLocallyForClear(regionEvent);
+    if (!isUsedForPartitionedRegionBucket()) {
+      DistributedClearOperation.releaseLocks(regionEvent, participants);
+    }
+  }
 
+  protected void releaseLockLocallyForClear(RegionEventImpl regionEvent) {
     ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
     if (armLockTestHook != null) {
       armLockTestHook.beforeRelease(this, regionEvent);
@@ -2140,9 +2146,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     if (rvv != null) {
       rvv.unlockForClear(getMyId());
     }
-    if (!isUsedForPartitionedRegionBucket()) {
-      DistributedClearOperation.releaseLocks(regionEvent, participants);
-    }
 
     if (armLockTestHook != null) {
       armLockTestHook.afterRelease(this, regionEvent);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 876353f..8ade506 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -466,4 +466,7 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
   boolean isRegionCreateNotified();
 
   void setRegionCreateNotified(boolean notified);
+
+  void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 94d4563..623d596 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8470,7 +8470,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * will not take distributedLock. The clear operation will also clear the local transactional
    * entries. The clear operation will have immediate committed state.
    */
-  void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite,
+  @Override
+  public void clearRegionLocally(RegionEventImpl regionEvent, boolean cacheWrite,
       RegionVersionVector vector) {
     final boolean isRvvDebugEnabled = logger.isTraceEnabled(LogMarker.RVV_VERBOSE);
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 950ec63..671d27b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -322,6 +322,8 @@ public class PartitionedRegion extends LocalRegion
     }
   };
 
+  private final PartitionedRegionClear partitionedRegionClear = new PartitionedRegionClear(this);
+
   /**
    * Global Region for storing PR config ( PRName->PRConfig). This region would be used to resolve
    * PR name conflict.*
@@ -2174,198 +2176,6 @@ public class PartitionedRegion extends LocalRegion
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    synchronized (clearLock) {
-      final DistributedLockService lockService = getPartitionedRegionLockService();
-      try {
-        lockService.lock("_clearOperation" + this.getFullPath().replace('/', '_'), -1, -1);
-      } catch (IllegalStateException e) {
-        lockCheckReadiness();
-        throw e;
-      }
-      try {
-        if (cache.isCacheAtShutdownAll()) {
-          throw cache.getCacheClosedException("Cache is shutting down");
-        }
-
-        // do cacheWrite
-        cacheWriteBeforeRegionClear(regionEvent);
-
-        // create ClearPRMessage per bucket
-        List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId());
-        for (ClearPRMessage clearPRMessage : clearMsgList) {
-          int bucketId = clearPRMessage.getBucketId();
-          checkReadiness();
-          long sendMessagesStartTime = 0;
-          if (isDebugEnabled) {
-            sendMessagesStartTime = System.currentTimeMillis();
-          }
-          try {
-            sendClearMsgByBucket(bucketId, clearPRMessage);
-          } catch (PartitionOfflineException poe) {
-            // TODO add a PartialResultException
-            logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket "
-                + bucketId, poe);
-          } catch (Exception e) {
-            logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e);
-          }
-
-          if (isDebugEnabled) {
-            long now = System.currentTimeMillis();
-            logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId,
-                (now - sendMessagesStartTime));
-          }
-          // TODO add psStats
-        }
-      } finally {
-        try {
-          lockService.unlock("_clearOperation" + this.getFullPath().replace('/', '_'));
-        } catch (IllegalStateException e) {
-          lockCheckReadiness();
-        }
-      }
-
-      // notify bridge clients at PR level
-      regionEvent.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
-      boolean hasListener = hasListener();
-      if (hasListener) {
-        dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
-      }
-      notifyBridgeClients(regionEvent);
-      logger.info("Partitioned region {} finsihed clear operation.", this.getFullPath());
-    }
-  }
-
-  void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
-    RetryTimeKeeper retryTime = null;
-    InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
-    if (logger.isDebugEnabled()) {
-      logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId,
-          currentTarget);
-    }
-
-    long timeOut = 0;
-    int count = 0;
-    while (true) {
-      switch (count) {
-        case 0:
-          // Note we don't check for DM cancellation in common case.
-          // First time. Assume success, keep going.
-          break;
-        case 1:
-          this.cache.getCancelCriterion().checkCancelInProgress(null);
-          // Second time (first failure). Calculate timeout and keep going.
-          timeOut = System.currentTimeMillis() + this.retryTimeout;
-          break;
-        default:
-          this.cache.getCancelCriterion().checkCancelInProgress(null);
-          // test for timeout
-          long timeLeft = timeOut - System.currentTimeMillis();
-          if (timeLeft < 0) {
-            PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId,
-                this.retryTimeout);
-            // NOTREACHED
-          }
-
-          // Didn't time out. Sleep a bit and then continue
-          boolean interrupted = Thread.interrupted();
-          try {
-            Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
-          } catch (InterruptedException ignore) {
-            interrupted = true;
-          } finally {
-            if (interrupted) {
-              Thread.currentThread().interrupt();
-            }
-          }
-          break;
-      } // switch
-      count++;
-
-      if (currentTarget == null) { // pick target
-        checkReadiness();
-        if (retryTime == null) {
-          retryTime = new RetryTimeKeeper(this.retryTimeout);
-        }
-
-        currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
-        if (currentTarget == null) {
-          // the bucket does not exist, no need to clear
-          logger.info("Bucket " + bucketId + " does not contain data, no need to clear");
-          return;
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget);
-          }
-        }
-
-        // It's possible this is a GemFire thread e.g. ServerConnection
-        // which got to this point because of a distributed system shutdown or
-        // region closure which uses interrupt to break any sleep() or wait() calls
-        // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
-        checkShutdown();
-        continue;
-      } // pick target
-
-      boolean result = false;
-      try {
-        final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId());
-        if (isLocal) {
-          result = clearPRMessage.doLocalClear(this);
-        } else {
-          ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
-          if (response != null) {
-            this.prStats.incPartitionMessagesSent();
-            result = response.waitForResult();
-          }
-        }
-        if (result) {
-          return;
-        }
-      } catch (ForceReattemptException fre) {
-        checkReadiness();
-        InternalDistributedMember lastTarget = currentTarget;
-        if (retryTime == null) {
-          retryTime = new RetryTimeKeeper(this.retryTimeout);
-        }
-        currentTarget = getNodeForBucketWrite(bucketId, retryTime);
-        if (lastTarget.equals(currentTarget)) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}",
-                currentTarget, fre.getMessage());
-          }
-          if (retryTime.overMaximum()) {
-            PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket",
-                this.retryTimeout);
-            // NOTREACHED
-          }
-          retryTime.waitToRetryNode();
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget,
-                currentTarget);
-          }
-        }
-      }
-
-      // It's possible this is a GemFire thread e.g. ServerConnection
-      // which got to this point because of a distributed system shutdown or
-      // region closure which uses interrupt to break any sleep() or wait()
-      // calls
-      // e.g. waitForPrimary or waitForBucketRecovery in which case throw
-      // exception
-      checkShutdown();
-
-      // If we get here, the attempt failed...
-      if (count == 1) {
-        // TODO prStats add ClearPRMsg retried
-        this.prStats.incPutAllMsgsRetried();
-      }
-    }
-  }
-
   List<ClearPRMessage> createClearPRMessages(EventID eventID) {
     ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
     for (int bucketId = 0; bucketId < getTotalNumberOfBuckets(); bucketId++) {
@@ -10437,4 +10247,27 @@ public class PartitionedRegion extends LocalRegion
     this.getSystem().handleResourceEvent(ResourceEvent.REGION_CREATE, this);
     this.regionCreationNotified = true;
   }
+
+  protected PartitionedRegionClear getPartitionedRegionClear() {
+    return partitionedRegionClear;
+  }
+
+  @Override
+  void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
+    // Synchronized to avoid other threads invoking clear on this vm/node.
+    synchronized (clearLock) {
+      partitionedRegionClear.doClear(regionEvent, cacheWrite, this);
+    }
+  }
+
+  boolean hasAnyClientsInterested() {
+    // Check local filter
+    if (getFilterProfile() != null && (getFilterProfile().hasInterest() || getFilterProfile()
+        .hasCQs())) {
+      return true;
+    }
+    // check peer server filters
+    return (getRegionAdvisor().hasPRServerWithInterest()
+        || getRegionAdvisor().hasPRServerWithCQs());
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
new file mode 100644
index 0000000..69277ef
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.OperationAbortedException;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class PartitionedRegionClear {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static final String CLEAR_OPERATION = "_clearOperation";
+
+  private final int retryTime = 2 * 60 * 1000;
+
+  private final PartitionedRegion partitionedRegion;
+
+  private final LockForListenerAndClientNotification lockForListenerAndClientNotification =
+      new LockForListenerAndClientNotification();
+
+  private volatile boolean membershipChange = false;
+
+  public PartitionedRegionClear(PartitionedRegion partitionedRegion) {
+    this.partitionedRegion = partitionedRegion;
+    partitionedRegion.getDistributionManager()
+        .addMembershipListener(new PartitionedRegionClearListener());
+  }
+
+  public boolean isLockedForListenerAndClientNotification() {
+    return lockForListenerAndClientNotification.isLocked();
+  }
+
+  void acquireDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+      throw e;
+    }
+  }
+
+  void releaseDistributedClearLock(String clearLock) {
+    try {
+      partitionedRegion.getPartitionedRegionLockService().unlock(clearLock);
+    } catch (IllegalStateException e) {
+      partitionedRegion.lockCheckReadiness();
+    } catch (Exception ex) {
+      logger.warn("Caught exception while unlocking clear distributed lock. " + ex.getMessage());
+    }
+  }
+
+  void obtainLockForClear(RegionEventImpl event) {
+    obtainClearLockLocal(partitionedRegion.getDistributionManager().getId());
+    sendPartitionedRegionClearMessage(event,
+        PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
+  }
+
+  void releaseLockForClear(RegionEventImpl event) {
+    releaseClearLockLocal();
+    sendPartitionedRegionClearMessage(event,
+        PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
+  }
+
+  List clearRegion(RegionEventImpl regionEvent, boolean cacheWrite,
+      RegionVersionVector vector) {
+    List allBucketsCleared = new ArrayList();
+    allBucketsCleared.addAll(clearRegionLocal(regionEvent));
+    allBucketsCleared.addAll(sendPartitionedRegionClearMessage(regionEvent,
+        PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR));
+    return allBucketsCleared;
+  }
+
+  private void waitForPrimary() {
+    boolean retry;
+    PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime);
+    do {
+      retry = false;
+      for (BucketRegion bucketRegion : partitionedRegion.getDataStore()
+          .getAllLocalBucketRegions()) {
+        if (!bucketRegion.getBucketAdvisor().hasPrimary()) {
+          if (retryTimer.overMaximum()) {
+            throw new PartitionedRegionPartialClearException(
+                "Unable to find primary bucket region during clear operation for region: " +
+                    partitionedRegion.getName());
+          }
+          retryTimer.waitForBucketsRecovery();
+          retry = true;
+        }
+      }
+    } while (retry);
+  }
+
+  public ArrayList clearRegionLocal(RegionEventImpl regionEvent) {
+    ArrayList clearedBuckets = new ArrayList();
+    membershipChange = false;
+    // Synchronized to handle the requester departure.
+    synchronized (lockForListenerAndClientNotification) {
+      if (partitionedRegion.getDataStore() != null) {
+        partitionedRegion.getDataStore().lockBucketCreationForRegionClear();
+        try {
+          boolean retry;
+          do {
+            waitForPrimary();
+
+            for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+                .getAllLocalPrimaryBucketRegions()) {
+              if (localPrimaryBucketRegion.size() > 0) {
+                localPrimaryBucketRegion.clear();
+              }
+              clearedBuckets.add(localPrimaryBucketRegion.getId());
+            }
+
+            if (membershipChange) {
+              membershipChange = false;
+              retry = true;
+            } else {
+              retry = false;
+            }
+
+          } while (retry);
+          doAfterClear(regionEvent);
+        } finally {
+          partitionedRegion.getDataStore().unlockBucketCreationForRegionClear();
+        }
+      } else {
+        // Non data-store with client queue and listener
+        doAfterClear(regionEvent);
+      }
+    }
+    return clearedBuckets;
+  }
+
+  private void doAfterClear(RegionEventImpl regionEvent) {
+    if (partitionedRegion.hasAnyClientsInterested()) {
+      notifyClients(regionEvent);
+    }
+
+    if (partitionedRegion.hasListener()) {
+      partitionedRegion.dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
+    }
+  }
+
+  void notifyClients(RegionEventImpl event) {
+    // Set client routing information into the event
+    // The clear operation in case of PR is distributed differently
+    // hence the FilterRoutingInfo is set here instead of
+    // DistributedCacheOperation.distribute().
+    event.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
+    if (!partitionedRegion.isUsedForMetaRegion() && !partitionedRegion
+        .isUsedForPartitionedRegionAdmin()
+        && !partitionedRegion.isUsedForPartitionedRegionBucket() && !partitionedRegion
+            .isUsedForParallelGatewaySenderQueue()) {
+
+      FilterRoutingInfo localCqFrInfo =
+          partitionedRegion.getFilterProfile().getFilterRoutingInfoPart1(event,
+              FilterProfile.NO_PROFILES, Collections.emptySet());
+
+      FilterRoutingInfo localCqInterestFrInfo =
+          partitionedRegion.getFilterProfile().getFilterRoutingInfoPart2(localCqFrInfo, event);
+
+      if (localCqInterestFrInfo != null) {
+        event.setLocalFilterInfo(localCqInterestFrInfo.getLocalFilterInfo());
+      }
+    }
+    partitionedRegion.notifyBridgeClients(event);
+  }
+
+  protected void obtainClearLockLocal(InternalDistributedMember requester) {
+    synchronized (lockForListenerAndClientNotification) {
+      // Check if the member is still part of the distributed system
+      if (!partitionedRegion.getDistributionManager().isCurrentMember(requester)) {
+        return;
+      }
+
+      lockForListenerAndClientNotification.setLocked(requester);
+      if (partitionedRegion.getDataStore() != null) {
+        for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+            .getAllLocalPrimaryBucketRegions()) {
+          try {
+            localPrimaryBucketRegion.lockLocallyForClear(partitionedRegion.getDistributionManager(),
+                partitionedRegion.getMyId(), null);
+          } catch (Exception ex) {
+            partitionedRegion.checkClosed();
+          }
+        }
+      }
+    }
+  }
+
+  protected void releaseClearLockLocal() {
+    synchronized (lockForListenerAndClientNotification) {
+      if (lockForListenerAndClientNotification.getLockRequester() == null) {
+        // The member has left.
+        return;
+      }
+      try {
+        if (partitionedRegion.getDataStore() != null) {
+
+          for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore()
+              .getAllLocalPrimaryBucketRegions()) {
+            try {
+              localPrimaryBucketRegion.releaseLockLocallyForClear(null);
+            } catch (Exception ex) {
+              logger.debug(
+                  "Unable to acquire clear lock for bucket region " + localPrimaryBucketRegion
+                      .getName(),
+                  ex.getMessage());
+              partitionedRegion.checkClosed();
+            }
+          }
+        }
+      } finally {
+        lockForListenerAndClientNotification.setUnLocked();
+      }
+    }
+  }
+
+  private List sendPartitionedRegionClearMessage(RegionEventImpl event,
+      PartitionedRegionClearMessage.OperationType op) {
+    RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone();
+    eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR);
+
+    do {
+      try {
+        return attemptToSendPartitionedRegionClearMessage(event, op);
+      } catch (ForceReattemptException reattemptException) {
+        // retry
+      }
+    } while (true);
+  }
+
+  private List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
+      PartitionedRegionClearMessage.OperationType op)
+      throws ForceReattemptException {
+    List bucketsOperated = null;
+
+    if (partitionedRegion.getPRRoot() == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Partition region {} failed to initialize. Remove its profile from remote members.",
+            this.partitionedRegion);
+      }
+      new UpdateAttributesProcessor(partitionedRegion, true).distribute(false);
+      return bucketsOperated;
+    }
+
+    final HashSet configRecipients =
+        new HashSet(partitionedRegion.getRegionAdvisor().adviseAllPRNodes());
+
+    try {
+      final PartitionRegionConfig prConfig =
+          partitionedRegion.getPRRoot().get(partitionedRegion.getRegionIdentifier());
+
+      if (prConfig != null) {
+        Iterator itr = prConfig.getNodes().iterator();
+        while (itr.hasNext()) {
+          InternalDistributedMember idm = ((Node) itr.next()).getMemberId();
+          if (!idm.equals(partitionedRegion.getMyId())) {
+            configRecipients.add(idm);
+          }
+        }
+      }
+    } catch (CancelException ignore) {
+      // ignore
+    }
+
+    try {
+      PartitionedRegionClearMessage.PartitionedRegionClearResponse resp =
+          new PartitionedRegionClearMessage.PartitionedRegionClearResponse(
+              partitionedRegion.getSystem(),
+              configRecipients);
+      PartitionedRegionClearMessage partitionedRegionClearMessage =
+          new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event);
+      partitionedRegionClearMessage.send();
+
+      resp.waitForRepliesUninterruptibly();
+      bucketsOperated = resp.bucketsCleared;
+
+    } catch (ReplyException e) {
+      Throwable t = e.getCause();
+      if (t instanceof ForceReattemptException) {
+        throw (ForceReattemptException) t;
+      }
+      if (t instanceof PartitionedRegionPartialClearException) {
+        throw new PartitionedRegionPartialClearException(t.getMessage(), t);
+      }
+      logger.warn(
+          "PartitionedRegionClear#sendPartitionedRegionClearMessage: Caught exception during ClearRegionMessage send and waiting for response",
+          e);
+    }
+    return bucketsOperated;
+  }
+
+  void doClear(RegionEventImpl regionEvent, boolean cacheWrite,
+      PartitionedRegion partitionedRegion) {
+    String lockName = CLEAR_OPERATION + partitionedRegion.getDisplayName();
+
+    try {
+      // distributed lock to make sure only one clear op is in progress in the cluster.
+      acquireDistributedClearLock(lockName);
+
+      // Force all primary buckets to be created before clear.
+      PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion);
+
+      // do cacheWrite
+      try {
+        partitionedRegion.cacheWriteBeforeRegionClear(regionEvent);
+      } catch (OperationAbortedException operationAbortedException) {
+        throw new CacheWriterException(operationAbortedException);
+      }
+
+      // Check if there are any listeners or clients interested. If so, then clear write
+      // locks needs to be taken on all local and remote primary buckets in order to
+      // preserve the ordering of client events (for concurrent operations on the region).
+      boolean acquireClearLockForClientNotification =
+          (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener());
+      if (acquireClearLockForClientNotification) {
+        obtainLockForClear(regionEvent);
+      }
+      try {
+        List bucketsCleared = clearRegion(regionEvent, cacheWrite, null);
+
+        if (partitionedRegion.getTotalNumberOfBuckets() != bucketsCleared.size()) {
+          String message = "Unable to clear all the buckets from the partitioned region "
+              + partitionedRegion.getName()
+              + ", either data (buckets) moved or member departed.";
+
+          logger.warn(message + " expected to clear number of buckets: "
+              + partitionedRegion.getTotalNumberOfBuckets() +
+              " actual cleared: " + bucketsCleared.size());
+
+          throw new PartitionedRegionPartialClearException(message);
+        }
+      } finally {
+        if (acquireClearLockForClientNotification) {
+          releaseLockForClear(regionEvent);
+        }
+      }
+
+    } finally {
+      releaseDistributedClearLock(lockName);
+    }
+  }
+
+  void handleClearFromDepartedMember(InternalDistributedMember departedMember) {
+    if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) {
+      synchronized (lockForListenerAndClientNotification) {
+        if (lockForListenerAndClientNotification.getLockRequester() != null) {
+          releaseClearLockLocal();
+        }
+      }
+    }
+  }
+
+  class LockForListenerAndClientNotification {
+
+    private boolean locked = false;
+
+    private InternalDistributedMember lockRequester;
+
+    synchronized void setLocked(InternalDistributedMember member) {
+      locked = true;
+      lockRequester = member;
+    }
+
+    synchronized void setUnLocked() {
+      locked = false;
+      lockRequester = null;
+    }
+
+    synchronized boolean isLocked() {
+      return locked;
+    }
+
+    synchronized InternalDistributedMember getLockRequester() {
+      return lockRequester;
+    }
+  }
+
+  protected class PartitionedRegionClearListener implements MembershipListener {
+
+    @Override
+    public synchronized void memberDeparted(DistributionManager distributionManager,
+        InternalDistributedMember id, boolean crashed) {
+      membershipChange = true;
+      handleClearFromDepartedMember(id);
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
new file mode 100755
index 0000000..b66ab44
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.partitioned.PartitionMessage;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class PartitionedRegionClearMessage extends PartitionMessage {
+
+  public enum OperationType {
+    OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR,
+  }
+
+  private Object cbArg;
+
+  private OperationType op;
+
+  private EventID eventID;
+
+  private PartitionedRegion partitionedRegion;
+
+  private ArrayList bucketsCleared;
+
+  @Override
+  public EventID getEventID() {
+    return eventID;
+  }
+
+  public PartitionedRegionClearMessage() {}
+
+  PartitionedRegionClearMessage(Set recipients, PartitionedRegion region,
+      ReplyProcessor21 processor, PartitionedRegionClearMessage.OperationType operationType,
+      final RegionEventImpl event) {
+    super(recipients, region.getPRId(), processor);
+    partitionedRegion = region;
+    op = operationType;
+    cbArg = event.getRawCallbackArgument();
+    eventID = event.getEventId();
+  }
+
+  public OperationType getOp() {
+    return op;
+  }
+
+  public void send() {
+    Assert.assertTrue(getRecipients() != null, "ClearMessage NULL recipients set");
+    setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed());
+    partitionedRegion.getDistributionManager().putOutgoing(this);
+  }
+
+  @Override
+  protected Throwable processCheckForPR(PartitionedRegion pr,
+      DistributionManager distributionManager) {
+    if (pr != null && !pr.getDistributionAdvisor().isInitialized()) {
+      Throwable thr = new ForceReattemptException(
+          String.format("%s : could not find partitioned region with Id %s",
+              distributionManager.getDistributionManagerId(),
+              pr.getRegionIdentifier()));
+      return thr;
+    }
+    return null;
+  }
+
+  @Override
+  protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
+      PartitionedRegion partitionedRegion,
+      long startTime) throws CacheException {
+
+    if (partitionedRegion == null) {
+      return true;
+    }
+
+    if (partitionedRegion.isDestroyed()) {
+      return true;
+    }
+
+    if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) {
+      partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender());
+    } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+      partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal();
+    } else {
+      RegionEventImpl event =
+          new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, this.cbArg, true,
+              partitionedRegion.getMyId(),
+              getEventID());
+      bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event);
+    }
+    return true;
+  }
+
+  @Override
+  protected void appendFields(StringBuilder buff) {
+    super.appendFields(buff);
+    buff.append(" cbArg=").append(this.cbArg).append(" op=").append(this.op);
+  }
+
+  @Override
+  public int getDSFID() {
+    return CLEAR_PARTITIONED_REGION_MESSAGE;
+  }
+
+  @Override
+  public void fromData(DataInput in,
+      DeserializationContext context) throws IOException, ClassNotFoundException {
+    super.fromData(in, context);
+    this.cbArg = DataSerializer.readObject(in);
+    op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+    eventID = DataSerializer.readObject(in);
+  }
+
+  @Override
+  public void toData(DataOutput out,
+      SerializationContext context) throws IOException {
+    super.toData(out, context);
+    DataSerializer.writeObject(this.cbArg, out);
+    out.writeByte(op.ordinal());
+    DataSerializer.writeObject(eventID, out);
+  }
+
+  /**
+   * The response on which to wait for all the replies. This response ignores any exceptions
+   * received from the "far side"
+   */
+  public static class PartitionedRegionClearResponse extends ReplyProcessor21 {
+    CopyOnWriteArrayList bucketsCleared = new CopyOnWriteArrayList();
+
+    public PartitionedRegionClearResponse(InternalDistributedSystem system, Set initMembers) {
+      super(system, initMembers);
+    }
+
+    @Override
+    public void process(DistributionMessage msg) {
+      if (msg instanceof PartitionedRegionClearReplyMessage) {
+        List buckets = ((PartitionedRegionClearReplyMessage) msg).bucketsCleared;
+        if (buckets != null) {
+          bucketsCleared.addAll(buckets);
+        }
+      }
+      super.process(msg, true);
+    }
+  }
+
+  @Override
+  protected void sendReply(InternalDistributedMember member, int processorId,
+      DistributionManager distributionManager, ReplyException ex,
+      PartitionedRegion partitionedRegion, long startTime) {
+    if (partitionedRegion != null) {
+      if (startTime > 0) {
+        partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+      }
+    }
+    PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
+        .send(member, processorId, getReplySender(distributionManager), op, bucketsCleared,
+            ex);
+  }
+
+  public static class PartitionedRegionClearReplyMessage extends ReplyMessage {
+
+    private ArrayList bucketsCleared;
+
+    private OperationType op;
+
+    @Override
+    public boolean getInlineProcess() {
+      return true;
+    }
+
+    /**
+     * Empty constructor to conform to DataSerializable interface
+     */
+    public PartitionedRegionClearReplyMessage() {}
+
+    private PartitionedRegionClearReplyMessage(int processorId, OperationType op,
+        ArrayList bucketsCleared, ReplyException ex) {
+      super();
+      this.bucketsCleared = bucketsCleared;
+      this.op = op;
+      setProcessorId(processorId);
+      setException(ex);
+    }
+
+    /** Send an ack */
+    public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm,
+        OperationType op, ArrayList bucketsCleared, ReplyException ex) {
+
+      Assert.assertTrue(recipient != null, "partitionedRegionClearReplyMessage NULL reply message");
+
+      PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage m =
+          new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, op,
+              bucketsCleared, ex);
+
+      m.setRecipient(recipient);
+      dm.putOutgoing(m);
+    }
+
+    /**
+     * Processes this message. This method is invoked by the receiver of the message.
+     *
+     * @param dm the distribution manager that is processing the message.
+     */
+    @Override
+    public void process(final DistributionManager dm, final ReplyProcessor21 rp) {
+      final long startTime = getTimestamp();
+
+      if (rp == null) {
+        if (LogService.getLogger().isTraceEnabled(LogMarker.DM_VERBOSE)) {
+          LogService.getLogger().trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
+        }
+        return;
+      }
+
+      rp.process(this);
+
+      dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
+    }
+
+    @Override
+    public int getDSFID() {
+      return CLEAR_PARTITIONED_REGION_REPLY_MESSAGE;
+    }
+
+    @Override
+    public void fromData(DataInput in,
+        DeserializationContext context) throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+      op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+      bucketsCleared = DataSerializer.readArrayList(in);
+    }
+
+    @Override
+    public void toData(DataOutput out,
+        SerializationContext context) throws IOException {
+      super.toData(out, context);
+      out.writeByte(op.ordinal());
+      DataSerializer.writeArrayList(bucketsCleared, out);
+    }
+
+    @Override
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      sb.append("PartitionedRegionClearReplyMessage ")
+          .append("processorId=").append(this.processorId)
+          .append(" sender=").append(sender)
+          .append(" bucketsCleared ").append(this.bucketsCleared)
+          .append(" exception=").append(getException());
+      return sb.toString();
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 23a7487..578ed3e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -980,6 +980,14 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     }
   }
 
+  protected void lockBucketCreationForRegionClear() {
+    bucketCreationLock.writeLock().lock();
+  }
+
+  protected void unlockBucketCreationForRegionClear() {
+    bucketCreationLock.writeLock().unlock();
+  }
+
   /**
    * Gets the total amount of memory in bytes allocated for all values for this PR in this VM. This
    * is the current memory (MB) watermark for data in this PR.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
index 5d2ff24..13ad666 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
@@ -851,10 +851,21 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
         && prof.filterProfile.hasInterest();
   };
 
+  @Immutable
+  private static final Filter prServerWithCqFilter = profile -> {
+    CacheProfile prof = (CacheProfile) profile;
+    return prof.isPartitioned && prof.hasCacheServer && prof.filterProfile != null
+        && prof.filterProfile.hasCQs();
+  };
+
   public boolean hasPRServerWithInterest() {
     return satisfiesFilter(prServerWithInterestFilter);
   }
 
+  public boolean hasPRServerWithCQs() {
+    return satisfiesFilter(prServerWithCqFilter);
+  }
+
   /**
    * return the set of all members who must receive operation notifications
    *
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 652d1b2..644fbc2 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -79,6 +79,7 @@ org/apache/geode/cache/NoSubscriptionServersAvailableException,true,848408601915
 org/apache/geode/cache/Operation,true,-7521751729852504238,ordinal:byte
 org/apache/geode/cache/OperationAbortedException,true,-8293166225026556949
 org/apache/geode/cache/PartitionedRegionDistributionException,true,-3004093739855972548
+org/apache/geode/cache/PartitionedRegionPartialClearException,false
 org/apache/geode/cache/PartitionedRegionStorageException,true,5905463619475329732
 org/apache/geode/cache/RegionAccessException,true,3142958723089038406
 org/apache/geode/cache/RegionDestroyedException,true,319804842308010754,regionFullPath:java/lang/String
@@ -302,6 +303,7 @@ org/apache/geode/internal/cache/PRContainsValueFunction,false
 org/apache/geode/internal/cache/PRHARedundancyProvider$ArrayListWithClearState,true,1,wasCleared:boolean
 org/apache/geode/internal/cache/PartitionedRegion$PRIdMap,true,3667357372967498179,cleared:boolean
 org/apache/geode/internal/cache/PartitionedRegion$SizeEntry,false,isPrimary:boolean,size:int
+org/apache/geode/internal/cache/PartitionedRegionClearMessage$OperationType,false
 org/apache/geode/internal/cache/PartitionedRegionDataStore$CreateBucketResult,false,nowExists:boolean
 org/apache/geode/internal/cache/PartitionedRegionException,true,5113786059279106007
 org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator$MemberResultsList,false,isLastChunkReceived:boolean
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index c7cf5a6..d3397eb 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -51,7 +51,9 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
     when(ba.getPrimaryMoveReadLock()).thenReturn(primaryMoveReadLock);
     when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class));
     when(ba.isPrimary()).thenReturn(true);
-
+    PartitionedRegionClear clearPR = mock(PartitionedRegionClear.class);
+    when(clearPR.isLockedForListenerAndClientNotification()).thenReturn(true);
+    when(pr.getPartitionedRegionClear()).thenReturn(clearPR);
     ira.setPartitionedRegion(pr).setPartitionedRegionBucketRedundancy(1).setBucketAdvisor(ba);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 898c4f7..e02ba2c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -58,7 +58,6 @@ import org.mockito.junit.MockitoRule;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.Statistics;
 import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.Operation;
@@ -221,22 +220,6 @@ public class PartitionedRegionTest {
     spyPartitionedRegion.clear();
   }
 
-  @Test(expected = CacheClosedException.class)
-  public void clearShouldThrowCacheClosedExceptionIfShutdownAll() {
-    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
-    RegionEventImpl regionEvent =
-        new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
-            spyPartitionedRegion.getMyId(), true);
-    when(cache.isCacheAtShutdownAll()).thenReturn(true);
-    when(cache.getCacheClosedException("Cache is shutting down"))
-        .thenReturn(new CacheClosedException("Cache is shutting down"));
-    DistributedLockService lockService = mock(DistributedLockService.class);
-    when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
-    String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
-    when(lockService.lock(lockName, -1, -1)).thenReturn(true);
-    spyPartitionedRegion.basicClear(regionEvent, true);
-  }
-
   @Test
   public void createClearPRMessagesShouldCreateMessagePerBucket() {
     PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
@@ -249,28 +232,6 @@ public class PartitionedRegionTest {
     assertThat(msgs.size()).isEqualTo(3);
   }
 
-  @Test
-  public void sendEachMessagePerBucket() {
-    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
-    RegionEventImpl regionEvent =
-        new RegionEventImpl(spyPartitionedRegion, Operation.REGION_CLEAR, null, false,
-            spyPartitionedRegion.getMyId(), true);
-    when(cache.isCacheAtShutdownAll()).thenReturn(false);
-    DistributedLockService lockService = mock(DistributedLockService.class);
-    when(spyPartitionedRegion.getPartitionedRegionLockService()).thenReturn(lockService);
-    when(spyPartitionedRegion.getTotalNumberOfBuckets()).thenReturn(3);
-    String lockName = "_clearOperation" + spyPartitionedRegion.getFullPath().replace('/', '_');
-    when(lockService.lock(lockName, -1, -1)).thenReturn(true);
-    when(spyPartitionedRegion.hasListener()).thenReturn(true);
-    doNothing().when(spyPartitionedRegion).dispatchListenerEvent(any(), any());
-    doNothing().when(spyPartitionedRegion).notifyBridgeClients(eq(regionEvent));
-    doNothing().when(spyPartitionedRegion).checkReadiness();
-    doNothing().when(lockService).unlock(lockName);
-    spyPartitionedRegion.basicClear(regionEvent, true);
-    verify(spyPartitionedRegion, times(3)).sendClearMsgByBucket(any(), any());
-    verify(spyPartitionedRegion, times(1)).dispatchListenerEvent(any(), any());
-    verify(spyPartitionedRegion, times(1)).notifyBridgeClients(eq(regionEvent));
-  }
 
   @Test
   public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() {
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index 3598b5d..481c78c 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -56,6 +56,8 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
 
   // NOTE, codes < -65536 will take 4 bytes to serialize
   // NOTE, codes < -128 will take 2 bytes to serialize
+  short CLEAR_PARTITIONED_REGION_REPLY_MESSAGE = -166;
+  short CLEAR_PARTITIONED_REGION_MESSAGE = -165;
 
   short PR_CLEAR_REPLY_MESSAGE = -164;
   short PR_CLEAR_MESSAGE = -163;