You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/07/08 16:23:43 UTC

[geode] branch develop updated: GEODE-6918: Cleanup PRHARedundancyProvider and PRHARedundancyProviderTest (#3779)

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f688bbb  GEODE-6918: Cleanup PRHARedundancyProvider and PRHARedundancyProviderTest (#3779)
f688bbb is described below

commit f688bbbe773b141dce6d635e830295304186886e
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Jul 8 09:23:31 2019 -0700

    GEODE-6918: Cleanup PRHARedundancyProvider and PRHARedundancyProviderTest (#3779)
    
    * Cleanup recommendations found in review
    * Use constructor injection
    * Use MockitoRule and STRICT_STUBS
    * Improve field and variable names
    * Improve overall consistency, formatting, and comments
    
    Co-authored-by: Aaron Lindsey <al...@pivotal.io>
---
 .../control/RebalanceOperationDistributedTest.java |   4 +-
 .../internal/cache/PRHARedundancyProvider.java     | 741 +++++++++++----------
 .../geode/internal/cache/PartitionedRegion.java    |   2 +-
 .../internal/cache/PRHARedundancyProviderTest.java |  72 +-
 .../partitioned/PersistentBucketRecovererTest.java |   8 +-
 5 files changed, 459 insertions(+), 368 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
index 69a1a85..53ef1a7 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
@@ -566,7 +567,8 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
 
       PartitionedRegion origRegion = (PartitionedRegion) cache.getRegion("region1");
       PartitionedRegion spyRegion = spy(origRegion);
-      PRHARedundancyProvider redundancyProvider = spy(new PRHARedundancyProvider(spyRegion));
+      PRHARedundancyProvider redundancyProvider =
+          spy(new PRHARedundancyProvider(spyRegion, mock(InternalResourceManager.class)));
 
       // return the spied region when ever getPartitionedRegions() is invoked
       Set<PartitionedRegion> parRegions = cache.getPartitionedRegions();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index 9f0eb3b..cf70f2f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -14,9 +14,13 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.lang.String.format;
 import static java.lang.System.lineSeparator;
+import static java.util.Collections.emptyList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+import static org.apache.geode.internal.cache.ColocationHelper.checkMembersColocation;
+import static org.apache.geode.internal.cache.PartitionedRegionHelper.printCollection;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,12 +38,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
@@ -93,13 +99,13 @@ import org.apache.geode.internal.monitoring.ThreadsMonitoring;
  * This class provides the redundancy management for partitioned region. It will provide the
  * following to the PartitionedRegion:
  *
- * <pre>
- * (1) Redundancy Management at the time of bucket creation.
- * (2) Redundancy management at the new node arrival.
- * (3) Redundancy management when the node leaves the partitioned region distributed system
+ * <ol>
+ * <li>Redundancy management at the time of bucket creation.
+ * <li>Redundancy management at the new node arrival.
+ * <li>Redundancy management when the node leaves the partitioned region distributed system
  * gracefully. i.e. Cache.close()
- * (4) Redundancy management at random node failure.
- * </pre>
+ * <li>Redundancy management at random node failure.
+ * </ol>
  */
 public class PRHARedundancyProvider {
   private static final Logger logger = LogService.getLogger();
@@ -145,7 +151,7 @@ public class PRHARedundancyProvider {
   @MakeNotStatic
   private final AtomicBoolean firstInsufficientStoresLogged = new AtomicBoolean(false);
 
-  private final PartitionedRegion prRegion;
+  private final PartitionedRegion partitionedRegion;
 
   /**
   * An executor to submit tasks for redundancy recovery to. It makes sure that there will only be
@@ -155,6 +161,8 @@ public class PRHARedundancyProvider {
 
   private final Object shutdownLock = new Object();
 
+  private final BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction;
+
   private volatile ScheduledFuture<?> recoveryFuture;
 
   /**
@@ -164,58 +172,69 @@ public class PRHARedundancyProvider {
 
   private boolean shutdown;
 
+
   /**
    * Constructor for PRHARedundancyProvider.
    *
-   * @param region The PartitionedRegion for which the HA redundancy is required to be managed.
+   * @param partitionedRegion The PartitionedRegion for which the HA redundancy is required to be
+   *        managed.
    */
-  public PRHARedundancyProvider(final PartitionedRegion region) {
-    prRegion = region;
-    final InternalResourceManager resourceManager =
-        region.getGemFireCache().getInternalResourceManager();
+  public PRHARedundancyProvider(PartitionedRegion partitionedRegion,
+      InternalResourceManager resourceManager) {
+    this(partitionedRegion, resourceManager, PersistentBucketRecoverer::new);
+  }
+
+  @VisibleForTesting
+  PRHARedundancyProvider(PartitionedRegion partitionedRegion,
+      InternalResourceManager resourceManager,
+      BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction) {
+    this.partitionedRegion = partitionedRegion;
     recoveryExecutor = new OneTaskOnlyExecutor(resourceManager.getExecutor(),
-        () -> InternalResourceManager.getResourceObserver().recoveryConflated(region),
+        () -> InternalResourceManager.getResourceObserver().recoveryConflated(partitionedRegion),
         getThreadMonitorObj());
+    this.persistentBucketRecovererFunction = persistentBucketRecovererFunction;
   }
 
   /**
    * Display bucket allocation status
    *
-   * @param prRegion the given region
+   * @param partitionedRegion the given region
    * @param allStores the list of available stores. If null, unknown.
    * @param alreadyUsed stores allocated; only used if allStores != null
    * @param forLog true if the generated string is for a log message
    * @return the description string
    */
-  private static String regionStatus(PartitionedRegion prRegion,
+  private static String regionStatus(PartitionedRegion partitionedRegion,
       Collection<InternalDistributedMember> allStores,
       Collection<InternalDistributedMember> alreadyUsed, boolean forLog) {
+    String newLine = forLog ? " " : lineSeparator();
+    String spaces = forLog ? "" : "   ";
+
     StringBuilder sb = new StringBuilder();
-    sb.append("Partitioned Region name = ").append(prRegion.getFullPath());
-    final String newLine;
-    final String spaces;
-    if (forLog) {
-      newLine = " ";
-      spaces = "";
-    } else {
-      newLine = lineSeparator();
-      spaces = "   ";
-    }
+    sb.append("Partitioned Region name = ");
+    sb.append(partitionedRegion.getFullPath());
+
     if (allStores != null) {
-      sb.append(newLine).append(spaces).append("Redundancy level set to ")
-          .append(prRegion.getRedundantCopies());
-      sb.append(newLine).append(". Number of available data stores: ").append(allStores.size());
-      sb.append(newLine).append(spaces).append(". Number successfully allocated = ")
-          .append(alreadyUsed.size());
-      sb.append(newLine).append(". Data stores: ")
-          .append(PartitionedRegionHelper.printCollection(allStores));
-      sb.append(newLine).append(". Data stores successfully allocated: ")
-          .append(PartitionedRegionHelper.printCollection(alreadyUsed));
-      sb.append(newLine).append(". Equivalent members: ").append(PartitionedRegionHelper
-          .printCollection(prRegion
-              .getDistributionManager()
-              .getMembersInThisZone()));
+      sb.append(newLine).append(spaces);
+      sb.append("Redundancy level set to ");
+      sb.append(partitionedRegion.getRedundantCopies());
+      sb.append(newLine);
+      sb.append(". Number of available data stores: ");
+      sb.append(allStores.size());
+      sb.append(newLine).append(spaces);
+      sb.append(". Number successfully allocated = ");
+      sb.append(alreadyUsed.size());
+      sb.append(newLine);
+      sb.append(". Data stores: ");
+      sb.append(printCollection(allStores));
+      sb.append(newLine);
+      sb.append(". Data stores successfully allocated: ");
+      sb.append(printCollection(alreadyUsed));
+      sb.append(newLine);
+      sb.append(". Equivalent members: ");
+      sb.append(printCollection(partitionedRegion.getDistributionManager().getMembersInThisZone()));
     }
+
     return sb.toString();
   }
 
@@ -226,31 +245,28 @@ public class PRHARedundancyProvider {
    * @param alreadyUsed those that have already accepted, only used if allStores != null
    * @param opString description of the operation which timed out
    */
-  public static void timedOut(PartitionedRegion prRegion, Set<InternalDistributedMember> allStores,
-      Collection<InternalDistributedMember> alreadyUsed,
-      String opString, long timeOut) {
-    final String tooManyRetries =
-        String.format("Timed out attempting to %s in the partitioned region.%sWaited for: %s ms.",
-            opString,
-            regionStatus(prRegion, allStores, alreadyUsed, true), timeOut)
-            + TIMEOUT_MSG;
-    throw new PartitionedRegionStorageException(tooManyRetries);
+  public static void timedOut(PartitionedRegion partitionedRegion,
+      Set<InternalDistributedMember> allStores,
+      Collection<InternalDistributedMember> alreadyUsed, String opString, long timeOut) {
+    throw new PartitionedRegionStorageException(
+        format("Timed out attempting to %s in the partitioned region.%sWaited for: %s ms.",
+            opString, regionStatus(partitionedRegion, allStores, alreadyUsed, true), timeOut)
+            + TIMEOUT_MSG);
   }
 
   public PartitionedRegion getPartitionedRegion() {
-    return prRegion;
+    return partitionedRegion;
   }
 
   private Set<InternalDistributedMember> getAllStores(String partitionName) {
     if (partitionName != null) {
-
       return getFixedPartitionStores(partitionName);
     }
     final Set<InternalDistributedMember> allStores =
-        prRegion.getRegionAdvisor().adviseDataStore(true);
-    PartitionedRegionDataStore myDS = prRegion.getDataStore();
-    if (myDS != null) {
-      allStores.add(prRegion.getDistributionManager().getId());
+        partitionedRegion.getRegionAdvisor().adviseDataStore(true);
+    PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
+    if (localDataStore != null) {
+      allStores.add(partitionedRegion.getDistributionManager().getId());
     }
     return allStores;
   }
@@ -263,14 +279,15 @@ public class PRHARedundancyProvider {
    */
   private Set<InternalDistributedMember> getFixedPartitionStores(String partitionName) {
     Set<InternalDistributedMember> members =
-        prRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName);
+        partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName);
 
-    List<FixedPartitionAttributesImpl> FPAs = prRegion.getFixedPartitionAttributesImpl();
+    List<FixedPartitionAttributesImpl> allFixedPartitionAttributes =
+        partitionedRegion.getFixedPartitionAttributesImpl();
 
-    if (FPAs != null) {
-      for (FixedPartitionAttributesImpl fpa : FPAs) {
-        if (fpa.getPartitionName().equals(partitionName)) {
-          members.add(prRegion.getMyId());
+    if (allFixedPartitionAttributes != null) {
+      for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) {
+        if (fixedPartitionAttributes.getPartitionName().equals(partitionName)) {
+          members.add(partitionedRegion.getMyId());
         }
       }
     }
@@ -286,28 +303,17 @@ public class PRHARedundancyProvider {
    */
   private void insufficientStores(Set<InternalDistributedMember> allStores,
       Collection<InternalDistributedMember> alreadyUsed, boolean onlyLog) {
-    final String regionStat = regionStatus(prRegion, allStores, alreadyUsed, onlyLog);
-    final String newLine;
+    String regionStat = regionStatus(partitionedRegion, allStores, alreadyUsed, onlyLog);
+    String newLine = onlyLog ? " " : lineSeparator();
+    String notEnoughValidNodes = alreadyUsed.isEmpty()
+        ? "Unable to find any members to host a bucket in the partitioned region. %s.%s"
+        : "Configured redundancy level could not be satisfied. %s to satisfy redundancy for the region.%s";
     if (onlyLog) {
-      newLine = " ";
-    } else {
-      newLine = lineSeparator();
-    }
-    final String notEnoughValidNodes;
-    if (alreadyUsed.isEmpty()) {
-      notEnoughValidNodes =
-          "Unable to find any members to host a bucket in the partitioned region. %s.%s";
-    } else {
-      notEnoughValidNodes =
-          "Configured redundancy level could not be satisfied. %s to satisfy redundancy for the region.%s";
-    }
-    final Object[] notEnoughValidNodesArgs = new Object[] {
-        INSUFFICIENT_STORES_MSG, newLine + regionStat + newLine};
-    if (onlyLog) {
-      logger.warn(String.format(notEnoughValidNodes, notEnoughValidNodesArgs));
+      logger.warn(format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG,
+          newLine + regionStat + newLine));
     } else {
       throw new PartitionedRegionStorageException(
-          String.format(notEnoughValidNodes, notEnoughValidNodesArgs));
+          format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG, newLine + regionStat + newLine));
     }
   }
 
@@ -322,13 +328,13 @@ public class PRHARedundancyProvider {
    * @return the new member, null if it fails.
    * @throws PartitionedRegionStorageException if there are not enough data stores
    */
-  private InternalDistributedMember createBucketInstance(int bucketId, final int newBucketSize,
-      final Collection<InternalDistributedMember> excludedMembers,
+  private InternalDistributedMember createBucketInstance(int bucketId, int newBucketSize,
+      Collection<InternalDistributedMember> excludedMembers,
       Collection<InternalDistributedMember> alreadyUsed,
-      ArrayListWithClearState<InternalDistributedMember> failedMembers, final long timeOut,
-      final Set<InternalDistributedMember> allStores) {
+      ArrayListWithClearState<InternalDistributedMember> failedMembers, long timeOut,
+      Set<InternalDistributedMember> allStores) {
 
-    final boolean isDebugEnabled = logger.isDebugEnabled();
+    boolean isDebugEnabled = logger.isDebugEnabled();
 
     // Recalculate list of candidates
     Set<InternalDistributedMember> candidateMembers = new HashSet<>(allStores);
@@ -340,8 +346,9 @@ public class PRHARedundancyProvider {
       logger.debug("AllStores={} AlreadyUsed={} excluded={} failed={}", allStores, alreadyUsed,
           excludedMembers, failedMembers);
     }
+
     if (candidateMembers.isEmpty()) {
-      prRegion.checkReadiness();
+      partitionedRegion.checkReadiness();
 
       // Run out of candidates. Refetch?
       if (System.currentTimeMillis() > timeOut) {
@@ -373,18 +380,18 @@ public class PRHARedundancyProvider {
       return null;
     }
 
-    // In case of FPR, candidateMembers is the set of members on which
+    // In case of FixedPartitionedRegion, candidateMembers is the set of members on which
     // required fixed partition is defined.
     InternalDistributedMember candidate;
-    if (prRegion.isFixedPartitionedRegion()) {
+    if (partitionedRegion.isFixedPartitionedRegion()) {
       candidate = candidateMembers.iterator().next();
     } else {
-      String prName = prRegion.getAttributes().getPartitionAttributes().getColocatedWith();
-      if (prName != null) {
-        candidate = getColocatedDataStore(candidateMembers, alreadyUsed, bucketId, prName);
+      String colocatedWith =
+          partitionedRegion.getAttributes().getPartitionAttributes().getColocatedWith();
+      if (colocatedWith != null) {
+        candidate = getColocatedDataStore(candidateMembers, alreadyUsed, bucketId, colocatedWith);
       } else {
-        final Collection<InternalDistributedMember> orderedCandidates =
-            new ArrayList<>(candidateMembers);
+        Collection<InternalDistributedMember> orderedCandidates = new ArrayList<>(candidateMembers);
         candidate = getPreferredDataStore(orderedCandidates, alreadyUsed);
       }
     }
@@ -394,23 +401,23 @@ public class PRHARedundancyProvider {
       return null;
     }
 
-    if (!prRegion.isShadowPR()
-        && !ColocationHelper.checkMembersColocation(prRegion, candidate)) {
+    if (!partitionedRegion.isShadowPR() && !checkMembersColocation(partitionedRegion, candidate)) {
       if (isDebugEnabled) {
         logger.debug(
-            "createBucketInstances - Member does not have all of the regions colocated with prRegion {}",
+            "createBucketInstances - Member does not have all of the regions colocated with partitionedRegion {}",
             candidate);
       }
       failedMembers.add(candidate);
       return null;
     }
 
-    if (!candidate.equals(prRegion.getMyId())) {
-      PartitionProfile pp = prRegion.getRegionAdvisor().getPartitionProfile(candidate);
-      if (pp == null) {
+    if (!candidate.equals(partitionedRegion.getMyId())) {
+      PartitionProfile profile =
+          partitionedRegion.getRegionAdvisor().getPartitionProfile(candidate);
+      if (profile == null) {
         if (isDebugEnabled) {
           logger.debug("createBucketInstance: {}: no partition profile for {}",
-              prRegion.getFullPath(), candidate);
+              partitionedRegion.getFullPath(), candidate);
         }
         failedMembers.add(candidate);
         return null;
@@ -419,7 +426,7 @@ public class PRHARedundancyProvider {
 
     // Coordinate with any remote close occurring, causing it to wait until
     // this create bucket attempt has been made.
-    final ManageBucketRsp response =
+    ManageBucketRsp response =
         createBucketOnMember(bucketId, candidate, newBucketSize, failedMembers.wasCleared());
 
     // Add targetNode to bucketNodes if successful, else to failedNodeList
@@ -430,7 +437,8 @@ public class PRHARedundancyProvider {
 
     if (isDebugEnabled) {
       logger.debug("createBucketInstance: {}: candidate {} declined to manage bucketId={}: {}",
-          prRegion.getFullPath(), candidate, prRegion.bucketStringForLogs(bucketId),
+          partitionedRegion.getFullPath(), candidate,
+          partitionedRegion.bucketStringForLogs(bucketId),
           response);
     }
     if (response.equals(ManageBucketRsp.CLOSED)) {
@@ -443,31 +451,32 @@ public class PRHARedundancyProvider {
   }
 
   InternalDistributedMember createBucketOnDataStore(int bucketId, int size,
-      RetryTimeKeeper snoozer) {
+      RetryTimeKeeper retryTimeKeeper) {
+    boolean isDebugEnabled = logger.isDebugEnabled();
+
     InternalDistributedMember primaryForFixedPartition = null;
-    if (prRegion.isFixedPartitionedRegion()) {
+    if (partitionedRegion.isFixedPartitionedRegion()) {
       primaryForFixedPartition =
-          prRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
+          partitionedRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
     }
-    final boolean isDebugEnabled = logger.isDebugEnabled();
 
-    InternalDistributedMember ret;
+    InternalDistributedMember memberHostingBucket;
     Collection<InternalDistributedMember> attempted = new HashSet<>();
     do {
-      prRegion.checkReadiness();
+      partitionedRegion.checkReadiness();
       Set<InternalDistributedMember> available =
-          prRegion.getRegionAdvisor().adviseInitializedDataStore();
+          partitionedRegion.getRegionAdvisor().adviseInitializedDataStore();
       available.removeAll(attempted);
-      InternalDistributedMember target = null;
+      InternalDistributedMember targetMember = null;
       for (InternalDistributedMember member : available) {
         if (available.contains(primaryForFixedPartition)) {
-          target = primaryForFixedPartition;
+          targetMember = primaryForFixedPartition;
         } else {
-          target = member;
+          targetMember = member;
         }
         break;
       }
-      if (target == null) {
+      if (targetMember == null) {
         if (shouldLogInsufficientStores()) {
           insufficientStores(available, Collections.emptySet(), true);
         }
@@ -476,21 +485,23 @@ public class PRHARedundancyProvider {
       }
       try {
         if (isDebugEnabled) {
-          logger.debug("Attempting to get data store {} to create the bucket {} for us", target,
-              prRegion.bucketStringForLogs(bucketId));
+          logger.debug("Attempting to get data store {} to create the bucket {} for us",
+              targetMember,
+              partitionedRegion.bucketStringForLogs(bucketId));
         }
         CreateBucketMessage.NodeResponse response =
-            CreateBucketMessage.send(target, prRegion, bucketId, size);
-        ret = response.waitForResponse();
-        if (ret != null) {
-          return ret;
+            CreateBucketMessage.send(targetMember, partitionedRegion, bucketId, size);
+        memberHostingBucket = response.waitForResponse();
+        if (memberHostingBucket != null) {
+          return memberHostingBucket;
         }
       } catch (ForceReattemptException e) {
         // do nothing, we will already check again for a primary.
       }
-      attempted.add(target);
-    } while ((ret = prRegion.getNodeForBucketWrite(bucketId, snoozer)) == null);
-    return ret;
+      attempted.add(targetMember);
+    } while ((memberHostingBucket =
+        partitionedRegion.getNodeForBucketWrite(bucketId, retryTimeKeeper)) == null);
+    return memberHostingBucket;
   }
 
   /**
@@ -517,38 +528,40 @@ public class PRHARedundancyProvider {
    * @throws PartitionOfflineException if persistent data recovery is not complete for a partitioned
    *         region referred to in the query.
    */
-  public InternalDistributedMember createBucketAtomically(final int bucketId,
-      final int newBucketSize, final boolean finishIncompleteCreation, String partitionName)
+  public InternalDistributedMember createBucketAtomically(int bucketId, int newBucketSize,
+      boolean finishIncompleteCreation, String partitionName)
       throws PartitionedRegionStorageException, PartitionedRegionException,
       PartitionOfflineException {
-    final boolean isDebugEnabled = logger.isDebugEnabled();
+    boolean isDebugEnabled = logger.isDebugEnabled();
 
-    prRegion.checkPROffline();
+    partitionedRegion.checkPROffline();
 
     // If there are insufficient stores throw *before* we try acquiring the
     // (very expensive) bucket lock or the (somewhat expensive) monitor on this
     earlySufficientStoresCheck(partitionName);
 
     synchronized (this) {
-      if (prRegion.getCache().isCacheAtShutdownAll()) {
-        throw prRegion.getCache().getCacheClosedException("Cache is shutting down");
+      if (partitionedRegion.getCache().isCacheAtShutdownAll()) {
+        throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down");
       }
 
       if (isDebugEnabled) {
         logger.debug("Starting atomic creation of bucketId={}",
-            prRegion.bucketStringForLogs(bucketId));
+            partitionedRegion.bucketStringForLogs(bucketId));
       }
-      final long timeOut = System.currentTimeMillis() + computeTimeout();
+
+      long timeOut = System.currentTimeMillis() + computeTimeout();
       BucketMembershipObserver observer = null;
       boolean needToElectPrimary = true;
       InternalDistributedMember bucketPrimary = null;
+
       try {
-        prRegion.checkReadiness();
+        partitionedRegion.checkReadiness();
 
-        Bucket toCreate = prRegion.getRegionAdvisor().getBucket(bucketId);
+        Bucket toCreate = partitionedRegion.getRegionAdvisor().getBucket(bucketId);
 
         if (!finishIncompleteCreation) {
-          bucketPrimary = prRegion.getBucketPrimary(bucketId);
+          bucketPrimary = partitionedRegion.getBucketPrimary(bucketId);
           if (bucketPrimary != null) {
             if (isDebugEnabled) {
               logger.debug(
@@ -561,26 +574,27 @@ public class PRHARedundancyProvider {
         }
 
         observer = new BucketMembershipObserver(toCreate).beginMonitoring();
-        // detected
         ArrayListWithClearState<InternalDistributedMember> failedMembers =
             new ArrayListWithClearState<>();
         Set<InternalDistributedMember> excludedMembers = new HashSet<>();
         Collection<InternalDistributedMember> acceptedMembers = new ArrayList<>();
+
         for (boolean loggedInsufficientStores = false;;) {
-          prRegion.checkReadiness();
-          if (prRegion.getCache().isCacheAtShutdownAll()) {
+          partitionedRegion.checkReadiness();
+          if (partitionedRegion.getCache().isCacheAtShutdownAll()) {
             if (isDebugEnabled) {
               logger.debug("Aborted createBucketAtomically due to ShutdownAll");
             }
-            throw prRegion.getCache().getCacheClosedException("Cache is shutting down");
+            throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down");
           }
 
           long timeLeft = timeOut - System.currentTimeMillis();
           if (timeLeft < 0) {
             // It took too long.
-            timedOut(prRegion, getAllStores(partitionName), acceptedMembers,
+            timedOut(partitionedRegion, getAllStores(partitionName), acceptedMembers,
                 ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET, computeTimeout());
           }
+
           if (isDebugEnabled) {
             logger.debug("createBucketAtomically: have {} ms left to finish this", timeLeft);
           }
@@ -593,9 +607,10 @@ public class PRHARedundancyProvider {
           InternalDistributedMember candidate = createBucketInstance(bucketId, newBucketSize,
               excludedMembers, acceptedMembers, failedMembers, timeOut, allStores);
           if (candidate != null) {
-            if (prRegion.getDistributionManager().enforceUniqueZone()) {
+            if (partitionedRegion.getDistributionManager().enforceUniqueZone()) {
               // enforceUniqueZone property has no effect for a loner
-              if (!(prRegion.getDistributionManager() instanceof LonerDistributionManager)) {
+              if (!(partitionedRegion
+                  .getDistributionManager() instanceof LonerDistributionManager)) {
                 Set<InternalDistributedMember> exm = getBuddyMembersInZone(candidate, allStores);
                 exm.remove(candidate);
                 exm.removeAll(acceptedMembers);
@@ -610,7 +625,7 @@ public class PRHARedundancyProvider {
 
           // Get an updated list of bucket owners, which should include
           // buckets created concurrently with this createBucketAtomically call
-          acceptedMembers = prRegion.getRegionAdvisor().getBucketOwners(bucketId);
+          acceptedMembers = partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId);
 
           if (isDebugEnabled) {
             logger.debug("Accepted members: {}", acceptedMembers);
@@ -630,18 +645,19 @@ public class PRHARedundancyProvider {
           // select the primary.
 
           // Have we exhausted all candidates?
-          final int potentialCandidateCount = allStores.size()
+          int potentialCandidateCount = allStores.size()
               - (excludedMembers.size() + acceptedMembers.size() + failedMembers.size());
+
           // Determining exhausted members competes with bucket balancing; it's
           // important to re-visit all failed members since "failed" set may
           // contain datastores which at the moment are imbalanced, but yet could
           // be candidates. If the failed members list is empty, it's expected
           // that the next iteration clears the (already empty) list.
-          final boolean exhaustedPotentialCandidates =
+          boolean exhaustedPotentialCandidates =
               failedMembers.wasCleared() && potentialCandidateCount <= 0;
-          final boolean redundancySatisfied =
-              acceptedMembers.size() > prRegion.getRedundantCopies();
-          final boolean bucketNotCreated = acceptedMembers.isEmpty();
+          boolean redundancySatisfied =
+              acceptedMembers.size() > partitionedRegion.getRedundantCopies();
+          boolean bucketNotCreated = acceptedMembers.isEmpty();
 
           if (isDebugEnabled) {
             logger.debug(
@@ -665,8 +681,9 @@ public class PRHARedundancyProvider {
             // The rest of the members will be allowed to volunteer for primary.
             endBucketCreation(bucketId, acceptedMembers, bucketPrimary, partitionName);
 
-            final int expectedRemoteHosts = acceptedMembers.size()
-                - (acceptedMembers.contains(prRegion.getMyId()) ? 1 : 0);
+            int expectedRemoteHosts = acceptedMembers.size()
+                - (acceptedMembers.contains(partitionedRegion.getMyId()) ? 1 : 0);
+
             boolean interrupted = Thread.interrupted();
             try {
               BucketMembershipObserverResults results = observer
@@ -678,13 +695,15 @@ public class PRHARedundancyProvider {
               bucketPrimary = results.primary;
             } catch (InterruptedException e) {
               interrupted = true;
-              prRegion.getCancelCriterion().checkCancelInProgress(e);
+              partitionedRegion.getCancelCriterion().checkCancelInProgress(e);
             } finally {
               if (interrupted) {
                 Thread.currentThread().interrupt();
               }
             }
+
             needToElectPrimary = false;
+
             return bucketPrimary;
           }
         }
@@ -708,15 +727,17 @@ public class PRHARedundancyProvider {
         if (observer != null) {
           observer.stopMonitoring();
         }
+
         // Try to make sure everyone that created the bucket can volunteer for primary
         if (needToElectPrimary) {
           try {
-            endBucketCreation(bucketId, prRegion.getRegionAdvisor().getBucketOwners(bucketId),
+            endBucketCreation(bucketId,
+                partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId),
                 bucketPrimary, partitionName);
           } catch (Exception e) {
             // if region is going down, then no warning level logs
             if (e instanceof CancelException
-                || prRegion.getCancelCriterion().isCancelInProgress()) {
+                || partitionedRegion.getCancelCriterion().isCancelInProgress()) {
               logger.debug("Exception trying choose a primary after bucket creation failure", e);
             } else {
               logger.warn("Exception trying choose a primary after bucket creation failure", e);
@@ -747,24 +768,27 @@ public class PRHARedundancyProvider {
     // secondary partition will become primary
     if (partitionName != null) {
       if (isLocalPrimary(partitionName)) {
-        targetPrimary = prRegion.getMyId();
+        targetPrimary = partitionedRegion.getMyId();
       } else {
         targetPrimary =
-            prRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
+            partitionedRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
         if (targetPrimary == null) {
           Set<InternalDistributedMember> fpDataStores = getFixedPartitionStores(partitionName);
           targetPrimary = fpDataStores.iterator().next();
         }
       }
     }
+
     if (targetPrimary == null) {
       // we need to select the same primary as chosen earlier (e.g.
       // the parent's in case of colocation) so it is now passed
       targetPrimary =
           getPreferredDataStore(acceptedMembers, Collections.emptySet());
     }
-    boolean isHosting = acceptedMembers.remove(prRegion.getDistributionManager().getId());
-    EndBucketCreationMessage.send(acceptedMembers, targetPrimary, prRegion, bucketId);
+
+    boolean isHosting = acceptedMembers.remove(partitionedRegion.getDistributionManager().getId());
+
+    EndBucketCreationMessage.send(acceptedMembers, targetPrimary, partitionedRegion, bucketId);
 
     if (isHosting) {
       endBucketCreationLocally(bucketId, targetPrimary);
@@ -772,10 +796,12 @@ public class PRHARedundancyProvider {
   }
 
   private boolean isLocalPrimary(String partitionName) {
-    List<FixedPartitionAttributesImpl> FPAs = prRegion.getFixedPartitionAttributesImpl();
-    if (FPAs != null) {
-      for (FixedPartitionAttributesImpl fpa : FPAs) {
-        if (fpa.getPartitionName().equals(partitionName) && fpa.isPrimary()) {
+    List<FixedPartitionAttributesImpl> allFixedPartitionAttributes =
+        partitionedRegion.getFixedPartitionAttributesImpl();
+    if (allFixedPartitionAttributes != null) {
+      for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) {
+        if (fixedPartitionAttributes.getPartitionName().equals(partitionName)
+            && fixedPartitionAttributes.isPrimary()) {
           return true;
         }
       }
@@ -786,17 +812,18 @@ public class PRHARedundancyProvider {
   public void endBucketCreationLocally(int bucketId, InternalDistributedMember newPrimary) {
     // Don't elect ourselves as primary or tell others to persist our ID
     // if this member has been destroyed.
-    if (prRegion.getCancelCriterion().isCancelInProgress() || prRegion.isDestroyed()) {
+    if (partitionedRegion.getCancelCriterion().isCancelInProgress()
+        || partitionedRegion.isDestroyed()) {
       return;
     }
 
     if (logger.isDebugEnabled()) {
       logger.debug("endBucketCreationLocally: for region {} bucketId={} new primary: {}",
-          prRegion.getFullPath(), bucketId, newPrimary);
+          partitionedRegion.getFullPath(), bucketId, newPrimary);
     }
-    BucketAdvisor bucketAdvisor = prRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
 
-    final ProxyBucketRegion proxyBucketRegion = bucketAdvisor.getProxyBucketRegion();
+    BucketAdvisor bucketAdvisor = partitionedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
+    ProxyBucketRegion proxyBucketRegion = bucketAdvisor.getProxyBucketRegion();
     BucketPersistenceAdvisor persistentAdvisor = proxyBucketRegion.getPersistenceAdvisor();
 
     // prevent multiple threads from ending bucket creation at the same time.
@@ -804,8 +831,8 @@ public class PRHARedundancyProvider {
       if (persistentAdvisor != null) {
         BucketRegion realBucket = proxyBucketRegion.getCreatedBucketRegion();
         if (realBucket != null) {
-          PersistentMemberID persistentID = realBucket.getPersistentID();
-          persistentAdvisor.endBucketCreation(persistentID);
+          PersistentMemberID persistentId = realBucket.getPersistentID();
+          persistentAdvisor.endBucketCreation(persistentId);
         }
       }
 
@@ -813,7 +840,7 @@ public class PRHARedundancyProvider {
       // may not have. So now we wait for the chosen member to become primary.
       bucketAdvisor.setPrimaryElector(newPrimary);
 
-      if (prRegion.getGemFireCache().getMyId().equals(newPrimary)) {
+      if (partitionedRegion.getGemFireCache().getMyId().equals(newPrimary)) {
         // If we're the chosen primary, volunteer for primary now
         if (bucketAdvisor.isHosting()) {
           bucketAdvisor.clearPrimaryElector();
@@ -841,7 +868,8 @@ public class PRHARedundancyProvider {
       bucketAdvisor.endBucketCreation();
     }
 
-    List<PartitionedRegion> colocatedWithList = ColocationHelper.getColocatedChildRegions(prRegion);
+    List<PartitionedRegion> colocatedWithList = ColocationHelper.getColocatedChildRegions(
+        partitionedRegion);
     for (PartitionedRegion child : colocatedWithList) {
       if (child.getRegionAdvisor().isBucketLocal(bucketId)) {
         child.getRedundancyProvider().endBucketCreationLocally(bucketId, newPrimary);
@@ -856,10 +884,8 @@ public class PRHARedundancyProvider {
    * @since GemFire 5.9
    */
   private Set<InternalDistributedMember> getBuddyMembersInZone(
-      final InternalDistributedMember acceptedMember,
-      final Collection<InternalDistributedMember> allStores) {
-
-    DistributionManager dm = prRegion.getDistributionManager();
+      InternalDistributedMember acceptedMember, Collection<InternalDistributedMember> allStores) {
+    DistributionManager dm = partitionedRegion.getDistributionManager();
     Set<InternalDistributedMember> buddies = dm.getMembersInSameZone(acceptedMember);
     buddies.retainAll(allStores);
     return buddies;
@@ -876,9 +902,9 @@ public class PRHARedundancyProvider {
     Set<InternalDistributedMember> currentStores = getAllStores(partitionName);
     if (currentStores.isEmpty()) {
       if (shouldLogInsufficientStores()) {
-        insufficientStores(currentStores, Collections.emptyList(), true);
+        insufficientStores(currentStores, emptyList(), true);
       }
-      insufficientStores(currentStores, Collections.emptyList(), false);
+      insufficientStores(currentStores, emptyList(), false);
     }
   }
 
@@ -915,7 +941,7 @@ public class PRHARedundancyProvider {
         return millis;
       }
     }
-    return prRegion.getRetryTimeout();
+    return partitionedRegion.getRetryTimeout();
   }
 
   /**
@@ -926,23 +952,22 @@ public class PRHARedundancyProvider {
    * @param loggedInsufficientStores indicates whether a warning has been logged
    * @return true when a warning has been logged, false if a warning should be logged.
    */
-  private boolean checkSufficientStores(final Set<InternalDistributedMember> allStores,
-      final boolean loggedInsufficientStores) {
+  private boolean checkSufficientStores(Set<InternalDistributedMember> allStores,
+      boolean loggedInsufficientStores) {
     // Report (only once) if insufficient data store have been detected.
     if (!loggedInsufficientStores) {
       if (allStores.isEmpty()) {
-        insufficientStores(allStores, Collections.emptyList(), true);
+        insufficientStores(allStores, emptyList(), true);
         return true;
       }
     } else {
       if (!allStores.isEmpty()) {
         // Excellent, sufficient resources were found!
-        final String logStr = "{} Region name, {}";
-        logger.info(logStr, SUFFICIENT_STORES_MSG, prRegion.getFullPath());
+        logger.info("{} Region name, {}", SUFFICIENT_STORES_MSG, partitionedRegion.getFullPath());
         return false;
       }
       // Already logged warning, there are no datastores
-      insufficientStores(allStores, Collections.emptyList(), false);
+      insufficientStores(allStores, emptyList(), false);
     }
     return loggedInsufficientStores;
   }
@@ -950,18 +975,19 @@ public class PRHARedundancyProvider {
   /**
    * Clean up locally created bucket and tell other VMs to attempt recovering redundancy
    *
-   * @param buck the bucket identifier
+   * @param bucketId the bucket identifier
    */
-  private void cleanUpBucket(int buck) {
-    Set<InternalDistributedMember> dataStores = prRegion.getRegionAdvisor().adviseDataStore();
-    BucketBackupMessage.send(dataStores, prRegion, buck);
+  private void cleanUpBucket(int bucketId) {
+    Set<InternalDistributedMember> dataStores =
+        partitionedRegion.getRegionAdvisor().adviseDataStore();
+    BucketBackupMessage.send(dataStores, partitionedRegion, bucketId);
   }
 
   public void finishIncompleteBucketCreation(int bucketId) {
     String partitionName = null;
-    if (prRegion.isFixedPartitionedRegion()) {
+    if (partitionedRegion.isFixedPartitionedRegion()) {
       FixedPartitionAttributesImpl fpa =
-          PartitionedRegionHelper.getFixedPartitionAttributesForBucket(prRegion, bucketId);
+          PartitionedRegionHelper.getFixedPartitionAttributesForBucket(partitionedRegion, bucketId);
       partitionName = fpa.getPartitionName();
     }
     createBucketAtomically(bucketId, 0, true, partitionName);
@@ -974,41 +1000,44 @@ public class PRHARedundancyProvider {
    * @param isRebalance true if bucket creation is directed by rebalancing
    * @return true if the bucket was sucessfully created
    */
-  public boolean createBackupBucketOnMember(final int bucketId,
-      final InternalDistributedMember targetNMember, final boolean isRebalance,
-      boolean replaceOfflineData, InternalDistributedMember moveSource, boolean forceCreation) {
+  public boolean createBackupBucketOnMember(int bucketId, InternalDistributedMember targetMember,
+      boolean isRebalance, boolean replaceOfflineData, InternalDistributedMember fromMember,
+      boolean forceCreation) {
     if (logger.isDebugEnabled()) {
       logger.debug("createBackupBucketOnMember for bucketId={} member: {}",
-          prRegion.bucketStringForLogs(bucketId), targetNMember);
+          partitionedRegion.bucketStringForLogs(bucketId), targetMember);
     }
 
-    if (!targetNMember.equals(prRegion.getMyId())) {
-      PartitionProfile pp = prRegion.getRegionAdvisor().getPartitionProfile(targetNMember);
-      if (pp == null) {
+    if (!targetMember.equals(partitionedRegion.getMyId())) {
+      PartitionProfile profile =
+          partitionedRegion.getRegionAdvisor().getPartitionProfile(targetMember);
+      if (profile == null) {
         return false;
       }
 
       try {
         ManageBackupBucketMessage.NodeResponse response =
-            ManageBackupBucketMessage.send(targetNMember, prRegion, bucketId, isRebalance,
-                replaceOfflineData, moveSource, forceCreation);
+            ManageBackupBucketMessage.send(targetMember, partitionedRegion, bucketId, isRebalance,
+                replaceOfflineData, fromMember, forceCreation);
 
         if (response.waitForAcceptance()) {
           if (logger.isDebugEnabled()) {
             logger.debug(
                 "createBackupBucketOnMember: Bucket creation succeed for bucketId={} on member = {}",
-                prRegion.bucketStringForLogs(bucketId), targetNMember);
+                partitionedRegion.bucketStringForLogs(bucketId), targetMember);
           }
 
           return true;
         }
+
         if (logger.isDebugEnabled()) {
           logger.debug(
               "createBackupBucketOnMember: Bucket creation failed for bucketId={} on member = {}",
-              prRegion.bucketStringForLogs(bucketId), targetNMember);
+              partitionedRegion.bucketStringForLogs(bucketId), targetMember);
         }
 
         return false;
+
       } catch (VirtualMachineError err) {
         SystemFailure.initiateFailure(err);
         // If this ever returns, rethrow the error. We're poisoned
@@ -1027,19 +1056,21 @@ public class PRHARedundancyProvider {
             || e.getCause() != null && e.getCause() instanceof CancelException) {
           // no need to log exceptions caused by cache closure
         } else {
-          logger.warn("Exception creating partition on {}", targetNMember, e);
+          logger.warn("Exception creating partition on {}", targetMember, e);
         }
         return false;
       }
     }
-    final PartitionedRegionDataStore prDS = prRegion.getDataStore();
-    boolean bucketManaged = prDS != null && prDS.grabBucket(bucketId, moveSource, forceCreation,
-        replaceOfflineData, isRebalance, null, false).equals(CreateBucketResult.CREATED);
+
+    PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
+    boolean bucketManaged =
+        dataStore != null && dataStore.grabBucket(bucketId, fromMember, forceCreation,
+            replaceOfflineData, isRebalance, null, false).equals(CreateBucketResult.CREATED);
     if (!bucketManaged) {
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "createBackupBucketOnMember: Local data store refused to accommodate the data for bucketId={} prDS={}",
-            prRegion.bucketStringForLogs(bucketId), prDS);
+            "createBackupBucketOnMember: Local data store refused to accommodate the data for bucketId={} dataStore={}",
+            partitionedRegion.bucketStringForLogs(bucketId), dataStore);
       }
     }
     return bucketManaged;
@@ -1047,9 +1078,9 @@ public class PRHARedundancyProvider {
 
   private boolean getForceLocalPrimaries() {
     boolean result = false;
-    Boolean v = forceLocalPrimaries.get();
-    if (v != null) {
-      result = v;
+    Boolean forceLocalPrimariesValue = forceLocalPrimaries.get();
+    if (forceLocalPrimariesValue != null) {
+      result = forceLocalPrimariesValue;
     }
     return result;
   }
@@ -1061,42 +1092,44 @@ public class PRHARedundancyProvider {
    *        ignoring it's maximums
    * @return a response object
    */
-  private ManageBucketRsp createBucketOnMember(final int bucketId,
-      final InternalDistributedMember targetNMember, final int newBucketSize,
-      boolean forceCreation) {
+  private ManageBucketRsp createBucketOnMember(int bucketId, InternalDistributedMember targetMember,
+      int newBucketSize, boolean forceCreation) {
     if (logger.isDebugEnabled()) {
       logger.debug("createBucketOnMember for bucketId={} member: {}{}",
-          prRegion.bucketStringForLogs(bucketId), targetNMember,
+          partitionedRegion.bucketStringForLogs(bucketId), targetMember,
           forceCreation ? " forced" : "");
     }
 
-    if (!targetNMember.equals(prRegion.getMyId())) {
-      PartitionProfile pp = prRegion.getRegionAdvisor().getPartitionProfile(targetNMember);
-      if (pp == null) {
+    if (!targetMember.equals(partitionedRegion.getMyId())) {
+      PartitionProfile profile =
+          partitionedRegion.getRegionAdvisor().getPartitionProfile(targetMember);
+      if (profile == null) {
         return ManageBucketRsp.NO;
       }
 
       try {
-        NodeResponse response = ManageBucketMessage.send(targetNMember, prRegion, bucketId,
+        NodeResponse response = ManageBucketMessage.send(targetMember, partitionedRegion, bucketId,
             newBucketSize, forceCreation);
 
         if (response.waitForAcceptance()) {
           if (logger.isDebugEnabled()) {
             logger.debug(
                 "createBucketOnMember: Bucket creation succeed for bucketId={} on member = {}",
-                prRegion.bucketStringForLogs(bucketId), targetNMember);
+                partitionedRegion.bucketStringForLogs(bucketId), targetMember);
           }
 
           return ManageBucketRsp.YES;
         }
+
         if (logger.isDebugEnabled()) {
           logger.debug(
               "createBucketOnMember: Bucket creation failed for bucketId={} on member = {}",
-              prRegion.bucketStringForLogs(bucketId), targetNMember);
+              partitionedRegion.bucketStringForLogs(bucketId), targetMember);
         }
 
         return response.rejectedDueToInitialization() ? ManageBucketRsp.NO_INITIALIZING
             : ManageBucketRsp.NO;
+
       } catch (PartitionOfflineException e) {
         throw e;
       } catch (VirtualMachineError err) {
@@ -1119,21 +1152,20 @@ public class PRHARedundancyProvider {
         if (e instanceof ForceReattemptException) {
           // no log needed
         } else {
-          logger.warn("Exception creating partition on " +
-              targetNMember,
-              e);
+          logger.warn("Exception creating partition on {}", targetMember, e);
         }
         return ManageBucketRsp.NO;
       }
     }
-    final PartitionedRegionDataStore prDS = prRegion.getDataStore();
-    boolean bucketManaged = prDS != null && prDS.handleManageBucketRequest(bucketId,
-        newBucketSize, prRegion.getMyId(), forceCreation);
+
+    PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
+    boolean bucketManaged = dataStore != null && dataStore.handleManageBucketRequest(bucketId,
+        newBucketSize, partitionedRegion.getMyId(), forceCreation);
     if (!bucketManaged) {
       if (logger.isDebugEnabled()) {
         logger.debug(
             "createBucketOnMember: Local data store not able to accommodate the data for bucketId={}",
-            prRegion.bucketStringForLogs(bucketId));
+            partitionedRegion.bucketStringForLogs(bucketId));
       }
     }
     return ManageBucketRsp.valueOf(bucketManaged);
@@ -1151,10 +1183,11 @@ public class PRHARedundancyProvider {
       Collection<InternalDistributedMember> candidates,
       Collection<InternalDistributedMember> alreadyUsed, int bucketId, String prName) {
     Assert.assertTrue(prName != null);
-    PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(prRegion);
-    Region<?, ?> prRoot = PartitionedRegionHelper.getPRRoot(prRegion.getCache());
+    PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(partitionedRegion);
+    Region<?, ?> prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache());
     PartitionRegionConfig config =
-        (PartitionRegionConfig) prRoot.get(prRegion.getRegionIdentifier());
+        (PartitionRegionConfig) prRoot.get(partitionedRegion.getRegionIdentifier());
+
     if (!config.isColocationComplete()) {
       throw new IllegalStateException(
           "Cannot create buckets, as colocated regions are not configured to be at the same nodes.");
@@ -1168,12 +1201,14 @@ public class PRHARedundancyProvider {
       }
       return primary;
     }
+
     Set<InternalDistributedMember> bucketOwnersSet = advisor.getBucketOwners(bucketId);
     bucketOwnersSet.retainAll(candidates);
     Collection<InternalDistributedMember> members = new ArrayList<>(bucketOwnersSet);
     if (members.isEmpty()) {
       return null;
     }
+
     return getPreferredDataStore(members, alreadyUsed);
   }
 
@@ -1183,20 +1218,20 @@ public class PRHARedundancyProvider {
    * Under concurrent access, the data that this method uses, may be somewhat volatile, note that
    * createBucketAtomically synchronizes to enhance the consistency of the data used in this method.
    *
-   * @param candidates ArrayList of InternalDistributedMember, potential datastores
+   * @param candidates collection of InternalDistributedMember, potential datastores
    * @param alreadyUsed data stores already in use
    * @return a member with the fewest buckets or null if no datastores
    */
   private InternalDistributedMember getPreferredDataStore(
       Collection<InternalDistributedMember> candidates,
-      final Collection<InternalDistributedMember> alreadyUsed) {
+      Collection<InternalDistributedMember> alreadyUsed) {
     // has a primary already been chosen?
-    final boolean forPrimary = alreadyUsed.isEmpty();
+    boolean forPrimary = alreadyUsed.isEmpty();
 
     if (forPrimary && getForceLocalPrimaries()) {
-      PartitionedRegionDataStore myDS = prRegion.getDataStore();
-      if (myDS != null) {
-        return prRegion.getMyId();
+      PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
+      if (localDataStore != null) {
+        return partitionedRegion.getMyId();
       }
     }
 
@@ -1206,26 +1241,28 @@ public class PRHARedundancyProvider {
     Assert.assertTrue(candidates.size() > 1);
 
     // Convert peers to DataStoreBuckets
-    ArrayList<DataStoreBuckets> stores = prRegion.getRegionAdvisor()
+    List<DataStoreBuckets> stores = partitionedRegion.getRegionAdvisor()
         .adviseFilteredDataStores(new HashSet<>(candidates));
 
-    final DistributionManager dm = prRegion.getDistributionManager();
+    DistributionManager dm = partitionedRegion.getDistributionManager();
+
     // Add local member as a candidate, if appropriate
-    InternalDistributedMember moi = dm.getId();
-    PartitionedRegionDataStore myDS = prRegion.getDataStore();
-    if (myDS != null && candidates.contains(moi)) {
-      int bucketCount = myDS.getBucketsManaged();
-      int priCount = myDS.getNumberOfPrimaryBucketsManaged();
-      int localMaxMemory = prRegion.getLocalMaxMemory();
-      stores.add(new DataStoreBuckets(moi, bucketCount, priCount, localMaxMemory));
+    InternalDistributedMember localMember = dm.getId();
+    PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
+    if (localDataStore != null && candidates.contains(localMember)) {
+      int bucketCount = localDataStore.getBucketsManaged();
+      int priCount = localDataStore.getNumberOfPrimaryBucketsManaged();
+      int localMaxMemory = partitionedRegion.getLocalMaxMemory();
+      stores.add(new DataStoreBuckets(localMember, bucketCount, priCount, localMaxMemory));
     }
+
     if (stores.isEmpty()) {
       return null;
     }
 
     // ---------------------------------------------
     // Calculate all hosts who already have this bucket
-    final Set<InternalDistributedMember> existingHosts = new HashSet<>();
+    Collection<InternalDistributedMember> existingHosts = new HashSet<>();
     for (InternalDistributedMember mem : alreadyUsed) {
       existingHosts.addAll(dm.getMembersInSameZone(mem));
     }
@@ -1244,16 +1281,17 @@ public class PRHARedundancyProvider {
       }
 
       // Look for least loaded
-      float metric1;
-      float metric2;
+      float load1;
+      float load2;
       if (forPrimary) {
-        metric1 = d1.numPrimaries() / (float) d1.localMaxMemoryMB();
-        metric2 = d2.numPrimaries() / (float) d2.localMaxMemoryMB();
+        load1 = d1.numPrimaries() / (float) d1.localMaxMemoryMB();
+        load2 = d2.numPrimaries() / (float) d2.localMaxMemoryMB();
       } else {
-        metric1 = d1.numBuckets() / (float) d1.localMaxMemoryMB();
-        metric2 = d2.numBuckets() / (float) d2.localMaxMemoryMB();
+        load1 = d1.numBuckets() / (float) d1.localMaxMemoryMB();
+        load2 = d2.numBuckets() / (float) d2.localMaxMemoryMB();
       }
-      int result = Float.compare(metric1, metric2);
+
+      int result = Float.compare(load1, load2);
       if (result == 0) {
         // if they have the same load, choose the member with the higher localMaxMemory
         result = d2.localMaxMemoryMB() - d1.localMaxMemoryMB();
@@ -1265,6 +1303,7 @@ public class PRHARedundancyProvider {
     // First step is to sort datastores first by those whose hosts don't
     // hold this bucket, and then secondarily by loading.
     stores.sort(comparator);
+
     if (logger.isDebugEnabled()) {
       logger.debug(fancyFormatBucketAllocation("Sorted ", stores, existingHosts));
     }
@@ -1275,7 +1314,7 @@ public class PRHARedundancyProvider {
     List<DataStoreBuckets> bestStores = new ArrayList<>();
     bestStores.add(bestDataStore);
 
-    final boolean allStoresInUse = alreadyUsed.contains(bestDataStore.memberId());
+    boolean allStoresInUse = alreadyUsed.contains(bestDataStore.memberId());
 
     // ---------------------------------------------
     // Collect all of the other hosts in this sorted list that are as good as the very first one.
@@ -1291,6 +1330,7 @@ public class PRHARedundancyProvider {
       }
       bestStores.add(aDataStore);
     }
+
     if (logger.isDebugEnabled()) {
       logger.debug(fancyFormatBucketAllocation("Best Stores ", bestStores, existingHosts));
     }
@@ -1303,8 +1343,8 @@ public class PRHARedundancyProvider {
       // Pick one (at random)
       chosen = PartitionedRegion.RANDOM.nextInt(bestStores.size());
     }
-    DataStoreBuckets aDataStore = bestStores.get(chosen);
-    return aDataStore.memberId();
+
+    return bestStores.get(chosen).memberId();
   }
 
   /**
@@ -1312,7 +1352,7 @@ public class PRHARedundancyProvider {
    * redundancy of existing buckets
    */
   void startRedundancyRecovery() {
-    prRegion.getRegionAdvisor().addMembershipListener(new PRMembershipListener());
+    partitionedRegion.getRegionAdvisor().addMembershipListener(new PRMembershipListener());
     scheduleRedundancyRecovery(null);
   }
 
@@ -1339,19 +1379,20 @@ public class PRHARedundancyProvider {
     if (prefix != null) {
       logStr.append(prefix);
     }
-    logStr.append("Bucket Allocation for prId=").append(prRegion.getPRId()).append(":").append(
-        lineSeparator());
+    logStr.append("Bucket Allocation for prId=").append(partitionedRegion.getPRId()).append(":");
+    logStr.append(lineSeparator());
+
     for (Object dataStore : dataStores) {
-      DataStoreBuckets dsb = (DataStoreBuckets) dataStore;
-      logStr.append(dsb.memberId()).append(": ");
-      if (existingStores.contains(dsb.memberId())) {
+      DataStoreBuckets buckets = (DataStoreBuckets) dataStore;
+      logStr.append(buckets.memberId()).append(": ");
+      if (existingStores.contains(buckets.memberId())) {
         logStr.append("+");
       } else {
         logStr.append("-");
       }
-      logStr.append(dsb.numPrimaries());
+      logStr.append(buckets.numPrimaries());
       logStr.append("/");
-      logStr.append(dsb.numBuckets() - dsb.numPrimaries());
+      logStr.append(buckets.numBuckets() - buckets.numPrimaries());
 
       logStr.append(lineSeparator());
     }
@@ -1372,15 +1413,15 @@ public class PRHARedundancyProvider {
     }
 
     // Revisit region advisor, get current bucket stores.
-    final Set<InternalDistributedMember> availableMembers = getAllStores(partitionName);
+    Set<InternalDistributedMember> availableMembers = getAllStores(partitionName);
 
-    for (Iterator<InternalDistributedMember> itr = members.iterator(); itr.hasNext();) {
-      InternalDistributedMember node = itr.next();
+    for (Iterator<InternalDistributedMember> iterator = members.iterator(); iterator.hasNext();) {
+      InternalDistributedMember node = iterator.next();
       if (!availableMembers.contains(node)) {
         if (logger.isDebugEnabled()) {
           logger.debug("verifyBucketNodes: removing member {}", node);
         }
-        itr.remove();
+        iterator.remove();
         Assert.assertTrue(!members.contains(node), "return value does not contain " + node);
       }
     }
@@ -1391,32 +1432,33 @@ public class PRHARedundancyProvider {
    */
   private void scheduleRedundancyRecovery(Object failedMemberId) {
     // note: isStartup is true even when not starting
-    final boolean isStartup = failedMemberId == null;
-    final long delay;
-    final boolean movePrimaries;
+    boolean isStartup = failedMemberId == null;
+    long delay;
+    boolean movePrimaries;
 
     if (isStartup) {
-      delay = prRegion.getPartitionAttributes().getStartupRecoveryDelay();
+      delay = partitionedRegion.getPartitionAttributes().getStartupRecoveryDelay();
       movePrimaries = !Boolean
           .getBoolean(GEMFIRE_PREFIX + "DISABLE_MOVE_PRIMARIES_ON_STARTUP");
     } else {
-      delay = prRegion.getPartitionAttributes().getRecoveryDelay();
+      delay = partitionedRegion.getPartitionAttributes().getRecoveryDelay();
       movePrimaries = false;
     }
+
     final boolean requiresRedundancyRecovery = delay >= 0;
 
     if (!requiresRedundancyRecovery) {
       return;
     }
-    if (!prRegion.isDataStore()) {
+    if (!partitionedRegion.isDataStore()) {
       return;
     }
+
     Runnable task = new RecoveryRunnable(this) {
       @Override
       public void run2() {
         try {
-          final boolean isFixedPartitionedRegion =
-              prRegion.isFixedPartitionedRegion();
+          boolean isFixedPartitionedRegion = partitionedRegion.isFixedPartitionedRegion();
 
           // always replace offline data for fixed partitioned regions -
           // this guarantees we create the buckets we are supposed to create on this node.
@@ -1429,10 +1471,10 @@ public class PRHARedundancyProvider {
             director = new CompositeDirector(true, true, false, movePrimaries);
           }
 
-          final PartitionedRegionRebalanceOp rebalance = new PartitionedRegionRebalanceOp(
-              prRegion, false, director, replaceOfflineData, false);
+          PartitionedRegionRebalanceOp rebalance = new PartitionedRegionRebalanceOp(
+              partitionedRegion, false, director, replaceOfflineData, false);
 
-          long start = prRegion.getPrStats().startRecovery();
+          long start = partitionedRegion.getPrStats().startRecovery();
 
           if (isFixedPartitionedRegion) {
             rebalance.executeFPA();
@@ -1440,7 +1482,7 @@ public class PRHARedundancyProvider {
             rebalance.execute();
           }
 
-          prRegion.getPrStats().endRecovery(start);
+          partitionedRegion.getPrStats().endRecovery(start);
           recoveryFuture = null;
         } catch (CancelException e) {
           logger.debug("Cache closed while recovery in progress");
@@ -1457,10 +1499,10 @@ public class PRHARedundancyProvider {
         try {
           if (logger.isDebugEnabled()) {
             if (isStartup) {
-              logger.debug(prRegion + " scheduling redundancy recovery in {} ms", delay);
+              logger.debug("{} scheduling redundancy recovery in {} ms", partitionedRegion, delay);
             } else {
               logger.debug(
-                  "prRegion scheduling redundancy recovery after departure/crash/error in {} in {} ms",
+                  "partitionedRegion scheduling redundancy recovery after departure/crash/error in {} in {} ms",
                   failedMemberId, delay);
             }
           }
@@ -1473,11 +1515,11 @@ public class PRHARedundancyProvider {
   }
 
   public boolean isRedundancyImpaired() {
-    int numBuckets = prRegion.getPartitionAttributes().getTotalNumBuckets();
-    int targetRedundancy = prRegion.getPartitionAttributes().getRedundantCopies();
+    int numBuckets = partitionedRegion.getPartitionAttributes().getTotalNumBuckets();
+    int targetRedundancy = partitionedRegion.getPartitionAttributes().getRedundantCopies();
 
     for (int i = 0; i < numBuckets; i++) {
-      int redundancy = prRegion.getRegionAdvisor().getBucketRedundancy(i);
+      int redundancy = partitionedRegion.getRegionAdvisor().getBucketRedundancy(i);
       if (redundancy < targetRedundancy && redundancy != -1 || redundancy > targetRedundancy) {
         return true;
       }
@@ -1492,7 +1534,7 @@ public class PRHARedundancyProvider {
      * the GatewaySender buckets for ParallelGatewaySender irrespective of whether colocation is
      * complete or not.
      */
-    PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(prRegion);
+    PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(partitionedRegion);
 
 
     // Check if the leader region or some child shadow PR region is persistent
@@ -1504,26 +1546,24 @@ public class PRHARedundancyProvider {
       return true;
     }
 
-    if (!ColocationHelper.checkMembersColocation(leaderRegion,
-        leaderRegion.getDistributionManager().getDistributionManagerId())) {
+    if (!checkMembersColocation(leaderRegion, leaderRegion.getDistributionManager().getId())) {
       if (logger.isDebugEnabled()) {
         logger.debug("Skipping persistent recovery of {} because colocation is not complete for {}",
-            prRegion, leaderRegion);
+            partitionedRegion, leaderRegion);
       }
       return false;
     }
 
-    final ProxyBucketRegion[] proxyBucketArray =
+    ProxyBucketRegion[] proxyBucketArray =
         persistentLeader.getRegionAdvisor().getProxyBucketArray();
-
     if (proxyBucketArray.length == 0) {
       throw new IllegalStateException("Unexpected empty proxy bucket array");
     }
-
     for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
       proxyBucket.initializePersistenceAdvisor();
     }
-    Set<InternalDistributedMember> peers = prRegion.getRegionAdvisor().adviseGeneric();
+
+    Set<InternalDistributedMember> peers = partitionedRegion.getRegionAdvisor().adviseGeneric();
 
     // We need to make sure here that we don't run into this race condition:
     // 1) We get a membership view from member A
@@ -1532,8 +1572,8 @@ public class PRHARedundancyProvider {
     // That will add B back into the set.
     // This state flush will make sure that any membership changes
     // That are in progress on the peers are finished.
-    MembershipFlushRequest.send(peers, prRegion.getDistributionManager(),
-        prRegion.getFullPath());
+    MembershipFlushRequest.send(peers, partitionedRegion.getDistributionManager(),
+        partitionedRegion.getFullPath());
 
     List<ProxyBucketRegion> bucketsNotHostedLocally = new ArrayList<>(proxyBucketArray.length);
     List<ProxyBucketRegion> bucketsHostedLocally = new ArrayList<>(proxyBucketArray.length);
@@ -1556,9 +1596,9 @@ public class PRHARedundancyProvider {
      * return and pass the GII task to a separate thread pool.
      *
      */
-    for (final ProxyBucketRegion proxyBucket : proxyBucketArray) {
+    for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
       if (proxyBucket.getPersistenceAdvisor().wasHosting()) {
-        final RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) {
+        RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) {
 
           @Override
           public void run() {
@@ -1577,6 +1617,7 @@ public class PRHARedundancyProvider {
             proxyBucket.recoverFromDiskRecursively();
           }
         };
+
         Thread recoveryThread =
             new LoggingThread("Recovery thread for bucket " + proxyBucket.getName(),
                 false, recoveryRunnable);
@@ -1588,12 +1629,12 @@ public class PRHARedundancyProvider {
     }
 
     try {
-      // try to recover the local buckets before the proxy buckets. This will allow us to detect any
-      // ConflictingDataException before the proxy buckets update their membership view.
-      for (final ProxyBucketRegion proxyBucket : bucketsHostedLocally) {
+      // try to recover the local buckets before the proxy buckets. This will allow us to detect
+      // any ConflictingDataException before the proxy buckets update their membership view.
+      for (ProxyBucketRegion proxyBucket : bucketsHostedLocally) {
         proxyBucket.waitForPrimaryPersistentRecovery();
       }
-      for (final ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
+      for (ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
         proxyBucket.recoverFromDiskRecursively();
       }
     } finally {
@@ -1605,11 +1646,13 @@ public class PRHARedundancyProvider {
     return true;
   }
 
-  private void createPersistentBucketRecoverer(int proxyBuckets) {
-    persistentBucketRecoverer = new PersistentBucketRecoverer(this, proxyBuckets);
+  @VisibleForTesting
+  void createPersistentBucketRecoverer(int proxyBuckets) {
+    persistentBucketRecoverer = persistentBucketRecovererFunction.apply(this, proxyBuckets);
     persistentBucketRecoverer.startLoggingThread();
   }
 
+  @VisibleForTesting
   PersistentBucketRecoverer getPersistentBucketRecoverer() {
     return persistentBucketRecoverer;
   }
@@ -1623,15 +1666,15 @@ public class PRHARedundancyProvider {
    *         persistent.
    */
   private PartitionedRegion getPersistentLeader() {
-    PartitionedRegion leader = ColocationHelper.getLeaderRegion(prRegion);
+    PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion);
     return findPersistentRegionRecursively(leader);
   }
 
-  private PartitionedRegion findPersistentRegionRecursively(PartitionedRegion pr) {
-    if (pr.getDataPolicy().withPersistence()) {
-      return pr;
+  private PartitionedRegion findPersistentRegionRecursively(PartitionedRegion partitionedRegion) {
+    if (partitionedRegion.getDataPolicy().withPersistence()) {
+      return partitionedRegion;
     }
-    for (PartitionedRegion child : ColocationHelper.getColocatedChildRegions(pr)) {
+    for (PartitionedRegion child : ColocationHelper.getColocatedChildRegions(partitionedRegion)) {
       PartitionedRegion leader = findPersistentRegionRecursively(child);
       if (leader != null) {
         return leader;
@@ -1641,11 +1684,11 @@ public class PRHARedundancyProvider {
   }
 
   void scheduleCreateMissingBuckets() {
-    if (prRegion.getColocatedWith() != null
-        && ColocationHelper.isColocationComplete(prRegion)) {
+    if (partitionedRegion.getColocatedWith() != null
+        && ColocationHelper.isColocationComplete(partitionedRegion)) {
       Runnable task = new CreateMissingBucketsTask(this);
       final InternalResourceManager resourceManager =
-          prRegion.getGemFireCache().getInternalResourceManager();
+          partitionedRegion.getGemFireCache().getInternalResourceManager();
       resourceManager.getExecutor().execute(task);
     }
   }
@@ -1668,9 +1711,8 @@ public class PRHARedundancyProvider {
    * @param loadProbe the LoadProbe to use
    * @return PartitionRegionInfo for the partitioned region
    */
-  public InternalPRInfo buildPartitionedRegionInfo(final boolean internal,
-      final LoadProbe loadProbe) {
-    final PartitionedRegion pr = prRegion;
+  public InternalPRInfo buildPartitionedRegionInfo(boolean internal, LoadProbe loadProbe) {
+    PartitionedRegion pr = partitionedRegion;
 
     if (pr == null) {
       return null;
@@ -1683,7 +1725,7 @@ public class PRHARedundancyProvider {
     int configuredRedundantCopies = pr.getRedundantCopies();
     int actualRedundantCopies = pr.getRedundancyTracker().getActualRedundancy();
 
-    final PartitionedRegionDataStore ds = pr.getDataStore();
+    PartitionedRegionDataStore dataStore = pr.getDataStore();
 
     Set<InternalDistributedMember> datastores = pr.getRegionAdvisor().adviseDataStore();
 
@@ -1691,7 +1733,7 @@ public class PRHARedundancyProvider {
 
     OfflineMemberDetails offlineMembers = null;
     boolean fetchOfflineMembers = false;
-    if (ds != null) {
+    if (dataStore != null) {
       memberDetails.add(buildPartitionMemberDetails(internal, loadProbe));
       offlineMembers = fetchOfflineMembers();
     } else {
@@ -1719,13 +1761,13 @@ public class PRHARedundancyProvider {
    * Retrieve the set of members which are currently offline for all buckets.
    */
   public OfflineMemberDetailsImpl fetchOfflineMembers() {
-    ProxyBucketRegion[] proxyBuckets = prRegion.getRegionAdvisor().getProxyBucketArray();
+    ProxyBucketRegion[] proxyBuckets = partitionedRegion.getRegionAdvisor().getProxyBucketArray();
     Set<PersistentMemberID>[] offlineMembers = new Set[proxyBuckets.length];
     for (int i = 0; i < proxyBuckets.length; i++) {
-      ProxyBucketRegion proxy = proxyBuckets[i];
-      if (prRegion.getDataPolicy().withPersistence()) {
+      ProxyBucketRegion proxyBucket = proxyBuckets[i];
+      if (partitionedRegion.getDataPolicy().withPersistence()) {
         Set<PersistentMemberID> persistedMembers =
-            proxy.getPersistenceAdvisor().getMissingMembers();
+            proxyBucket.getPersistenceAdvisor().getMissingMembers();
         if (persistedMembers == null) {
           persistedMembers = Collections.emptySet();
         }
@@ -1744,12 +1786,12 @@ public class PRHARedundancyProvider {
    * @param loadProbe the LoadProbe to use
    * @return PartitionMemberDetails for the local member
    */
-  public InternalPartitionDetails buildPartitionMemberDetails(final boolean internal,
-      final LoadProbe loadProbe) {
-    final PartitionedRegion pr = prRegion;
+  public InternalPartitionDetails buildPartitionMemberDetails(boolean internal,
+      LoadProbe loadProbe) {
+    final PartitionedRegion pr = partitionedRegion;
 
-    PartitionedRegionDataStore ds = pr.getDataStore();
-    if (ds == null) {
+    PartitionedRegionDataStore dataStore = pr.getDataStore();
+    if (dataStore == null) {
       return null;
     }
 
@@ -1758,11 +1800,13 @@ public class PRHARedundancyProvider {
 
     int configuredBucketCount = pr.getTotalNumberOfBuckets();
     long[] bucketSizes = new long[configuredBucketCount];
+
     // key: bid, value: size
-    Map<Integer, Integer> bucketSizeMap = ds.getSizeLocally();
+    Map<Integer, Integer> bucketSizeMap = dataStore.getSizeLocally();
+
     for (Map.Entry<Integer, Integer> me : bucketSizeMap.entrySet()) {
       int bid = me.getKey();
-      long bucketSize = ds.getBucketSize(bid);
+      long bucketSize = dataStore.getBucketSize(bid);
       bucketSizes[bid] = bucketSize;
       size += bucketSize;
     }
@@ -1774,11 +1818,12 @@ public class PRHARedundancyProvider {
       PRLoad prLoad = loadProbe.getLoad(pr);
       localDetails =
           new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size,
-              ds.getBucketsManaged(), ds.getNumberOfPrimaryBucketsManaged(), prLoad, bucketSizes);
+              dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged(), prLoad,
+              bucketSizes);
     } else {
       localDetails =
           new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size,
-              ds.getBucketsManaged(), ds.getNumberOfPrimaryBucketsManaged());
+              dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged());
     }
     return localDetails;
   }
@@ -1794,7 +1839,7 @@ public class PRHARedundancyProvider {
     }
 
     List<PartitionedRegion> colocatedRegions =
-        ColocationHelper.getColocatedChildRegions(prRegion);
+        ColocationHelper.getColocatedChildRegions(partitionedRegion);
     for (PartitionedRegion child : colocatedRegions) {
       child.getRedundancyProvider().waitForPersistentBucketRecoveryOrClose();
     }
@@ -1811,7 +1856,7 @@ public class PRHARedundancyProvider {
   }
 
   boolean isPersistentRecoveryComplete() {
-    if (!ColocationHelper.checkMembersColocation(prRegion, prRegion.getMyId())) {
+    if (!checkMembersColocation(partitionedRegion, partitionedRegion.getMyId())) {
       return false;
     }
 
@@ -1821,7 +1866,7 @@ public class PRHARedundancyProvider {
     }
 
     Map<String, PartitionedRegion> colocatedRegions =
-        ColocationHelper.getAllColocationRegions(prRegion);
+        ColocationHelper.getAllColocationRegions(partitionedRegion);
 
     for (PartitionedRegion region : colocatedRegions.values()) {
       PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider();
@@ -1834,7 +1879,7 @@ public class PRHARedundancyProvider {
   }
 
   private ThreadsMonitoring getThreadMonitorObj() {
-    DistributionManager distributionManager = prRegion.getDistributionManager();
+    DistributionManager distributionManager = partitionedRegion.getDistributionManager();
     if (distributionManager != null) {
       return distributionManager.getThreadMonitoring();
     }
@@ -1845,12 +1890,13 @@ public class PRHARedundancyProvider {
    * Monitors distributed membership for a given bucket
    */
   private class BucketMembershipObserver implements MembershipListener {
+
     private final Bucket bucketToMonitor;
     private final AtomicInteger arrivals = new AtomicInteger(0);
     private final AtomicBoolean departures = new AtomicBoolean(false);
 
-    private BucketMembershipObserver(Bucket b) {
-      bucketToMonitor = b;
+    private BucketMembershipObserver(Bucket bucketToMonitor) {
+      this.bucketToMonitor = bucketToMonitor;
     }
 
     private BucketMembershipObserver beginMonitoring() {
@@ -1897,8 +1943,8 @@ public class PRHARedundancyProvider {
      * @param expectedOwners the list of owners used when a departure is detected
      * @return if no problematic departures are detected, the primary
      */
-    private BucketMembershipObserverResults waitForOwnersGetPrimary(final int expectedCount,
-        final Collection<InternalDistributedMember> expectedOwners, String partitionName)
+    private BucketMembershipObserverResults waitForOwnersGetPrimary(int expectedCount,
+        Collection<InternalDistributedMember> expectedOwners, String partitionName)
         throws InterruptedException {
       boolean problematicDeparture = false;
       synchronized (this) {
@@ -1912,9 +1958,11 @@ public class PRHARedundancyProvider {
             if (expectedOwners.isEmpty()) {
               problematicDeparture = true; // need to pick new victims
             }
+
             // need to pick new victims
             arrivals.set(expectedOwners.size());
             departures.set(false);
+
             if (problematicDeparture) {
               if (logger.isDebugEnabled()) {
                 logger.debug("Bucket observer found departed members - retrying");
@@ -1929,26 +1977,29 @@ public class PRHARedundancyProvider {
             // success!
             break;
           }
+
           if (logger.isDebugEnabled()) {
             logger.debug("Waiting for bucket {} to finish being created",
-                prRegion.bucketStringForLogs(bucketToMonitor.getId()));
+                partitionedRegion.bucketStringForLogs(bucketToMonitor.getId()));
           }
 
-          prRegion.checkReadiness();
-          final int creationWaitMillis = 5 * 1000;
+          partitionedRegion.checkReadiness();
+
+          int creationWaitMillis = 5 * 1000;
           wait(creationWaitMillis);
 
           if (oldArrivals == arrivals.get() && oldDepartures == departures.get()) {
             logger.warn(
                 "Time out waiting {} ms for creation of bucket for partitioned region {}. Members requested to create the bucket are: {}",
-                new Object[] {creationWaitMillis, prRegion.getFullPath(),
-                    expectedOwners});
+                creationWaitMillis, partitionedRegion.getFullPath(), expectedOwners);
           }
         }
       }
+
       if (problematicDeparture) {
         return new BucketMembershipObserverResults(true, null);
       }
+
       InternalDistributedMember primary = bucketToMonitor.getBucketAdvisor().getPrimary();
       if (primary == null) {
         /*
@@ -1965,11 +2016,12 @@ public class PRHARedundancyProvider {
    * This class extends MembershipListener to perform cleanup when a node leaves DistributedSystem.
    */
   private class PRMembershipListener implements MembershipListener {
+
     @Override
     public void memberDeparted(DistributionManager distributionManager,
-        final InternalDistributedMember id, final boolean crashed) {
+        InternalDistributedMember id, boolean crashed) {
       try {
-        DistributedMember member = prRegion.getSystem().getDistributedMember();
+        DistributedMember member = partitionedRegion.getSystem().getDistributedMember();
         if (logger.isDebugEnabled()) {
           logger.debug(
               "MembershipListener invoked on DistributedMember = {} for failed memberId = {}",
@@ -1977,27 +2029,29 @@ public class PRHARedundancyProvider {
               id);
         }
 
-        if (!prRegion.isCacheClosing() && !prRegion.isDestroyed() && !member.equals(id)) {
+        if (!partitionedRegion.isCacheClosing() && !partitionedRegion.isDestroyed()
+            && !member.equals(id)) {
           Runnable postRecoveryTask = null;
 
           // Only schedule redundancy recovery if this not a fixed PR.
-          if (!prRegion.isFixedPartitionedRegion()) {
+          if (!partitionedRegion.isFixedPartitionedRegion()) {
             postRecoveryTask = () -> {
               // After the metadata has been cleaned, recover redundancy.
               scheduleRedundancyRecovery(id);
             };
           }
           // Schedule clean up the metadata for the failed member.
-          PartitionedRegionHelper.cleanUpMetaDataForRegion(prRegion.getCache(),
-              prRegion.getRegionIdentifier(), id, postRecoveryTask);
+          PartitionedRegionHelper.cleanUpMetaDataForRegion(partitionedRegion.getCache(),
+              partitionedRegion.getRegionIdentifier(), id, postRecoveryTask);
         }
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         // ignore
       }
     }
   }
 
   private static class ManageBucketRsp {
+
     @Immutable
     private static final ManageBucketRsp NO = new ManageBucketRsp("NO");
     @Immutable
@@ -2044,6 +2098,7 @@ public class PRHARedundancyProvider {
   }
 
   private static class ArrayListWithClearState<T> extends ArrayList<T> {
+
     private static final long serialVersionUID = 1L;
     private volatile boolean wasCleared;
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 476bc81..953eada 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -768,7 +768,7 @@ public class PartitionedRegion extends LocalRegion
     this.distAdvisor = RegionAdvisor.createRegionAdvisor(this);
     senderIdMonitor = createSenderIdMonitor();
     // Warning: potential early escape of instance
-    this.redundancyProvider = new PRHARedundancyProvider(this);
+    this.redundancyProvider = new PRHARedundancyProvider(this, cache.getInternalResourceManager());
 
     // localCacheEnabled = ra.getPartitionAttributes().isLocalCacheEnabled();
     // This is to make sure that local-cache get and put works properly.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index d16a7af..a7bd338 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -15,7 +15,6 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -24,60 +23,76 @@ import static org.mockito.Mockito.when;
 import java.util.HashSet;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
 import org.apache.geode.internal.cache.partitioned.LoadProbe;
 import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 
 public class PRHARedundancyProviderTest {
 
+  private InternalCache cache;
   private PartitionedRegion partitionedRegion;
+  private InternalResourceManager resourceManager;
+
   private PRHARedundancyProvider prHaRedundancyProvider;
 
+  @Rule
+  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
   @Before
   public void setUp() {
-    partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
-    InternalCache cache = mock(InternalCache.class);
-    DistributedRegion rootRegion = mock(DistributedRegion.class);
-
-    when(partitionedRegion.getCache()).thenReturn(cache);
-    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(rootRegion);
-
-    prHaRedundancyProvider = spy(new PRHARedundancyProvider(partitionedRegion));
+    cache = mock(InternalCache.class);
+    partitionedRegion = mock(PartitionedRegion.class);
+    resourceManager = mock(InternalResourceManager.class);
   }
 
   @Test
   public void waitForPersistentBucketRecoveryProceedsWhenPersistentBucketRecovererLatchIsNotSet() {
-    PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
-    when(prHaRedundancyProvider.getPersistentBucketRecoverer()).thenReturn(recoverer);
+    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
+        (a, b) -> mock(PersistentBucketRecoverer.class));
 
     prHaRedundancyProvider.waitForPersistentBucketRecovery();
   }
 
   @Test
   public void waitForPersistentBucketRecoveryProceedsAfterLatchCountDown() {
-    PersistentBucketRecoverer recoverer =
-        spy(new PersistentBucketRecoverer(prHaRedundancyProvider, 1));
-    when(prHaRedundancyProvider.getPersistentBucketRecoverer()).thenReturn(recoverer);
+    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true))
+        .thenReturn(mock(DistributedRegion.class));
+    when(partitionedRegion.getCache()).thenReturn(cache);
+    when(partitionedRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
+    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
+        (a, b) -> spy(new ThreadlessPersistentBucketRecoverer(a, b)));
+    prHaRedundancyProvider.createPersistentBucketRecoverer(1);
     prHaRedundancyProvider.getPersistentBucketRecoverer().countDown();
 
     prHaRedundancyProvider.waitForPersistentBucketRecovery();
 
-    verify(recoverer).await();
+    verify(prHaRedundancyProvider.getPersistentBucketRecoverer()).await();
   }
 
   @Test
   public void buildPartitionedRegionInfo() {
-    when(partitionedRegion.getRegionAdvisor().adviseDataStore()).thenReturn(new HashSet<>());
-    when(partitionedRegion.getRegionAdvisor().getProxyBucketArray())
-        .thenReturn(new ProxyBucketRegion[] {});
-
-    when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(42);
-    when(partitionedRegion.getRegionAdvisor().getCreatedBucketsCount()).thenReturn(17);
+    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
+        (a, b) -> mock(PersistentBucketRecoverer.class));
+    when(partitionedRegion.getRedundancyTracker())
+        .thenReturn(mock(PartitionedRegionRedundancyTracker.class));
+    when(partitionedRegion.getRedundancyTracker().getActualRedundancy()).thenReturn(33);
     when(partitionedRegion.getRedundancyTracker().getLowRedundancyBuckets()).thenReturn(3);
     when(partitionedRegion.getRedundantCopies()).thenReturn(12);
-    when(partitionedRegion.getRedundancyTracker().getActualRedundancy()).thenReturn(33);
+    when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
+    when(partitionedRegion.getRegionAdvisor().adviseDataStore()).thenReturn(new HashSet<>());
+    when(partitionedRegion.getRegionAdvisor().getCreatedBucketsCount()).thenReturn(17);
+    when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(42);
 
     InternalPRInfo internalPRInfo =
         prHaRedundancyProvider.buildPartitionedRegionInfo(false, mock(LoadProbe.class));
@@ -88,4 +103,17 @@ public class PRHARedundancyProviderTest {
     assertThat(internalPRInfo.getConfiguredRedundantCopies()).isEqualTo(12);
     assertThat(internalPRInfo.getActualRedundantCopies()).isEqualTo(33);
   }
+
+  private static class ThreadlessPersistentBucketRecoverer extends PersistentBucketRecoverer {
+
+    public ThreadlessPersistentBucketRecoverer(
+        PRHARedundancyProvider prhaRedundancyProvider, int proxyBuckets) {
+      super(prhaRedundancyProvider, proxyBuckets);
+    }
+
+    @Override
+    public void startLoggingThread() {
+      // do nothing
+    }
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
index 12168cd..a45b72b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
@@ -27,21 +27,27 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PRHARedundancyProvider;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
 
 public class PersistentBucketRecovererTest {
+
   private PartitionedRegion partitionedRegion;
   private InternalCache cache;
   private DistributedRegion root;
   private PRHARedundancyProvider provider;
+  private InternalResourceManager resourceManager;
 
   @Before
   public void setUp() {
     partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+    resourceManager = mock(InternalResourceManager.class);
     cache = mock(InternalCache.class);
     root = mock(DistributedRegion.class);
+
     when(partitionedRegion.getCache()).thenReturn(cache);
     when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
-    provider = new PRHARedundancyProvider(partitionedRegion);
+
+    provider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
   }
 
   @Test