You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2021/04/25 18:00:33 UTC

[geode] 01/01: GEODE-9191: PR clear could miss clearing bucket which lost primary

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-9191
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a260cf2aa1bedd1d9ee485afcf0aa0fdf8528e6b
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Sun Apr 25 10:57:50 2021 -0700

    GEODE-9191: PR clear could miss clearing bucket which lost primary
---
 .../cache/ClearPRDuringGIIProviderDUnitTest.java   | 325 ++++++++++++++++++++
 .../cache/ClearPRDuringGIIRequesterDUnitTest.java  | 339 +++++++++++++++++++++
 .../cache/PartitionedRegionClearDUnitTest.java     |   4 +-
 .../apache/geode/internal/cache/BucketRegion.java  |  11 +-
 .../internal/cache/InitialImageOperation.java      |   8 +-
 .../internal/cache/PartitionedRegionClear.java     |   2 +-
 6 files changed, 675 insertions(+), 14 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java
new file mode 100644
index 0000000..ad82124
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
+import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+@RunWith(Parameterized.class)
+public class ClearPRDuringGIIProviderDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int TOTAL_BUCKET_NUM = 10;
+  protected static final int DATA_SIZE = 100;
+  protected static final int NUM_SERVERS = 3;
+
+  @Parameterized.Parameter(0)
+  public RegionShortcut regionShortcut;
+
+  @Parameterized.Parameter(1)
+  public InitialImageOperation.GIITestHookType giiTestHookType;
+
+  protected int locatorPort;
+  protected MemberVM[] memberVMS;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  static RegionShortcut[] regionTypes() {
+    return new RegionShortcut[] {
+        PARTITION_REDUNDANT, PARTITION_REDUNDANT_OVERFLOW, PARTITION_REDUNDANT_PERSISTENT
+    };
+  }
+
+  @Parameterized.Parameters(name = "{index}: regionShortcut={0} {1}")
+  public static Collection<Object[]> getCombinations() {
+    List<Object[]> params = new ArrayList<>();
+    RegionShortcut[] regionShortcuts = regionTypes();
+    Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
+      params.add(
+          new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.AfterSentImageReply});
+      params.add(
+          new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.DuringPackingImage});
+      params.add(new Object[] {regionShortcut,
+          InitialImageOperation.GIITestHookType.AfterReceivedRequestImage});
+    });
+    return params;
+  }
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(NUM_SERVERS + 1);
+
+  @Before
+  public void setUp() throws Exception {
+    // serverVMs[1] is the accessor, which is feeder and invokes clear
+    // serverVMs[2] will add GII TestHook of requester
+    // serverVMs[3] will be GII provider for the specified bucket region
+    memberVMS = new MemberVM[NUM_SERVERS + 1];
+    memberVMS[0] = cluster.startLocatorVM(0);
+    locatorPort = memberVMS[0].getPort();
+    memberVMS[1] = cluster.startServerVM(1, locatorPort);
+    memberVMS[1].invoke(() -> initAccessor());
+    for (int i = 2; i <= NUM_SERVERS; i++) {
+      memberVMS[i] = cluster.startServerVM(i, locatorPort);
+      memberVMS[i].invoke(() -> initDataStore(regionShortcut));
+    }
+    feed("valueOne");
+    verifyRegionSizes(DATA_SIZE);
+  }
+
+  @After
+  public final void preTearDown() throws Exception {
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      memberVMS[i].invoke(() -> InitialImageOperation.resetAllGIITestHooks());
+    }
+  }
+
+  private void initDataStore(RegionShortcut regionShortcut) {
+    RegionFactory factory = getCache().createRegionFactory(regionShortcut);
+    factory.setPartitionAttributes(
+        new PartitionAttributesFactory().setTotalNumBuckets(TOTAL_BUCKET_NUM).create());
+    factory.create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionFactory factory = getCache().createRegionFactory(PARTITION_REDUNDANT);
+    factory.setPartitionAttributes(new PartitionAttributesFactory()
+        .setTotalNumBuckets(TOTAL_BUCKET_NUM).setLocalMaxMemory(0).create());
+    factory.create(REGION_NAME);
+  }
+
+  private void feed(String valueStub) {
+    memberVMS[1].invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      IntStream.range(0, DATA_SIZE).forEach(i -> region.put(i, valueStub + i));
+    });
+  }
+
+  private void verifyRegionSize(int expectedNum) {
+    Region region = getCache().getRegion(REGION_NAME);
+    assertThat(region.size()).isEqualTo(expectedNum);
+  }
+
+  /**
+   * Pauses the provider-side GII and the clear; {@code clearBeforeGII} picks which resumes first.
+   */
+  protected void giiTestHookSyncWithClear(boolean clearBeforeGII) {
+    // set test hook at server3, the provider
+    memberVMS[3].invoke(() -> {
+      PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME);
+      List<Integer> localBucketList = pr.getLocalBucketsListTestOnly();
+      final String bucketName = "_B__testPR_" + localBucketList.get(0);
+
+      PauseDuringGIICallback myGIITestHook =
+          // using bucket name for region name to ensure callback is triggered
+          new PauseDuringGIICallback(giiTestHookType, bucketName);
+      InitialImageOperation.setGIITestHook(myGIITestHook);
+    });
+
+    memberVMS[2].invoke(() -> getCache().getRegion(REGION_NAME).close());
+    feed("valueTwo");
+
+    for (int i = 1; i < memberVMS.length; i++) {
+      memberVMS[i].invoke(() -> {
+        DistributionMessageObserver.setInstance(new PauseDuringClearDistributionMessageObserver());
+      });
+    }
+
+    AsyncInvocation asyncGII = memberVMS[2].invokeAsync(() -> initDataStore(regionShortcut));
+    AsyncInvocation asyncClear =
+        memberVMS[1].invokeAsync(() -> getCache().getRegion(REGION_NAME).clear());
+
+    waitForGIITeskHookStarted(memberVMS[3], giiTestHookType);
+
+    if (clearBeforeGII) {
+      for (int i = 1; i < memberVMS.length; i++) {
+        memberVMS[i].invoke(() -> {
+          PauseDuringClearDistributionMessageObserver observer =
+              (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver
+                  .getInstance();
+          DistributionMessageObserver.setInstance(null);
+          observer.latch.countDown();
+        });
+      }
+
+      memberVMS[3].invoke(() -> InitialImageOperation.resetGIITestHook(giiTestHookType, true));
+    } else {
+      memberVMS[3].invoke(() -> InitialImageOperation.resetGIITestHook(giiTestHookType, true));
+
+      for (int i = 1; i < memberVMS.length; i++) {
+        memberVMS[i].invoke(() -> {
+          PauseDuringClearDistributionMessageObserver observer =
+              (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver
+                  .getInstance();
+          DistributionMessageObserver.setInstance(null);
+          observer.latch.countDown();
+        });
+      }
+    }
+
+    try {
+      asyncGII.join(10000);
+    } catch (InterruptedException ex) {
+      Assert.fail("Async recreate region interupted" + ex.getMessage());
+    }
+    try {
+      asyncClear.join(10000);
+    } catch (InterruptedException ex) {
+      Assert.fail("Async clear interupted" + ex.getMessage());
+    }
+
+    if (asyncClear.exceptionOccurred()) {
+      assertThat(asyncClear.getException())
+          .isInstanceOf(PartitionedRegionPartialClearException.class);
+    } else {
+      verifyRegionSizes(0);
+    }
+  }
+
+  @Test
+  public void clearBeforeGIIShouldClearTheRegion() {
+    giiTestHookSyncWithClear(true);
+  }
+
+  @Test
+  public void clearAfterGIIShouldClearTheRegion() {
+    giiTestHookSyncWithClear(false);
+  }
+
+  private void verifyRegionSizes(int expectedSize) {
+    for (int i = 2; i < memberVMS.length; i++) {
+      memberVMS[i].invoke(() -> {
+        PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME);
+        for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+          logger.info("verifyRegionSizes:" + br.getFullPath() + ":"
+              + br.getBucketAdvisor().isPrimary() + ":" + br.size());
+        }
+      });
+    }
+    for (int i = 1; i < memberVMS.length; i++) {
+      memberVMS[i].invoke(() -> verifyRegionSize(expectedSize));
+    }
+  }
+
+  public void waitForGIITeskHookStarted(final MemberVM vm,
+      final InitialImageOperation.GIITestHookType callbacktype) {
+    SerializableRunnable waitForCallbackStarted = new SerializableRunnable() {
+      @Override
+      public void run() {
+
+        final InitialImageOperation.GIITestHook callback =
+            getGIITestHookForCheckingPurpose(callbacktype);
+        WaitCriterion ev = new WaitCriterion() {
+
+          @Override
+          public boolean done() {
+            return (callback != null && callback.isRunning);
+          }
+
+          @Override
+          public String description() {
+            return null;
+          }
+        };
+
+        GeodeAwaitility.await().untilAsserted(ev);
+        if (callback == null || !callback.isRunning) {
+          fail("GII tesk hook is not started yet");
+        }
+      }
+    };
+    vm.invoke(waitForCallbackStarted);
+  }
+
+  private static class PauseDuringGIICallback extends InitialImageOperation.GIITestHook {
+    private Object lockObject = new Object();
+
+    public PauseDuringGIICallback(InitialImageOperation.GIITestHookType type, String region_name) {
+      super(type, region_name);
+    }
+
+    @Override
+    public void reset() {
+      synchronized (this.lockObject) {
+        this.lockObject.notify();
+      }
+    }
+
+    @Override
+    public void run() {
+      synchronized (this.lockObject) {
+        try {
+          isRunning = true;
+          this.lockObject.wait();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  } // Mycallback
+
+  private class PauseDuringClearDistributionMessageObserver
+      extends DistributionMessageObserver {
+    private CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (message instanceof PartitionedRegionClearMessage) {
+        PartitionedRegionClearMessage prcm = (PartitionedRegionClearMessage) message;
+        try {
+          logger.info("before wait for clear message");
+          latch.await();
+          logger.info("after wait for clear message");
+        } catch (InterruptedException ex) {
+        }
+      }
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIRequesterDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIRequesterDUnitTest.java
new file mode 100644
index 0000000..8a31127
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIRequesterDUnitTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT;
+import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+@RunWith(Parameterized.class)
+public class ClearPRDuringGIIRequesterDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int TOTAL_BUCKET_NUM = 10;
+  protected static final int DATA_SIZE = 100;
+  protected static final int NUM_SERVERS = 3;
+
+  @Parameterized.Parameter(0)
+  public RegionShortcut regionShortcut;
+
+  @Parameterized.Parameter(1)
+  public InitialImageOperation.GIITestHookType giiTestHookType;
+
+  protected int locatorPort;
+  protected MemberVM[] memberVMS;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  static RegionShortcut[] regionTypes() {
+    return new RegionShortcut[] {
+        PARTITION_REDUNDANT, PARTITION_REDUNDANT_OVERFLOW, PARTITION_REDUNDANT_PERSISTENT
+    };
+  }
+
+  @Parameterized.Parameters(name = "{index}: regionShortcut={0} {1}")
+  public static Collection<Object[]> getCombinations() {
+    List<Object[]> params = new ArrayList<>();
+    RegionShortcut[] regionShortcuts = regionTypes();
+    Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
+      params.add(
+          new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.AfterSavedRVVEnd});
+      params.add(new Object[] {regionShortcut,
+          InitialImageOperation.GIITestHookType.BeforeCleanExpiredTombstones});
+      params.add(
+          new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.DuringApplyDelta});
+      params.add(new Object[] {regionShortcut,
+          InitialImageOperation.GIITestHookType.AfterReceivedImageReply});
+      params.add(new Object[] {regionShortcut,
+          InitialImageOperation.GIITestHookType.AfterSentRequestImage});
+      params.add(new Object[] {regionShortcut,
+          InitialImageOperation.GIITestHookType.AfterSavedReceivedRVV});
+      params.add(new Object[] {regionShortcut,
+          InitialImageOperation.GIITestHookType.BeforeSavedReceivedRVV});
+      params.add(new Object[] {regionShortcut,
+          InitialImageOperation.GIITestHookType.AfterCalculatedUnfinishedOps});
+      params.add(
+          new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.AfterRequestRVV});
+      params.add(
+          new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.BeforeRequestRVV});
+    });
+    return params;
+  }
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(NUM_SERVERS + 1);
+
+  @Before
+  public void setUp() throws Exception {
+    // serverVMs[1] is the accessor, which is feeder and invokes clear
+    // serverVMs[2] will add GII TestHook of requester
+    // serverVMs[3] will be GII provider for the specified bucket region
+    memberVMS = new MemberVM[NUM_SERVERS + 1];
+    memberVMS[0] = cluster.startLocatorVM(0);
+    locatorPort = memberVMS[0].getPort();
+    memberVMS[1] = cluster.startServerVM(1, locatorPort);
+    memberVMS[1].invoke(() -> initAccessor());
+    for (int i = 2; i <= NUM_SERVERS; i++) {
+      memberVMS[i] = cluster.startServerVM(i, locatorPort);
+      memberVMS[i].invoke(() -> initDataStore(regionShortcut));
+    }
+    feed("valueOne");
+    verifyRegionSizes(DATA_SIZE);
+  }
+
+  @After
+  public final void preTearDown() throws Exception {
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      memberVMS[i].invoke(() -> InitialImageOperation.resetAllGIITestHooks());
+    }
+  }
+
+  private void initDataStore(RegionShortcut regionShortcut) {
+    RegionFactory factory = getCache().createRegionFactory(regionShortcut);
+    factory.setPartitionAttributes(
+        new PartitionAttributesFactory().setTotalNumBuckets(TOTAL_BUCKET_NUM).create());
+    factory.create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    RegionFactory factory = getCache().createRegionFactory(PARTITION_REDUNDANT);
+    factory.setPartitionAttributes(new PartitionAttributesFactory()
+        .setTotalNumBuckets(TOTAL_BUCKET_NUM).setLocalMaxMemory(0).create());
+    factory.create(REGION_NAME);
+  }
+
+  private void feed(String valueStub) {
+    memberVMS[1].invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      IntStream.range(0, DATA_SIZE).forEach(i -> region.put(i, valueStub + i));
+    });
+  }
+
+  private void verifyRegionSize(int expectedNum) {
+    Region region = getCache().getRegion(REGION_NAME);
+    assertThat(region.size()).isEqualTo(expectedNum);
+  }
+
+  /**
+   * Pauses the requester-side GII and the clear; {@code clearBeforeGII} picks which resumes first.
+   */
+  protected void giiTestHookSyncWithClear(boolean clearBeforeGII) {
+    // set test hook at server2, the requester
+    memberVMS[2].invoke(() -> {
+      PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME);
+      List<Integer> localBucketList = pr.getLocalBucketsListTestOnly();
+      final String bucketName = "_B__testPR_" + localBucketList.get(0);
+
+      PauseDuringGIICallback myGIITestHook =
+          // using bucket name for region name to ensure callback is triggered
+          new PauseDuringGIICallback(giiTestHookType, bucketName);
+      InitialImageOperation.setGIITestHook(myGIITestHook);
+    });
+
+    memberVMS[2].invoke(() -> getCache().getRegion(REGION_NAME).close());
+    feed("valueTwo");
+
+    for (int i = 1; i < memberVMS.length; i++) {
+      memberVMS[i].invoke(() -> {
+        DistributionMessageObserver.setInstance(new PauseDuringClearDistributionMessageObserver());
+      });
+    }
+
+    AsyncInvocation asyncGII = memberVMS[2].invokeAsync(() -> initDataStore(regionShortcut));
+    AsyncInvocation asyncClear =
+        memberVMS[1].invokeAsync(() -> getCache().getRegion(REGION_NAME).clear());
+
+    waitForGIITeskHookStarted(memberVMS[2], giiTestHookType);
+
+    if (clearBeforeGII) {
+      for (int i = 1; i < memberVMS.length; i++) {
+        memberVMS[i].invoke(() -> {
+          PauseDuringClearDistributionMessageObserver observer =
+              (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver
+                  .getInstance();
+          DistributionMessageObserver.setInstance(null);
+          observer.latch.countDown();
+        });
+      }
+
+      memberVMS[2].invoke(() -> InitialImageOperation.resetGIITestHook(giiTestHookType, true));
+    } else {
+      memberVMS[2].invoke(() -> InitialImageOperation.resetGIITestHook(giiTestHookType, true));
+
+      for (int i = 1; i < memberVMS.length; i++) {
+        memberVMS[i].invoke(() -> {
+          PauseDuringClearDistributionMessageObserver observer =
+              (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver
+                  .getInstance();
+          DistributionMessageObserver.setInstance(null);
+          observer.latch.countDown();
+        });
+      }
+    }
+
+    try {
+      asyncGII.join(10000);
+    } catch (InterruptedException ex) {
+      Assert.fail("Async recreate region interupted" + ex.getMessage());
+    }
+    try {
+      asyncClear.join(10000);
+    } catch (InterruptedException ex) {
+      Assert.fail("Async clear interupted" + ex.getMessage());
+    }
+
+    if (!clearBeforeGII && asyncClear.exceptionOccurred()) {
+      assertThat(asyncClear.getException())
+          .isInstanceOf(PartitionedRegionPartialClearException.class);
+    } else {
+      verifyRegionSizes(0);
+    }
+  }
+
+  @Test
+  public void clearBeforeGIIShouldClearTheRegion() {
+    giiTestHookSyncWithClear(true);
+  }
+
+  @Test
+  public void clearAfterGIIShouldClearTheRegion() {
+    giiTestHookSyncWithClear(false);
+  }
+
+  private void verifyRegionSizes(int expectedSize) {
+    for (int i = 2; i < memberVMS.length; i++) {
+      memberVMS[i].invoke(() -> {
+        PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME);
+        for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+          logger.info("verifyRegionSizes:" + br.getFullPath() + ":"
+              + br.getBucketAdvisor().isPrimary() + ":" + br.size());
+        }
+      });
+    }
+    for (int i = 1; i < memberVMS.length; i++) {
+      memberVMS[i].invoke(() -> verifyRegionSize(expectedSize));
+    }
+  }
+
+  public void waitForGIITeskHookStarted(final MemberVM vm,
+      final InitialImageOperation.GIITestHookType callbacktype) {
+    SerializableRunnable waitForCallbackStarted = new SerializableRunnable() {
+      @Override
+      public void run() {
+
+        final InitialImageOperation.GIITestHook callback =
+            getGIITestHookForCheckingPurpose(callbacktype);
+        WaitCriterion ev = new WaitCriterion() {
+
+          @Override
+          public boolean done() {
+            return (callback != null && callback.isRunning);
+          }
+
+          @Override
+          public String description() {
+            return null;
+          }
+        };
+
+        GeodeAwaitility.await().untilAsserted(ev);
+        if (callback == null || !callback.isRunning) {
+          fail("GII tesk hook is not started yet");
+        }
+      }
+    };
+    vm.invoke(waitForCallbackStarted);
+  }
+
+  private static class PauseDuringGIICallback extends InitialImageOperation.GIITestHook {
+    private Object lockObject = new Object();
+
+    public PauseDuringGIICallback(InitialImageOperation.GIITestHookType type, String region_name) {
+      super(type, region_name);
+    }
+
+    @Override
+    public void reset() {
+      synchronized (this.lockObject) {
+        this.lockObject.notify();
+      }
+    }
+
+    @Override
+    public void run() {
+      synchronized (this.lockObject) {
+        try {
+          isRunning = true;
+          this.lockObject.wait();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  } // Mycallback
+
+  private class PauseDuringClearDistributionMessageObserver
+      extends DistributionMessageObserver {
+    private CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (message instanceof PartitionedRegionClearMessage) {
+        PartitionedRegionClearMessage prcm = (PartitionedRegionClearMessage) message;
+        try {
+          logger.info("before wait for clear message");
+          latch.await();
+          logger.info("after wait for clear message");
+        } catch (InterruptedException ex) {
+        }
+      }
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
index b871926..fd19d4b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -130,8 +130,8 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
     }
     RegionFactory factory = getCache().createRegionFactory(shortcut)
         .setPartitionAttributes(
-            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
-        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create());
+            new PartitionAttributesFactory().setTotalNumBuckets(TOTAL_BUCKET_NUM)
+                .setLocalMaxMemory(0).create());
     if (withWriter) {
       factory.setCacheWriter(new CountingCacheWriter());
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 49f6aad..0651768 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -567,19 +567,13 @@ public class BucketRegion extends DistributedRegion implements Bucket {
    */
   @Override
   public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
-    if (!getBucketAdvisor().isPrimary()) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Not primary bucket when doing clear, do nothing");
-      }
-      return;
-    }
-
     // get rvvLock
     Set<InternalDistributedMember> participants =
         getCacheDistributionAdvisor().adviseInvalidateRegion();
     boolean isLockedAlready = this.partitionedRegion.getPartitionedRegionClear()
         .isLockedForListenerAndClientNotification();
 
+    boolean lockedForPrimary = doLockForPrimary(false);
     try {
       obtainWriteLocksForClear(regionEvent, participants, isLockedAlready);
       // no need to dominate my own rvv.
@@ -590,6 +584,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       // TODO: call reindexUserDataRegion if there're lucene indexes
     } finally {
       releaseWriteLocksForClear(regionEvent, participants, isLockedAlready);
+      if (lockedForPrimary) {
+        doUnlockForPrimary();
+      }
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 3278a3e..a7b691c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -846,12 +846,12 @@ public class InitialImageOperation {
       }
       List<Entry> entriesToSynchronize = new ArrayList<>();
 
+      if (internalDuringApplyDelta != null && !internalDuringApplyDelta.isRunning
+          && internalDuringApplyDelta.getRegionName().equals(this.region.getName())) {
+        internalDuringApplyDelta.run();
+      }
       for (int i = 0; i < entryCount; i++) {
         // stream is null-terminated
-        if (internalDuringApplyDelta != null && !internalDuringApplyDelta.isRunning
-            && internalDuringApplyDelta.getRegionName().equals(this.region.getName())) {
-          internalDuringApplyDelta.run();
-        }
         if (slow > 0) {
           // make sure we are still slow
           slow = slowImageProcessing;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 569f78c..4533f8b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -165,8 +165,8 @@ public class PartitionedRegionClear {
                     new RegionEventImpl(localPrimaryBucketRegion, Operation.REGION_CLEAR, null,
                         false, partitionedRegion.getMyId(), regionEvent.getEventId());
                 localPrimaryBucketRegion.cmnClearRegion(bucketRegionEvent, false, true);
+                clearedBuckets.add(localPrimaryBucketRegion.getId());
               }
-              clearedBuckets.add(localPrimaryBucketRegion.getId());
             }
 
             if (getMembershipChange()) {