You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/07/08 16:23:43 UTC
[geode] branch develop updated: GEODE-6918: Cleanup
PRHARedundancyProvider and PRHARedundancyProviderTest (#3779)
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f688bbb GEODE-6918: Cleanup PRHARedundancyProvider and PRHARedundancyProviderTest (#3779)
f688bbb is described below
commit f688bbbe773b141dce6d635e830295304186886e
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Jul 8 09:23:31 2019 -0700
GEODE-6918: Cleanup PRHARedundancyProvider and PRHARedundancyProviderTest (#3779)
* Cleanup recommendations found in review
* Use constructor injection
* Use MockitoRule and STRICT_STUBS
* Improve field and variable names
* Improve overall consistency, formatting, and comments
Co-authored-by: Aaron Lindsey <al...@pivotal.io>
---
.../control/RebalanceOperationDistributedTest.java | 4 +-
.../internal/cache/PRHARedundancyProvider.java | 741 +++++++++++----------
.../geode/internal/cache/PartitionedRegion.java | 2 +-
.../internal/cache/PRHARedundancyProviderTest.java | 72 +-
.../partitioned/PersistentBucketRecovererTest.java | 8 +-
5 files changed, 459 insertions(+), 368 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
index 69a1a85..53ef1a7 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
@@ -36,6 +36,7 @@ import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.io.File;
@@ -566,7 +567,8 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
PartitionedRegion origRegion = (PartitionedRegion) cache.getRegion("region1");
PartitionedRegion spyRegion = spy(origRegion);
- PRHARedundancyProvider redundancyProvider = spy(new PRHARedundancyProvider(spyRegion));
+ PRHARedundancyProvider redundancyProvider =
+ spy(new PRHARedundancyProvider(spyRegion, mock(InternalResourceManager.class)));
// return the spied region whenever getPartitionedRegions() is invoked
Set<PartitionedRegion> parRegions = cache.getPartitionedRegions();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index 9f0eb3b..cf70f2f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -14,9 +14,13 @@
*/
package org.apache.geode.internal.cache;
+import static java.lang.String.format;
import static java.lang.System.lineSeparator;
+import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+import static org.apache.geode.internal.cache.ColocationHelper.checkMembersColocation;
+import static org.apache.geode.internal.cache.PartitionedRegionHelper.printCollection;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,12 +38,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.cache.Region;
@@ -93,13 +99,13 @@ import org.apache.geode.internal.monitoring.ThreadsMonitoring;
* This class provides the redundancy management for partitioned region. It will provide the
* following to the PartitionedRegion:
*
- * <pre>
- * (1) Redundancy Management at the time of bucket creation.
- * (2) Redundancy management at the new node arrival.
- * (3) Redundancy management when the node leaves the partitioned region distributed system
+ * <ol>
+ * <li>Redundancy management at the time of bucket creation.
+ * <li>Redundancy management at the new node arrival.
+ * <li>Redundancy management when the node leaves the partitioned region distributed system
* gracefully. i.e. Cache.close()
- * (4) Redundancy management at random node failure.
- * </pre>
+ * <li>Redundancy management at random node failure.
+ * </ol>
*/
public class PRHARedundancyProvider {
private static final Logger logger = LogService.getLogger();
@@ -145,7 +151,7 @@ public class PRHARedundancyProvider {
@MakeNotStatic
private final AtomicBoolean firstInsufficientStoresLogged = new AtomicBoolean(false);
- private final PartitionedRegion prRegion;
+ private final PartitionedRegion partitionedRegion;
/**
* An executor to submit tasks for redundancy recovery to. It makes sure that there will only be
@@ -155,6 +161,8 @@ public class PRHARedundancyProvider {
private final Object shutdownLock = new Object();
+ private final BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction;
+
private volatile ScheduledFuture<?> recoveryFuture;
/**
@@ -164,58 +172,69 @@ public class PRHARedundancyProvider {
private boolean shutdown;
+
/**
* Constructor for PRHARedundancyProvider.
*
- * @param region The PartitionedRegion for which the HA redundancy is required to be managed.
+ * @param partitionedRegion The PartitionedRegion for which the HA redundancy is required to be
+ * managed.
*/
- public PRHARedundancyProvider(final PartitionedRegion region) {
- prRegion = region;
- final InternalResourceManager resourceManager =
- region.getGemFireCache().getInternalResourceManager();
+ public PRHARedundancyProvider(PartitionedRegion partitionedRegion,
+ InternalResourceManager resourceManager) {
+ this(partitionedRegion, resourceManager, PersistentBucketRecoverer::new);
+ }
+
+ @VisibleForTesting
+ PRHARedundancyProvider(PartitionedRegion partitionedRegion,
+ InternalResourceManager resourceManager,
+ BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction) {
+ this.partitionedRegion = partitionedRegion;
recoveryExecutor = new OneTaskOnlyExecutor(resourceManager.getExecutor(),
- () -> InternalResourceManager.getResourceObserver().recoveryConflated(region),
+ () -> InternalResourceManager.getResourceObserver().recoveryConflated(partitionedRegion),
getThreadMonitorObj());
+ this.persistentBucketRecovererFunction = persistentBucketRecovererFunction;
}
/**
* Display bucket allocation status
*
- * @param prRegion the given region
+ * @param partitionedRegion the given region
* @param allStores the list of available stores. If null, unknown.
* @param alreadyUsed stores allocated; only used if allStores != null
* @param forLog true if the generated string is for a log message
* @return the description string
*/
- private static String regionStatus(PartitionedRegion prRegion,
+ private static String regionStatus(PartitionedRegion partitionedRegion,
Collection<InternalDistributedMember> allStores,
Collection<InternalDistributedMember> alreadyUsed, boolean forLog) {
+ String newLine = forLog ? " " : lineSeparator();
+ String spaces = forLog ? "" : " ";
+
StringBuilder sb = new StringBuilder();
- sb.append("Partitioned Region name = ").append(prRegion.getFullPath());
- final String newLine;
- final String spaces;
- if (forLog) {
- newLine = " ";
- spaces = "";
- } else {
- newLine = lineSeparator();
- spaces = " ";
- }
+ sb.append("Partitioned Region name = ");
+ sb.append(partitionedRegion.getFullPath());
+
if (allStores != null) {
- sb.append(newLine).append(spaces).append("Redundancy level set to ")
- .append(prRegion.getRedundantCopies());
- sb.append(newLine).append(". Number of available data stores: ").append(allStores.size());
- sb.append(newLine).append(spaces).append(". Number successfully allocated = ")
- .append(alreadyUsed.size());
- sb.append(newLine).append(". Data stores: ")
- .append(PartitionedRegionHelper.printCollection(allStores));
- sb.append(newLine).append(". Data stores successfully allocated: ")
- .append(PartitionedRegionHelper.printCollection(alreadyUsed));
- sb.append(newLine).append(". Equivalent members: ").append(PartitionedRegionHelper
- .printCollection(prRegion
- .getDistributionManager()
- .getMembersInThisZone()));
+ sb.append(newLine).append(spaces);
+ sb.append("Redundancy level set to ");
+ sb.append(partitionedRegion.getRedundantCopies());
+ sb.append(newLine);
+ sb.append(". Number of available data stores: ");
+ sb.append(allStores.size());
+ sb.append(newLine).append(spaces);
+ sb.append(". Number successfully allocated = ");
+ sb.append(alreadyUsed.size());
+ sb.append(newLine);
+ sb.append(". Data stores: ");
+ sb.append(printCollection(allStores));
+ sb.append(newLine);
+ sb.append(". Data stores successfully allocated: ");
+ sb.append(printCollection(alreadyUsed));
+ sb.append(newLine);
+ sb.append(". Equivalent members: ");
+ sb.append(printCollection(partitionedRegion.getDistributionManager().getMembersInThisZone()));
}
+
return sb.toString();
}
@@ -226,31 +245,28 @@ public class PRHARedundancyProvider {
* @param alreadyUsed those that have already accepted, only used if allStores != null
* @param opString description of the operation which timed out
*/
- public static void timedOut(PartitionedRegion prRegion, Set<InternalDistributedMember> allStores,
- Collection<InternalDistributedMember> alreadyUsed,
- String opString, long timeOut) {
- final String tooManyRetries =
- String.format("Timed out attempting to %s in the partitioned region.%sWaited for: %s ms.",
- opString,
- regionStatus(prRegion, allStores, alreadyUsed, true), timeOut)
- + TIMEOUT_MSG;
- throw new PartitionedRegionStorageException(tooManyRetries);
+ public static void timedOut(PartitionedRegion partitionedRegion,
+ Set<InternalDistributedMember> allStores,
+ Collection<InternalDistributedMember> alreadyUsed, String opString, long timeOut) {
+ throw new PartitionedRegionStorageException(
+ format("Timed out attempting to %s in the partitioned region.%sWaited for: %s ms.",
+ opString, regionStatus(partitionedRegion, allStores, alreadyUsed, true), timeOut)
+ + TIMEOUT_MSG);
}
public PartitionedRegion getPartitionedRegion() {
- return prRegion;
+ return partitionedRegion;
}
private Set<InternalDistributedMember> getAllStores(String partitionName) {
if (partitionName != null) {
-
return getFixedPartitionStores(partitionName);
}
final Set<InternalDistributedMember> allStores =
- prRegion.getRegionAdvisor().adviseDataStore(true);
- PartitionedRegionDataStore myDS = prRegion.getDataStore();
- if (myDS != null) {
- allStores.add(prRegion.getDistributionManager().getId());
+ partitionedRegion.getRegionAdvisor().adviseDataStore(true);
+ PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
+ if (localDataStore != null) {
+ allStores.add(partitionedRegion.getDistributionManager().getId());
}
return allStores;
}
@@ -263,14 +279,15 @@ public class PRHARedundancyProvider {
*/
private Set<InternalDistributedMember> getFixedPartitionStores(String partitionName) {
Set<InternalDistributedMember> members =
- prRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName);
+ partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName);
- List<FixedPartitionAttributesImpl> FPAs = prRegion.getFixedPartitionAttributesImpl();
+ List<FixedPartitionAttributesImpl> allFixedPartitionAttributes =
+ partitionedRegion.getFixedPartitionAttributesImpl();
- if (FPAs != null) {
- for (FixedPartitionAttributesImpl fpa : FPAs) {
- if (fpa.getPartitionName().equals(partitionName)) {
- members.add(prRegion.getMyId());
+ if (allFixedPartitionAttributes != null) {
+ for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) {
+ if (fixedPartitionAttributes.getPartitionName().equals(partitionName)) {
+ members.add(partitionedRegion.getMyId());
}
}
}
@@ -286,28 +303,17 @@ public class PRHARedundancyProvider {
*/
private void insufficientStores(Set<InternalDistributedMember> allStores,
Collection<InternalDistributedMember> alreadyUsed, boolean onlyLog) {
- final String regionStat = regionStatus(prRegion, allStores, alreadyUsed, onlyLog);
- final String newLine;
+ String regionStat = regionStatus(partitionedRegion, allStores, alreadyUsed, onlyLog);
+ String newLine = onlyLog ? " " : lineSeparator();
+ String notEnoughValidNodes = alreadyUsed.isEmpty()
+ ? "Unable to find any members to host a bucket in the partitioned region. %s.%s"
+ : "Configured redundancy level could not be satisfied. %s to satisfy redundancy for the region.%s";
if (onlyLog) {
- newLine = " ";
- } else {
- newLine = lineSeparator();
- }
- final String notEnoughValidNodes;
- if (alreadyUsed.isEmpty()) {
- notEnoughValidNodes =
- "Unable to find any members to host a bucket in the partitioned region. %s.%s";
- } else {
- notEnoughValidNodes =
- "Configured redundancy level could not be satisfied. %s to satisfy redundancy for the region.%s";
- }
- final Object[] notEnoughValidNodesArgs = new Object[] {
- INSUFFICIENT_STORES_MSG, newLine + regionStat + newLine};
- if (onlyLog) {
- logger.warn(String.format(notEnoughValidNodes, notEnoughValidNodesArgs));
+ logger.warn(format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG,
+ newLine + regionStat + newLine));
} else {
throw new PartitionedRegionStorageException(
- String.format(notEnoughValidNodes, notEnoughValidNodesArgs));
+ format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG, newLine + regionStat + newLine));
}
}
@@ -322,13 +328,13 @@ public class PRHARedundancyProvider {
* @return the new member, null if it fails.
* @throws PartitionedRegionStorageException if there are not enough data stores
*/
- private InternalDistributedMember createBucketInstance(int bucketId, final int newBucketSize,
- final Collection<InternalDistributedMember> excludedMembers,
+ private InternalDistributedMember createBucketInstance(int bucketId, int newBucketSize,
+ Collection<InternalDistributedMember> excludedMembers,
Collection<InternalDistributedMember> alreadyUsed,
- ArrayListWithClearState<InternalDistributedMember> failedMembers, final long timeOut,
- final Set<InternalDistributedMember> allStores) {
+ ArrayListWithClearState<InternalDistributedMember> failedMembers, long timeOut,
+ Set<InternalDistributedMember> allStores) {
- final boolean isDebugEnabled = logger.isDebugEnabled();
+ boolean isDebugEnabled = logger.isDebugEnabled();
// Recalculate list of candidates
Set<InternalDistributedMember> candidateMembers = new HashSet<>(allStores);
@@ -340,8 +346,9 @@ public class PRHARedundancyProvider {
logger.debug("AllStores={} AlreadyUsed={} excluded={} failed={}", allStores, alreadyUsed,
excludedMembers, failedMembers);
}
+
if (candidateMembers.isEmpty()) {
- prRegion.checkReadiness();
+ partitionedRegion.checkReadiness();
// Run out of candidates. Refetch?
if (System.currentTimeMillis() > timeOut) {
@@ -373,18 +380,18 @@ public class PRHARedundancyProvider {
return null;
}
- // In case of FPR, candidateMembers is the set of members on which
+ // In case of FixedPartitionedRegion, candidateMembers is the set of members on which
// required fixed partition is defined.
InternalDistributedMember candidate;
- if (prRegion.isFixedPartitionedRegion()) {
+ if (partitionedRegion.isFixedPartitionedRegion()) {
candidate = candidateMembers.iterator().next();
} else {
- String prName = prRegion.getAttributes().getPartitionAttributes().getColocatedWith();
- if (prName != null) {
- candidate = getColocatedDataStore(candidateMembers, alreadyUsed, bucketId, prName);
+ String colocatedWith =
+ partitionedRegion.getAttributes().getPartitionAttributes().getColocatedWith();
+ if (colocatedWith != null) {
+ candidate = getColocatedDataStore(candidateMembers, alreadyUsed, bucketId, colocatedWith);
} else {
- final Collection<InternalDistributedMember> orderedCandidates =
- new ArrayList<>(candidateMembers);
+ Collection<InternalDistributedMember> orderedCandidates = new ArrayList<>(candidateMembers);
candidate = getPreferredDataStore(orderedCandidates, alreadyUsed);
}
}
@@ -394,23 +401,23 @@ public class PRHARedundancyProvider {
return null;
}
- if (!prRegion.isShadowPR()
- && !ColocationHelper.checkMembersColocation(prRegion, candidate)) {
+ if (!partitionedRegion.isShadowPR() && !checkMembersColocation(partitionedRegion, candidate)) {
if (isDebugEnabled) {
logger.debug(
- "createBucketInstances - Member does not have all of the regions colocated with prRegion {}",
+ "createBucketInstances - Member does not have all of the regions colocated with partitionedRegion {}",
candidate);
}
failedMembers.add(candidate);
return null;
}
- if (!candidate.equals(prRegion.getMyId())) {
- PartitionProfile pp = prRegion.getRegionAdvisor().getPartitionProfile(candidate);
- if (pp == null) {
+ if (!candidate.equals(partitionedRegion.getMyId())) {
+ PartitionProfile profile =
+ partitionedRegion.getRegionAdvisor().getPartitionProfile(candidate);
+ if (profile == null) {
if (isDebugEnabled) {
logger.debug("createBucketInstance: {}: no partition profile for {}",
- prRegion.getFullPath(), candidate);
+ partitionedRegion.getFullPath(), candidate);
}
failedMembers.add(candidate);
return null;
@@ -419,7 +426,7 @@ public class PRHARedundancyProvider {
// Coordinate with any remote close occurring, causing it to wait until
// this create bucket attempt has been made.
- final ManageBucketRsp response =
+ ManageBucketRsp response =
createBucketOnMember(bucketId, candidate, newBucketSize, failedMembers.wasCleared());
// Add targetNode to bucketNodes if successful, else to failedNodeList
@@ -430,7 +437,8 @@ public class PRHARedundancyProvider {
if (isDebugEnabled) {
logger.debug("createBucketInstance: {}: candidate {} declined to manage bucketId={}: {}",
- prRegion.getFullPath(), candidate, prRegion.bucketStringForLogs(bucketId),
+ partitionedRegion.getFullPath(), candidate,
+ partitionedRegion.bucketStringForLogs(bucketId),
response);
}
if (response.equals(ManageBucketRsp.CLOSED)) {
@@ -443,31 +451,32 @@ public class PRHARedundancyProvider {
}
InternalDistributedMember createBucketOnDataStore(int bucketId, int size,
- RetryTimeKeeper snoozer) {
+ RetryTimeKeeper retryTimeKeeper) {
+ boolean isDebugEnabled = logger.isDebugEnabled();
+
InternalDistributedMember primaryForFixedPartition = null;
- if (prRegion.isFixedPartitionedRegion()) {
+ if (partitionedRegion.isFixedPartitionedRegion()) {
primaryForFixedPartition =
- prRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
+ partitionedRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
}
- final boolean isDebugEnabled = logger.isDebugEnabled();
- InternalDistributedMember ret;
+ InternalDistributedMember memberHostingBucket;
Collection<InternalDistributedMember> attempted = new HashSet<>();
do {
- prRegion.checkReadiness();
+ partitionedRegion.checkReadiness();
Set<InternalDistributedMember> available =
- prRegion.getRegionAdvisor().adviseInitializedDataStore();
+ partitionedRegion.getRegionAdvisor().adviseInitializedDataStore();
available.removeAll(attempted);
- InternalDistributedMember target = null;
+ InternalDistributedMember targetMember = null;
for (InternalDistributedMember member : available) {
if (available.contains(primaryForFixedPartition)) {
- target = primaryForFixedPartition;
+ targetMember = primaryForFixedPartition;
} else {
- target = member;
+ targetMember = member;
}
break;
}
- if (target == null) {
+ if (targetMember == null) {
if (shouldLogInsufficientStores()) {
insufficientStores(available, Collections.emptySet(), true);
}
@@ -476,21 +485,23 @@ public class PRHARedundancyProvider {
}
try {
if (isDebugEnabled) {
- logger.debug("Attempting to get data store {} to create the bucket {} for us", target,
- prRegion.bucketStringForLogs(bucketId));
+ logger.debug("Attempting to get data store {} to create the bucket {} for us",
+ targetMember,
+ partitionedRegion.bucketStringForLogs(bucketId));
}
CreateBucketMessage.NodeResponse response =
- CreateBucketMessage.send(target, prRegion, bucketId, size);
- ret = response.waitForResponse();
- if (ret != null) {
- return ret;
+ CreateBucketMessage.send(targetMember, partitionedRegion, bucketId, size);
+ memberHostingBucket = response.waitForResponse();
+ if (memberHostingBucket != null) {
+ return memberHostingBucket;
}
} catch (ForceReattemptException e) {
// do nothing, we will already check again for a primary.
}
- attempted.add(target);
- } while ((ret = prRegion.getNodeForBucketWrite(bucketId, snoozer)) == null);
- return ret;
+ attempted.add(targetMember);
+ } while ((memberHostingBucket =
+ partitionedRegion.getNodeForBucketWrite(bucketId, retryTimeKeeper)) == null);
+ return memberHostingBucket;
}
/**
@@ -517,38 +528,40 @@ public class PRHARedundancyProvider {
* @throws PartitionOfflineException if persistent data recovery is not complete for a partitioned
* region referred to in the query.
*/
- public InternalDistributedMember createBucketAtomically(final int bucketId,
- final int newBucketSize, final boolean finishIncompleteCreation, String partitionName)
+ public InternalDistributedMember createBucketAtomically(int bucketId, int newBucketSize,
+ boolean finishIncompleteCreation, String partitionName)
throws PartitionedRegionStorageException, PartitionedRegionException,
PartitionOfflineException {
- final boolean isDebugEnabled = logger.isDebugEnabled();
+ boolean isDebugEnabled = logger.isDebugEnabled();
- prRegion.checkPROffline();
+ partitionedRegion.checkPROffline();
// If there are insufficient stores throw *before* we try acquiring the
// (very expensive) bucket lock or the (somewhat expensive) monitor on this
earlySufficientStoresCheck(partitionName);
synchronized (this) {
- if (prRegion.getCache().isCacheAtShutdownAll()) {
- throw prRegion.getCache().getCacheClosedException("Cache is shutting down");
+ if (partitionedRegion.getCache().isCacheAtShutdownAll()) {
+ throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down");
}
if (isDebugEnabled) {
logger.debug("Starting atomic creation of bucketId={}",
- prRegion.bucketStringForLogs(bucketId));
+ partitionedRegion.bucketStringForLogs(bucketId));
}
- final long timeOut = System.currentTimeMillis() + computeTimeout();
+
+ long timeOut = System.currentTimeMillis() + computeTimeout();
BucketMembershipObserver observer = null;
boolean needToElectPrimary = true;
InternalDistributedMember bucketPrimary = null;
+
try {
- prRegion.checkReadiness();
+ partitionedRegion.checkReadiness();
- Bucket toCreate = prRegion.getRegionAdvisor().getBucket(bucketId);
+ Bucket toCreate = partitionedRegion.getRegionAdvisor().getBucket(bucketId);
if (!finishIncompleteCreation) {
- bucketPrimary = prRegion.getBucketPrimary(bucketId);
+ bucketPrimary = partitionedRegion.getBucketPrimary(bucketId);
if (bucketPrimary != null) {
if (isDebugEnabled) {
logger.debug(
@@ -561,26 +574,27 @@ public class PRHARedundancyProvider {
}
observer = new BucketMembershipObserver(toCreate).beginMonitoring();
- // detected
ArrayListWithClearState<InternalDistributedMember> failedMembers =
new ArrayListWithClearState<>();
Set<InternalDistributedMember> excludedMembers = new HashSet<>();
Collection<InternalDistributedMember> acceptedMembers = new ArrayList<>();
+
for (boolean loggedInsufficientStores = false;;) {
- prRegion.checkReadiness();
- if (prRegion.getCache().isCacheAtShutdownAll()) {
+ partitionedRegion.checkReadiness();
+ if (partitionedRegion.getCache().isCacheAtShutdownAll()) {
if (isDebugEnabled) {
logger.debug("Aborted createBucketAtomically due to ShutdownAll");
}
- throw prRegion.getCache().getCacheClosedException("Cache is shutting down");
+ throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down");
}
long timeLeft = timeOut - System.currentTimeMillis();
if (timeLeft < 0) {
// It took too long.
- timedOut(prRegion, getAllStores(partitionName), acceptedMembers,
+ timedOut(partitionedRegion, getAllStores(partitionName), acceptedMembers,
ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET, computeTimeout());
}
+
if (isDebugEnabled) {
logger.debug("createBucketAtomically: have {} ms left to finish this", timeLeft);
}
@@ -593,9 +607,10 @@ public class PRHARedundancyProvider {
InternalDistributedMember candidate = createBucketInstance(bucketId, newBucketSize,
excludedMembers, acceptedMembers, failedMembers, timeOut, allStores);
if (candidate != null) {
- if (prRegion.getDistributionManager().enforceUniqueZone()) {
+ if (partitionedRegion.getDistributionManager().enforceUniqueZone()) {
// enforceUniqueZone property has no effect for a loner
- if (!(prRegion.getDistributionManager() instanceof LonerDistributionManager)) {
+ if (!(partitionedRegion
+ .getDistributionManager() instanceof LonerDistributionManager)) {
Set<InternalDistributedMember> exm = getBuddyMembersInZone(candidate, allStores);
exm.remove(candidate);
exm.removeAll(acceptedMembers);
@@ -610,7 +625,7 @@ public class PRHARedundancyProvider {
// Get an updated list of bucket owners, which should include
// buckets created concurrently with this createBucketAtomically call
- acceptedMembers = prRegion.getRegionAdvisor().getBucketOwners(bucketId);
+ acceptedMembers = partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId);
if (isDebugEnabled) {
logger.debug("Accepted members: {}", acceptedMembers);
@@ -630,18 +645,19 @@ public class PRHARedundancyProvider {
// select the primary.
// Have we exhausted all candidates?
- final int potentialCandidateCount = allStores.size()
+ int potentialCandidateCount = allStores.size()
- (excludedMembers.size() + acceptedMembers.size() + failedMembers.size());
+
// Determining exhausted members competes with bucket balancing; it's
// important to re-visit all failed members since "failed" set may
// contain datastores which at the moment are imbalanced, but yet could
// be candidates. If the failed members list is empty, it's expected
// that the next iteration clears the (already empty) list.
- final boolean exhaustedPotentialCandidates =
+ boolean exhaustedPotentialCandidates =
failedMembers.wasCleared() && potentialCandidateCount <= 0;
- final boolean redundancySatisfied =
- acceptedMembers.size() > prRegion.getRedundantCopies();
- final boolean bucketNotCreated = acceptedMembers.isEmpty();
+ boolean redundancySatisfied =
+ acceptedMembers.size() > partitionedRegion.getRedundantCopies();
+ boolean bucketNotCreated = acceptedMembers.isEmpty();
if (isDebugEnabled) {
logger.debug(
@@ -665,8 +681,9 @@ public class PRHARedundancyProvider {
// The rest of the members will be allowed to volunteer for primary.
endBucketCreation(bucketId, acceptedMembers, bucketPrimary, partitionName);
- final int expectedRemoteHosts = acceptedMembers.size()
- - (acceptedMembers.contains(prRegion.getMyId()) ? 1 : 0);
+ int expectedRemoteHosts = acceptedMembers.size()
+ - (acceptedMembers.contains(partitionedRegion.getMyId()) ? 1 : 0);
+
boolean interrupted = Thread.interrupted();
try {
BucketMembershipObserverResults results = observer
@@ -678,13 +695,15 @@ public class PRHARedundancyProvider {
bucketPrimary = results.primary;
} catch (InterruptedException e) {
interrupted = true;
- prRegion.getCancelCriterion().checkCancelInProgress(e);
+ partitionedRegion.getCancelCriterion().checkCancelInProgress(e);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
+
needToElectPrimary = false;
+
return bucketPrimary;
}
}
@@ -708,15 +727,17 @@ public class PRHARedundancyProvider {
if (observer != null) {
observer.stopMonitoring();
}
+
// Try to make sure everyone that created the bucket can volunteer for primary
if (needToElectPrimary) {
try {
- endBucketCreation(bucketId, prRegion.getRegionAdvisor().getBucketOwners(bucketId),
+ endBucketCreation(bucketId,
+ partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId),
bucketPrimary, partitionName);
} catch (Exception e) {
// if region is going down, then no warning level logs
if (e instanceof CancelException
- || prRegion.getCancelCriterion().isCancelInProgress()) {
+ || partitionedRegion.getCancelCriterion().isCancelInProgress()) {
logger.debug("Exception trying choose a primary after bucket creation failure", e);
} else {
logger.warn("Exception trying choose a primary after bucket creation failure", e);
@@ -747,24 +768,27 @@ public class PRHARedundancyProvider {
// secondary partition will become primary
if (partitionName != null) {
if (isLocalPrimary(partitionName)) {
- targetPrimary = prRegion.getMyId();
+ targetPrimary = partitionedRegion.getMyId();
} else {
targetPrimary =
- prRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
+ partitionedRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId);
if (targetPrimary == null) {
Set<InternalDistributedMember> fpDataStores = getFixedPartitionStores(partitionName);
targetPrimary = fpDataStores.iterator().next();
}
}
}
+
if (targetPrimary == null) {
// we need to select the same primary as chosen earlier (e.g.
// the parent's in case of colocation) so it is now passed
targetPrimary =
getPreferredDataStore(acceptedMembers, Collections.emptySet());
}
- boolean isHosting = acceptedMembers.remove(prRegion.getDistributionManager().getId());
- EndBucketCreationMessage.send(acceptedMembers, targetPrimary, prRegion, bucketId);
+
+ boolean isHosting = acceptedMembers.remove(partitionedRegion.getDistributionManager().getId());
+
+ EndBucketCreationMessage.send(acceptedMembers, targetPrimary, partitionedRegion, bucketId);
if (isHosting) {
endBucketCreationLocally(bucketId, targetPrimary);
@@ -772,10 +796,12 @@ public class PRHARedundancyProvider {
}
private boolean isLocalPrimary(String partitionName) {
- List<FixedPartitionAttributesImpl> FPAs = prRegion.getFixedPartitionAttributesImpl();
- if (FPAs != null) {
- for (FixedPartitionAttributesImpl fpa : FPAs) {
- if (fpa.getPartitionName().equals(partitionName) && fpa.isPrimary()) {
+ List<FixedPartitionAttributesImpl> allFixedPartitionAttributes =
+ partitionedRegion.getFixedPartitionAttributesImpl();
+ if (allFixedPartitionAttributes != null) {
+ for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) {
+ if (fixedPartitionAttributes.getPartitionName().equals(partitionName)
+ && fixedPartitionAttributes.isPrimary()) {
return true;
}
}
@@ -786,17 +812,18 @@ public class PRHARedundancyProvider {
public void endBucketCreationLocally(int bucketId, InternalDistributedMember newPrimary) {
// Don't elect ourselves as primary or tell others to persist our ID
// if this member has been destroyed.
- if (prRegion.getCancelCriterion().isCancelInProgress() || prRegion.isDestroyed()) {
+ if (partitionedRegion.getCancelCriterion().isCancelInProgress()
+ || partitionedRegion.isDestroyed()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("endBucketCreationLocally: for region {} bucketId={} new primary: {}",
- prRegion.getFullPath(), bucketId, newPrimary);
+ partitionedRegion.getFullPath(), bucketId, newPrimary);
}
- BucketAdvisor bucketAdvisor = prRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
- final ProxyBucketRegion proxyBucketRegion = bucketAdvisor.getProxyBucketRegion();
+ BucketAdvisor bucketAdvisor = partitionedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
+ ProxyBucketRegion proxyBucketRegion = bucketAdvisor.getProxyBucketRegion();
BucketPersistenceAdvisor persistentAdvisor = proxyBucketRegion.getPersistenceAdvisor();
// prevent multiple threads from ending bucket creation at the same time.
@@ -804,8 +831,8 @@ public class PRHARedundancyProvider {
if (persistentAdvisor != null) {
BucketRegion realBucket = proxyBucketRegion.getCreatedBucketRegion();
if (realBucket != null) {
- PersistentMemberID persistentID = realBucket.getPersistentID();
- persistentAdvisor.endBucketCreation(persistentID);
+ PersistentMemberID persistentId = realBucket.getPersistentID();
+ persistentAdvisor.endBucketCreation(persistentId);
}
}
@@ -813,7 +840,7 @@ public class PRHARedundancyProvider {
// may not have. So now we wait for the chosen member to become primary.
bucketAdvisor.setPrimaryElector(newPrimary);
- if (prRegion.getGemFireCache().getMyId().equals(newPrimary)) {
+ if (partitionedRegion.getGemFireCache().getMyId().equals(newPrimary)) {
// If we're the chosen primary, volunteer for primary now
if (bucketAdvisor.isHosting()) {
bucketAdvisor.clearPrimaryElector();
@@ -841,7 +868,8 @@ public class PRHARedundancyProvider {
bucketAdvisor.endBucketCreation();
}
- List<PartitionedRegion> colocatedWithList = ColocationHelper.getColocatedChildRegions(prRegion);
+ List<PartitionedRegion> colocatedWithList = ColocationHelper.getColocatedChildRegions(
+ partitionedRegion);
for (PartitionedRegion child : colocatedWithList) {
if (child.getRegionAdvisor().isBucketLocal(bucketId)) {
child.getRedundancyProvider().endBucketCreationLocally(bucketId, newPrimary);
@@ -856,10 +884,8 @@ public class PRHARedundancyProvider {
* @since GemFire 5.9
*/
private Set<InternalDistributedMember> getBuddyMembersInZone(
- final InternalDistributedMember acceptedMember,
- final Collection<InternalDistributedMember> allStores) {
-
- DistributionManager dm = prRegion.getDistributionManager();
+ InternalDistributedMember acceptedMember, Collection<InternalDistributedMember> allStores) {
+ DistributionManager dm = partitionedRegion.getDistributionManager();
Set<InternalDistributedMember> buddies = dm.getMembersInSameZone(acceptedMember);
buddies.retainAll(allStores);
return buddies;
@@ -876,9 +902,9 @@ public class PRHARedundancyProvider {
Set<InternalDistributedMember> currentStores = getAllStores(partitionName);
if (currentStores.isEmpty()) {
if (shouldLogInsufficientStores()) {
- insufficientStores(currentStores, Collections.emptyList(), true);
+ insufficientStores(currentStores, emptyList(), true);
}
- insufficientStores(currentStores, Collections.emptyList(), false);
+ insufficientStores(currentStores, emptyList(), false);
}
}
@@ -915,7 +941,7 @@ public class PRHARedundancyProvider {
return millis;
}
}
- return prRegion.getRetryTimeout();
+ return partitionedRegion.getRetryTimeout();
}
/**
@@ -926,23 +952,22 @@ public class PRHARedundancyProvider {
* @param loggedInsufficientStores indicates whether a warning has been logged
* @return true when a warning has been logged, false if a warning should be logged.
*/
- private boolean checkSufficientStores(final Set<InternalDistributedMember> allStores,
- final boolean loggedInsufficientStores) {
+ private boolean checkSufficientStores(Set<InternalDistributedMember> allStores,
+ boolean loggedInsufficientStores) {
// Report (only once) if insufficient data store have been detected.
if (!loggedInsufficientStores) {
if (allStores.isEmpty()) {
- insufficientStores(allStores, Collections.emptyList(), true);
+ insufficientStores(allStores, emptyList(), true);
return true;
}
} else {
if (!allStores.isEmpty()) {
// Excellent, sufficient resources were found!
- final String logStr = "{} Region name, {}";
- logger.info(logStr, SUFFICIENT_STORES_MSG, prRegion.getFullPath());
+ logger.info("{} Region name, {}", SUFFICIENT_STORES_MSG, partitionedRegion.getFullPath());
return false;
}
// Already logged warning, there are no datastores
- insufficientStores(allStores, Collections.emptyList(), false);
+ insufficientStores(allStores, emptyList(), false);
}
return loggedInsufficientStores;
}
@@ -950,18 +975,19 @@ public class PRHARedundancyProvider {
/**
* Clean up locally created bucket and tell other VMs to attempt recovering redundancy
*
- * @param buck the bucket identifier
+ * @param bucketId the bucket identifier
*/
- private void cleanUpBucket(int buck) {
- Set<InternalDistributedMember> dataStores = prRegion.getRegionAdvisor().adviseDataStore();
- BucketBackupMessage.send(dataStores, prRegion, buck);
+ private void cleanUpBucket(int bucketId) {
+ Set<InternalDistributedMember> dataStores =
+ partitionedRegion.getRegionAdvisor().adviseDataStore();
+ BucketBackupMessage.send(dataStores, partitionedRegion, bucketId);
}
public void finishIncompleteBucketCreation(int bucketId) {
String partitionName = null;
- if (prRegion.isFixedPartitionedRegion()) {
+ if (partitionedRegion.isFixedPartitionedRegion()) {
FixedPartitionAttributesImpl fpa =
- PartitionedRegionHelper.getFixedPartitionAttributesForBucket(prRegion, bucketId);
+ PartitionedRegionHelper.getFixedPartitionAttributesForBucket(partitionedRegion, bucketId);
partitionName = fpa.getPartitionName();
}
createBucketAtomically(bucketId, 0, true, partitionName);
@@ -974,41 +1000,44 @@ public class PRHARedundancyProvider {
* @param isRebalance true if bucket creation is directed by rebalancing
* @return true if the bucket was sucessfully created
*/
- public boolean createBackupBucketOnMember(final int bucketId,
- final InternalDistributedMember targetNMember, final boolean isRebalance,
- boolean replaceOfflineData, InternalDistributedMember moveSource, boolean forceCreation) {
+ public boolean createBackupBucketOnMember(int bucketId, InternalDistributedMember targetMember,
+ boolean isRebalance, boolean replaceOfflineData, InternalDistributedMember fromMember,
+ boolean forceCreation) {
if (logger.isDebugEnabled()) {
logger.debug("createBackupBucketOnMember for bucketId={} member: {}",
- prRegion.bucketStringForLogs(bucketId), targetNMember);
+ partitionedRegion.bucketStringForLogs(bucketId), targetMember);
}
- if (!targetNMember.equals(prRegion.getMyId())) {
- PartitionProfile pp = prRegion.getRegionAdvisor().getPartitionProfile(targetNMember);
- if (pp == null) {
+ if (!targetMember.equals(partitionedRegion.getMyId())) {
+ PartitionProfile profile =
+ partitionedRegion.getRegionAdvisor().getPartitionProfile(targetMember);
+ if (profile == null) {
return false;
}
try {
ManageBackupBucketMessage.NodeResponse response =
- ManageBackupBucketMessage.send(targetNMember, prRegion, bucketId, isRebalance,
- replaceOfflineData, moveSource, forceCreation);
+ ManageBackupBucketMessage.send(targetMember, partitionedRegion, bucketId, isRebalance,
+ replaceOfflineData, fromMember, forceCreation);
if (response.waitForAcceptance()) {
if (logger.isDebugEnabled()) {
logger.debug(
"createBackupBucketOnMember: Bucket creation succeed for bucketId={} on member = {}",
- prRegion.bucketStringForLogs(bucketId), targetNMember);
+ partitionedRegion.bucketStringForLogs(bucketId), targetMember);
}
return true;
}
+
if (logger.isDebugEnabled()) {
logger.debug(
"createBackupBucketOnMember: Bucket creation failed for bucketId={} on member = {}",
- prRegion.bucketStringForLogs(bucketId), targetNMember);
+ partitionedRegion.bucketStringForLogs(bucketId), targetMember);
}
return false;
+
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
@@ -1027,19 +1056,21 @@ public class PRHARedundancyProvider {
|| e.getCause() != null && e.getCause() instanceof CancelException) {
// no need to log exceptions caused by cache closure
} else {
- logger.warn("Exception creating partition on {}", targetNMember, e);
+ logger.warn("Exception creating partition on {}", targetMember, e);
}
return false;
}
}
- final PartitionedRegionDataStore prDS = prRegion.getDataStore();
- boolean bucketManaged = prDS != null && prDS.grabBucket(bucketId, moveSource, forceCreation,
- replaceOfflineData, isRebalance, null, false).equals(CreateBucketResult.CREATED);
+
+ PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
+ boolean bucketManaged =
+ dataStore != null && dataStore.grabBucket(bucketId, fromMember, forceCreation,
+ replaceOfflineData, isRebalance, null, false).equals(CreateBucketResult.CREATED);
if (!bucketManaged) {
if (logger.isDebugEnabled()) {
logger.debug(
- "createBackupBucketOnMember: Local data store refused to accommodate the data for bucketId={} prDS={}",
- prRegion.bucketStringForLogs(bucketId), prDS);
+ "createBackupBucketOnMember: Local data store refused to accommodate the data for bucketId={} dataStore={}",
+ partitionedRegion.bucketStringForLogs(bucketId), dataStore);
}
}
return bucketManaged;
@@ -1047,9 +1078,9 @@ public class PRHARedundancyProvider {
private boolean getForceLocalPrimaries() {
boolean result = false;
- Boolean v = forceLocalPrimaries.get();
- if (v != null) {
- result = v;
+ Boolean forceLocalPrimariesValue = forceLocalPrimaries.get();
+ if (forceLocalPrimariesValue != null) {
+ result = forceLocalPrimariesValue;
}
return result;
}
@@ -1061,42 +1092,44 @@ public class PRHARedundancyProvider {
* ignoring it's maximums
* @return a response object
*/
- private ManageBucketRsp createBucketOnMember(final int bucketId,
- final InternalDistributedMember targetNMember, final int newBucketSize,
- boolean forceCreation) {
+ private ManageBucketRsp createBucketOnMember(int bucketId, InternalDistributedMember targetMember,
+ int newBucketSize, boolean forceCreation) {
if (logger.isDebugEnabled()) {
logger.debug("createBucketOnMember for bucketId={} member: {}{}",
- prRegion.bucketStringForLogs(bucketId), targetNMember,
+ partitionedRegion.bucketStringForLogs(bucketId), targetMember,
forceCreation ? " forced" : "");
}
- if (!targetNMember.equals(prRegion.getMyId())) {
- PartitionProfile pp = prRegion.getRegionAdvisor().getPartitionProfile(targetNMember);
- if (pp == null) {
+ if (!targetMember.equals(partitionedRegion.getMyId())) {
+ PartitionProfile profile =
+ partitionedRegion.getRegionAdvisor().getPartitionProfile(targetMember);
+ if (profile == null) {
return ManageBucketRsp.NO;
}
try {
- NodeResponse response = ManageBucketMessage.send(targetNMember, prRegion, bucketId,
+ NodeResponse response = ManageBucketMessage.send(targetMember, partitionedRegion, bucketId,
newBucketSize, forceCreation);
if (response.waitForAcceptance()) {
if (logger.isDebugEnabled()) {
logger.debug(
"createBucketOnMember: Bucket creation succeed for bucketId={} on member = {}",
- prRegion.bucketStringForLogs(bucketId), targetNMember);
+ partitionedRegion.bucketStringForLogs(bucketId), targetMember);
}
return ManageBucketRsp.YES;
}
+
if (logger.isDebugEnabled()) {
logger.debug(
"createBucketOnMember: Bucket creation failed for bucketId={} on member = {}",
- prRegion.bucketStringForLogs(bucketId), targetNMember);
+ partitionedRegion.bucketStringForLogs(bucketId), targetMember);
}
return response.rejectedDueToInitialization() ? ManageBucketRsp.NO_INITIALIZING
: ManageBucketRsp.NO;
+
} catch (PartitionOfflineException e) {
throw e;
} catch (VirtualMachineError err) {
@@ -1119,21 +1152,20 @@ public class PRHARedundancyProvider {
if (e instanceof ForceReattemptException) {
// no log needed
} else {
- logger.warn("Exception creating partition on " +
- targetNMember,
- e);
+ logger.warn("Exception creating partition on {}", targetMember, e);
}
return ManageBucketRsp.NO;
}
}
- final PartitionedRegionDataStore prDS = prRegion.getDataStore();
- boolean bucketManaged = prDS != null && prDS.handleManageBucketRequest(bucketId,
- newBucketSize, prRegion.getMyId(), forceCreation);
+
+ PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
+ boolean bucketManaged = dataStore != null && dataStore.handleManageBucketRequest(bucketId,
+ newBucketSize, partitionedRegion.getMyId(), forceCreation);
if (!bucketManaged) {
if (logger.isDebugEnabled()) {
logger.debug(
"createBucketOnMember: Local data store not able to accommodate the data for bucketId={}",
- prRegion.bucketStringForLogs(bucketId));
+ partitionedRegion.bucketStringForLogs(bucketId));
}
}
return ManageBucketRsp.valueOf(bucketManaged);
@@ -1151,10 +1183,11 @@ public class PRHARedundancyProvider {
Collection<InternalDistributedMember> candidates,
Collection<InternalDistributedMember> alreadyUsed, int bucketId, String prName) {
Assert.assertTrue(prName != null);
- PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(prRegion);
- Region<?, ?> prRoot = PartitionedRegionHelper.getPRRoot(prRegion.getCache());
+ PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(partitionedRegion);
+ Region<?, ?> prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache());
PartitionRegionConfig config =
- (PartitionRegionConfig) prRoot.get(prRegion.getRegionIdentifier());
+ (PartitionRegionConfig) prRoot.get(partitionedRegion.getRegionIdentifier());
+
if (!config.isColocationComplete()) {
throw new IllegalStateException(
"Cannot create buckets, as colocated regions are not configured to be at the same nodes.");
@@ -1168,12 +1201,14 @@ public class PRHARedundancyProvider {
}
return primary;
}
+
Set<InternalDistributedMember> bucketOwnersSet = advisor.getBucketOwners(bucketId);
bucketOwnersSet.retainAll(candidates);
Collection<InternalDistributedMember> members = new ArrayList<>(bucketOwnersSet);
if (members.isEmpty()) {
return null;
}
+
return getPreferredDataStore(members, alreadyUsed);
}
@@ -1183,20 +1218,20 @@ public class PRHARedundancyProvider {
* Under concurrent access, the data that this method uses, may be somewhat volatile, note that
* createBucketAtomically synchronizes to enhance the consistency of the data used in this method.
*
- * @param candidates ArrayList of InternalDistributedMember, potential datastores
+ * @param candidates collection of InternalDistributedMember, potential datastores
* @param alreadyUsed data stores already in use
* @return a member with the fewest buckets or null if no datastores
*/
private InternalDistributedMember getPreferredDataStore(
Collection<InternalDistributedMember> candidates,
- final Collection<InternalDistributedMember> alreadyUsed) {
+ Collection<InternalDistributedMember> alreadyUsed) {
// has a primary already been chosen?
- final boolean forPrimary = alreadyUsed.isEmpty();
+ boolean forPrimary = alreadyUsed.isEmpty();
if (forPrimary && getForceLocalPrimaries()) {
- PartitionedRegionDataStore myDS = prRegion.getDataStore();
- if (myDS != null) {
- return prRegion.getMyId();
+ PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
+ if (localDataStore != null) {
+ return partitionedRegion.getMyId();
}
}
@@ -1206,26 +1241,28 @@ public class PRHARedundancyProvider {
Assert.assertTrue(candidates.size() > 1);
// Convert peers to DataStoreBuckets
- ArrayList<DataStoreBuckets> stores = prRegion.getRegionAdvisor()
+ List<DataStoreBuckets> stores = partitionedRegion.getRegionAdvisor()
.adviseFilteredDataStores(new HashSet<>(candidates));
- final DistributionManager dm = prRegion.getDistributionManager();
+ DistributionManager dm = partitionedRegion.getDistributionManager();
+
// Add local member as a candidate, if appropriate
- InternalDistributedMember moi = dm.getId();
- PartitionedRegionDataStore myDS = prRegion.getDataStore();
- if (myDS != null && candidates.contains(moi)) {
- int bucketCount = myDS.getBucketsManaged();
- int priCount = myDS.getNumberOfPrimaryBucketsManaged();
- int localMaxMemory = prRegion.getLocalMaxMemory();
- stores.add(new DataStoreBuckets(moi, bucketCount, priCount, localMaxMemory));
+ InternalDistributedMember localMember = dm.getId();
+ PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
+ if (localDataStore != null && candidates.contains(localMember)) {
+ int bucketCount = localDataStore.getBucketsManaged();
+ int priCount = localDataStore.getNumberOfPrimaryBucketsManaged();
+ int localMaxMemory = partitionedRegion.getLocalMaxMemory();
+ stores.add(new DataStoreBuckets(localMember, bucketCount, priCount, localMaxMemory));
}
+
if (stores.isEmpty()) {
return null;
}
// ---------------------------------------------
// Calculate all hosts who already have this bucket
- final Set<InternalDistributedMember> existingHosts = new HashSet<>();
+ Collection<InternalDistributedMember> existingHosts = new HashSet<>();
for (InternalDistributedMember mem : alreadyUsed) {
existingHosts.addAll(dm.getMembersInSameZone(mem));
}
@@ -1244,16 +1281,17 @@ public class PRHARedundancyProvider {
}
// Look for least loaded
- float metric1;
- float metric2;
+ float load1;
+ float load2;
if (forPrimary) {
- metric1 = d1.numPrimaries() / (float) d1.localMaxMemoryMB();
- metric2 = d2.numPrimaries() / (float) d2.localMaxMemoryMB();
+ load1 = d1.numPrimaries() / (float) d1.localMaxMemoryMB();
+ load2 = d2.numPrimaries() / (float) d2.localMaxMemoryMB();
} else {
- metric1 = d1.numBuckets() / (float) d1.localMaxMemoryMB();
- metric2 = d2.numBuckets() / (float) d2.localMaxMemoryMB();
+ load1 = d1.numBuckets() / (float) d1.localMaxMemoryMB();
+ load2 = d2.numBuckets() / (float) d2.localMaxMemoryMB();
}
- int result = Float.compare(metric1, metric2);
+
+ int result = Float.compare(load1, load2);
if (result == 0) {
// if they have the same load, choose the member with the higher localMaxMemory
result = d2.localMaxMemoryMB() - d1.localMaxMemoryMB();
@@ -1265,6 +1303,7 @@ public class PRHARedundancyProvider {
// First step is to sort datastores first by those whose hosts don't
// hold this bucket, and then secondarily by loading.
stores.sort(comparator);
+
if (logger.isDebugEnabled()) {
logger.debug(fancyFormatBucketAllocation("Sorted ", stores, existingHosts));
}
@@ -1275,7 +1314,7 @@ public class PRHARedundancyProvider {
List<DataStoreBuckets> bestStores = new ArrayList<>();
bestStores.add(bestDataStore);
- final boolean allStoresInUse = alreadyUsed.contains(bestDataStore.memberId());
+ boolean allStoresInUse = alreadyUsed.contains(bestDataStore.memberId());
// ---------------------------------------------
// Collect all of the other hosts in this sorted list that are as good as the very first one.
@@ -1291,6 +1330,7 @@ public class PRHARedundancyProvider {
}
bestStores.add(aDataStore);
}
+
if (logger.isDebugEnabled()) {
logger.debug(fancyFormatBucketAllocation("Best Stores ", bestStores, existingHosts));
}
@@ -1303,8 +1343,8 @@ public class PRHARedundancyProvider {
// Pick one (at random)
chosen = PartitionedRegion.RANDOM.nextInt(bestStores.size());
}
- DataStoreBuckets aDataStore = bestStores.get(chosen);
- return aDataStore.memberId();
+
+ return bestStores.get(chosen).memberId();
}
/**
@@ -1312,7 +1352,7 @@ public class PRHARedundancyProvider {
* redundancy of existing buckets
*/
void startRedundancyRecovery() {
- prRegion.getRegionAdvisor().addMembershipListener(new PRMembershipListener());
+ partitionedRegion.getRegionAdvisor().addMembershipListener(new PRMembershipListener());
scheduleRedundancyRecovery(null);
}
@@ -1339,19 +1379,20 @@ public class PRHARedundancyProvider {
if (prefix != null) {
logStr.append(prefix);
}
- logStr.append("Bucket Allocation for prId=").append(prRegion.getPRId()).append(":").append(
- lineSeparator());
+ logStr.append("Bucket Allocation for prId=").append(partitionedRegion.getPRId()).append(":");
+ logStr.append(lineSeparator());
+
for (Object dataStore : dataStores) {
- DataStoreBuckets dsb = (DataStoreBuckets) dataStore;
- logStr.append(dsb.memberId()).append(": ");
- if (existingStores.contains(dsb.memberId())) {
+ DataStoreBuckets buckets = (DataStoreBuckets) dataStore;
+ logStr.append(buckets.memberId()).append(": ");
+ if (existingStores.contains(buckets.memberId())) {
logStr.append("+");
} else {
logStr.append("-");
}
- logStr.append(dsb.numPrimaries());
+ logStr.append(buckets.numPrimaries());
logStr.append("/");
- logStr.append(dsb.numBuckets() - dsb.numPrimaries());
+ logStr.append(buckets.numBuckets() - buckets.numPrimaries());
logStr.append(lineSeparator());
}
@@ -1372,15 +1413,15 @@ public class PRHARedundancyProvider {
}
// Revisit region advisor, get current bucket stores.
- final Set<InternalDistributedMember> availableMembers = getAllStores(partitionName);
+ Set<InternalDistributedMember> availableMembers = getAllStores(partitionName);
- for (Iterator<InternalDistributedMember> itr = members.iterator(); itr.hasNext();) {
- InternalDistributedMember node = itr.next();
+ for (Iterator<InternalDistributedMember> iterator = members.iterator(); iterator.hasNext();) {
+ InternalDistributedMember node = iterator.next();
if (!availableMembers.contains(node)) {
if (logger.isDebugEnabled()) {
logger.debug("verifyBucketNodes: removing member {}", node);
}
- itr.remove();
+ iterator.remove();
Assert.assertTrue(!members.contains(node), "return value does not contain " + node);
}
}
@@ -1391,32 +1432,33 @@ public class PRHARedundancyProvider {
*/
private void scheduleRedundancyRecovery(Object failedMemberId) {
// note: isStartup is true even when not starting
- final boolean isStartup = failedMemberId == null;
- final long delay;
- final boolean movePrimaries;
+ boolean isStartup = failedMemberId == null;
+ long delay;
+ boolean movePrimaries;
if (isStartup) {
- delay = prRegion.getPartitionAttributes().getStartupRecoveryDelay();
+ delay = partitionedRegion.getPartitionAttributes().getStartupRecoveryDelay();
movePrimaries = !Boolean
.getBoolean(GEMFIRE_PREFIX + "DISABLE_MOVE_PRIMARIES_ON_STARTUP");
} else {
- delay = prRegion.getPartitionAttributes().getRecoveryDelay();
+ delay = partitionedRegion.getPartitionAttributes().getRecoveryDelay();
movePrimaries = false;
}
+
final boolean requiresRedundancyRecovery = delay >= 0;
if (!requiresRedundancyRecovery) {
return;
}
- if (!prRegion.isDataStore()) {
+ if (!partitionedRegion.isDataStore()) {
return;
}
+
Runnable task = new RecoveryRunnable(this) {
@Override
public void run2() {
try {
- final boolean isFixedPartitionedRegion =
- prRegion.isFixedPartitionedRegion();
+ boolean isFixedPartitionedRegion = partitionedRegion.isFixedPartitionedRegion();
// always replace offline data for fixed partitioned regions -
// this guarantees we create the buckets we are supposed to create on this node.
@@ -1429,10 +1471,10 @@ public class PRHARedundancyProvider {
director = new CompositeDirector(true, true, false, movePrimaries);
}
- final PartitionedRegionRebalanceOp rebalance = new PartitionedRegionRebalanceOp(
- prRegion, false, director, replaceOfflineData, false);
+ PartitionedRegionRebalanceOp rebalance = new PartitionedRegionRebalanceOp(
+ partitionedRegion, false, director, replaceOfflineData, false);
- long start = prRegion.getPrStats().startRecovery();
+ long start = partitionedRegion.getPrStats().startRecovery();
if (isFixedPartitionedRegion) {
rebalance.executeFPA();
@@ -1440,7 +1482,7 @@ public class PRHARedundancyProvider {
rebalance.execute();
}
- prRegion.getPrStats().endRecovery(start);
+ partitionedRegion.getPrStats().endRecovery(start);
recoveryFuture = null;
} catch (CancelException e) {
logger.debug("Cache closed while recovery in progress");
@@ -1457,10 +1499,10 @@ public class PRHARedundancyProvider {
try {
if (logger.isDebugEnabled()) {
if (isStartup) {
- logger.debug(prRegion + " scheduling redundancy recovery in {} ms", delay);
+ logger.debug("{} scheduling redundancy recovery in {} ms", partitionedRegion, delay);
} else {
logger.debug(
- "prRegion scheduling redundancy recovery after departure/crash/error in {} in {} ms",
+ "partitionedRegion scheduling redundancy recovery after departure/crash/error in {} in {} ms",
failedMemberId, delay);
}
}
@@ -1473,11 +1515,11 @@ public class PRHARedundancyProvider {
}
public boolean isRedundancyImpaired() {
- int numBuckets = prRegion.getPartitionAttributes().getTotalNumBuckets();
- int targetRedundancy = prRegion.getPartitionAttributes().getRedundantCopies();
+ int numBuckets = partitionedRegion.getPartitionAttributes().getTotalNumBuckets();
+ int targetRedundancy = partitionedRegion.getPartitionAttributes().getRedundantCopies();
for (int i = 0; i < numBuckets; i++) {
- int redundancy = prRegion.getRegionAdvisor().getBucketRedundancy(i);
+ int redundancy = partitionedRegion.getRegionAdvisor().getBucketRedundancy(i);
if (redundancy < targetRedundancy && redundancy != -1 || redundancy > targetRedundancy) {
return true;
}
@@ -1492,7 +1534,7 @@ public class PRHARedundancyProvider {
* the GatewaySender buckets for ParallelGatewaySender irrespective of whether colocation is
* complete or not.
*/
- PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(prRegion);
+ PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(partitionedRegion);
// Check if the leader region or some child shadow PR region is persistent
@@ -1504,26 +1546,24 @@ public class PRHARedundancyProvider {
return true;
}
- if (!ColocationHelper.checkMembersColocation(leaderRegion,
- leaderRegion.getDistributionManager().getDistributionManagerId())) {
+ if (!checkMembersColocation(leaderRegion, leaderRegion.getDistributionManager().getId())) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping persistent recovery of {} because colocation is not complete for {}",
- prRegion, leaderRegion);
+ partitionedRegion, leaderRegion);
}
return false;
}
- final ProxyBucketRegion[] proxyBucketArray =
+ ProxyBucketRegion[] proxyBucketArray =
persistentLeader.getRegionAdvisor().getProxyBucketArray();
-
if (proxyBucketArray.length == 0) {
throw new IllegalStateException("Unexpected empty proxy bucket array");
}
-
for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
proxyBucket.initializePersistenceAdvisor();
}
- Set<InternalDistributedMember> peers = prRegion.getRegionAdvisor().adviseGeneric();
+
+ Set<InternalDistributedMember> peers = partitionedRegion.getRegionAdvisor().adviseGeneric();
// We need to make sure here that we don't run into this race condition:
// 1) We get a membership view from member A
@@ -1532,8 +1572,8 @@ public class PRHARedundancyProvider {
// That will add B back into the set.
// This state flush will make sure that any membership changes
// That are in progress on the peers are finished.
- MembershipFlushRequest.send(peers, prRegion.getDistributionManager(),
- prRegion.getFullPath());
+ MembershipFlushRequest.send(peers, partitionedRegion.getDistributionManager(),
+ partitionedRegion.getFullPath());
List<ProxyBucketRegion> bucketsNotHostedLocally = new ArrayList<>(proxyBucketArray.length);
List<ProxyBucketRegion> bucketsHostedLocally = new ArrayList<>(proxyBucketArray.length);
@@ -1556,9 +1596,9 @@ public class PRHARedundancyProvider {
* return and pass the GII task to a separate thread pool.
*
*/
- for (final ProxyBucketRegion proxyBucket : proxyBucketArray) {
+ for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
if (proxyBucket.getPersistenceAdvisor().wasHosting()) {
- final RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) {
+ RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) {
@Override
public void run() {
@@ -1577,6 +1617,7 @@ public class PRHARedundancyProvider {
proxyBucket.recoverFromDiskRecursively();
}
};
+
Thread recoveryThread =
new LoggingThread("Recovery thread for bucket " + proxyBucket.getName(),
false, recoveryRunnable);
@@ -1588,12 +1629,12 @@ public class PRHARedundancyProvider {
}
try {
- // try to recover the local buckets before the proxy buckets. This will allow us to detect any
- // ConflictingDataException before the proxy buckets update their membership view.
- for (final ProxyBucketRegion proxyBucket : bucketsHostedLocally) {
+ // try to recover the local buckets before the proxy buckets. This will allow us to detect
+ // any ConflictingDataException before the proxy buckets update their membership view.
+ for (ProxyBucketRegion proxyBucket : bucketsHostedLocally) {
proxyBucket.waitForPrimaryPersistentRecovery();
}
- for (final ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
+ for (ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
proxyBucket.recoverFromDiskRecursively();
}
} finally {
@@ -1605,11 +1646,13 @@ public class PRHARedundancyProvider {
return true;
}
- private void createPersistentBucketRecoverer(int proxyBuckets) {
- persistentBucketRecoverer = new PersistentBucketRecoverer(this, proxyBuckets);
+ @VisibleForTesting
+ void createPersistentBucketRecoverer(int proxyBuckets) {
+ persistentBucketRecoverer = persistentBucketRecovererFunction.apply(this, proxyBuckets);
persistentBucketRecoverer.startLoggingThread();
}
+ @VisibleForTesting
PersistentBucketRecoverer getPersistentBucketRecoverer() {
return persistentBucketRecoverer;
}
@@ -1623,15 +1666,15 @@ public class PRHARedundancyProvider {
* persistent.
*/
private PartitionedRegion getPersistentLeader() {
- PartitionedRegion leader = ColocationHelper.getLeaderRegion(prRegion);
+ PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion);
return findPersistentRegionRecursively(leader);
}
- private PartitionedRegion findPersistentRegionRecursively(PartitionedRegion pr) {
- if (pr.getDataPolicy().withPersistence()) {
- return pr;
+ private PartitionedRegion findPersistentRegionRecursively(PartitionedRegion partitionedRegion) {
+ if (partitionedRegion.getDataPolicy().withPersistence()) {
+ return partitionedRegion;
}
- for (PartitionedRegion child : ColocationHelper.getColocatedChildRegions(pr)) {
+ for (PartitionedRegion child : ColocationHelper.getColocatedChildRegions(partitionedRegion)) {
PartitionedRegion leader = findPersistentRegionRecursively(child);
if (leader != null) {
return leader;
@@ -1641,11 +1684,11 @@ public class PRHARedundancyProvider {
}
void scheduleCreateMissingBuckets() {
- if (prRegion.getColocatedWith() != null
- && ColocationHelper.isColocationComplete(prRegion)) {
+ if (partitionedRegion.getColocatedWith() != null
+ && ColocationHelper.isColocationComplete(partitionedRegion)) {
Runnable task = new CreateMissingBucketsTask(this);
final InternalResourceManager resourceManager =
- prRegion.getGemFireCache().getInternalResourceManager();
+ partitionedRegion.getGemFireCache().getInternalResourceManager();
resourceManager.getExecutor().execute(task);
}
}
@@ -1668,9 +1711,8 @@ public class PRHARedundancyProvider {
* @param loadProbe the LoadProbe to use
* @return PartitionRegionInfo for the partitioned region
*/
- public InternalPRInfo buildPartitionedRegionInfo(final boolean internal,
- final LoadProbe loadProbe) {
- final PartitionedRegion pr = prRegion;
+ public InternalPRInfo buildPartitionedRegionInfo(boolean internal, LoadProbe loadProbe) {
+ PartitionedRegion pr = partitionedRegion;
if (pr == null) {
return null;
@@ -1683,7 +1725,7 @@ public class PRHARedundancyProvider {
int configuredRedundantCopies = pr.getRedundantCopies();
int actualRedundantCopies = pr.getRedundancyTracker().getActualRedundancy();
- final PartitionedRegionDataStore ds = pr.getDataStore();
+ PartitionedRegionDataStore dataStore = pr.getDataStore();
Set<InternalDistributedMember> datastores = pr.getRegionAdvisor().adviseDataStore();
@@ -1691,7 +1733,7 @@ public class PRHARedundancyProvider {
OfflineMemberDetails offlineMembers = null;
boolean fetchOfflineMembers = false;
- if (ds != null) {
+ if (dataStore != null) {
memberDetails.add(buildPartitionMemberDetails(internal, loadProbe));
offlineMembers = fetchOfflineMembers();
} else {
@@ -1719,13 +1761,13 @@ public class PRHARedundancyProvider {
* Retrieve the set of members which are currently offline for all buckets.
*/
public OfflineMemberDetailsImpl fetchOfflineMembers() {
- ProxyBucketRegion[] proxyBuckets = prRegion.getRegionAdvisor().getProxyBucketArray();
+ ProxyBucketRegion[] proxyBuckets = partitionedRegion.getRegionAdvisor().getProxyBucketArray();
Set<PersistentMemberID>[] offlineMembers = new Set[proxyBuckets.length];
for (int i = 0; i < proxyBuckets.length; i++) {
- ProxyBucketRegion proxy = proxyBuckets[i];
- if (prRegion.getDataPolicy().withPersistence()) {
+ ProxyBucketRegion proxyBucket = proxyBuckets[i];
+ if (partitionedRegion.getDataPolicy().withPersistence()) {
Set<PersistentMemberID> persistedMembers =
- proxy.getPersistenceAdvisor().getMissingMembers();
+ proxyBucket.getPersistenceAdvisor().getMissingMembers();
if (persistedMembers == null) {
persistedMembers = Collections.emptySet();
}
@@ -1744,12 +1786,12 @@ public class PRHARedundancyProvider {
* @param loadProbe the LoadProbe to use
* @return PartitionMemberDetails for the local member
*/
- public InternalPartitionDetails buildPartitionMemberDetails(final boolean internal,
- final LoadProbe loadProbe) {
- final PartitionedRegion pr = prRegion;
+ public InternalPartitionDetails buildPartitionMemberDetails(boolean internal,
+ LoadProbe loadProbe) {
+ final PartitionedRegion pr = partitionedRegion;
- PartitionedRegionDataStore ds = pr.getDataStore();
- if (ds == null) {
+ PartitionedRegionDataStore dataStore = pr.getDataStore();
+ if (dataStore == null) {
return null;
}
@@ -1758,11 +1800,13 @@ public class PRHARedundancyProvider {
int configuredBucketCount = pr.getTotalNumberOfBuckets();
long[] bucketSizes = new long[configuredBucketCount];
+
// key: bucket id (bid), value: size
- Map<Integer, Integer> bucketSizeMap = ds.getSizeLocally();
+ Map<Integer, Integer> bucketSizeMap = dataStore.getSizeLocally();
+
for (Map.Entry<Integer, Integer> me : bucketSizeMap.entrySet()) {
int bid = me.getKey();
- long bucketSize = ds.getBucketSize(bid);
+ long bucketSize = dataStore.getBucketSize(bid);
bucketSizes[bid] = bucketSize;
size += bucketSize;
}
@@ -1774,11 +1818,12 @@ public class PRHARedundancyProvider {
PRLoad prLoad = loadProbe.getLoad(pr);
localDetails =
new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size,
- ds.getBucketsManaged(), ds.getNumberOfPrimaryBucketsManaged(), prLoad, bucketSizes);
+ dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged(), prLoad,
+ bucketSizes);
} else {
localDetails =
new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size,
- ds.getBucketsManaged(), ds.getNumberOfPrimaryBucketsManaged());
+ dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged());
}
return localDetails;
}
@@ -1794,7 +1839,7 @@ public class PRHARedundancyProvider {
}
List<PartitionedRegion> colocatedRegions =
- ColocationHelper.getColocatedChildRegions(prRegion);
+ ColocationHelper.getColocatedChildRegions(partitionedRegion);
for (PartitionedRegion child : colocatedRegions) {
child.getRedundancyProvider().waitForPersistentBucketRecoveryOrClose();
}
@@ -1811,7 +1856,7 @@ public class PRHARedundancyProvider {
}
boolean isPersistentRecoveryComplete() {
- if (!ColocationHelper.checkMembersColocation(prRegion, prRegion.getMyId())) {
+ if (!checkMembersColocation(partitionedRegion, partitionedRegion.getMyId())) {
return false;
}
@@ -1821,7 +1866,7 @@ public class PRHARedundancyProvider {
}
Map<String, PartitionedRegion> colocatedRegions =
- ColocationHelper.getAllColocationRegions(prRegion);
+ ColocationHelper.getAllColocationRegions(partitionedRegion);
for (PartitionedRegion region : colocatedRegions.values()) {
PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider();
@@ -1834,7 +1879,7 @@ public class PRHARedundancyProvider {
}
private ThreadsMonitoring getThreadMonitorObj() {
- DistributionManager distributionManager = prRegion.getDistributionManager();
+ DistributionManager distributionManager = partitionedRegion.getDistributionManager();
if (distributionManager != null) {
return distributionManager.getThreadMonitoring();
}
@@ -1845,12 +1890,13 @@ public class PRHARedundancyProvider {
* Monitors distributed membership for a given bucket
*/
private class BucketMembershipObserver implements MembershipListener {
+
private final Bucket bucketToMonitor;
private final AtomicInteger arrivals = new AtomicInteger(0);
private final AtomicBoolean departures = new AtomicBoolean(false);
- private BucketMembershipObserver(Bucket b) {
- bucketToMonitor = b;
+ private BucketMembershipObserver(Bucket bucketToMonitor) {
+ this.bucketToMonitor = bucketToMonitor;
}
private BucketMembershipObserver beginMonitoring() {
@@ -1897,8 +1943,8 @@ public class PRHARedundancyProvider {
* @param expectedOwners the list of owners used when a departure is detected
* @return if no problematic departures are detected, the primary
*/
- private BucketMembershipObserverResults waitForOwnersGetPrimary(final int expectedCount,
- final Collection<InternalDistributedMember> expectedOwners, String partitionName)
+ private BucketMembershipObserverResults waitForOwnersGetPrimary(int expectedCount,
+ Collection<InternalDistributedMember> expectedOwners, String partitionName)
throws InterruptedException {
boolean problematicDeparture = false;
synchronized (this) {
@@ -1912,9 +1958,11 @@ public class PRHARedundancyProvider {
if (expectedOwners.isEmpty()) {
problematicDeparture = true; // need to pick new victims
}
+
// need to pick new victims
arrivals.set(expectedOwners.size());
departures.set(false);
+
if (problematicDeparture) {
if (logger.isDebugEnabled()) {
logger.debug("Bucket observer found departed members - retrying");
@@ -1929,26 +1977,29 @@ public class PRHARedundancyProvider {
// success!
break;
}
+
if (logger.isDebugEnabled()) {
logger.debug("Waiting for bucket {} to finish being created",
- prRegion.bucketStringForLogs(bucketToMonitor.getId()));
+ partitionedRegion.bucketStringForLogs(bucketToMonitor.getId()));
}
- prRegion.checkReadiness();
- final int creationWaitMillis = 5 * 1000;
+ partitionedRegion.checkReadiness();
+
+ int creationWaitMillis = 5 * 1000;
wait(creationWaitMillis);
if (oldArrivals == arrivals.get() && oldDepartures == departures.get()) {
logger.warn(
"Time out waiting {} ms for creation of bucket for partitioned region {}. Members requested to create the bucket are: {}",
- new Object[] {creationWaitMillis, prRegion.getFullPath(),
- expectedOwners});
+ creationWaitMillis, partitionedRegion.getFullPath(), expectedOwners);
}
}
}
+
if (problematicDeparture) {
return new BucketMembershipObserverResults(true, null);
}
+
InternalDistributedMember primary = bucketToMonitor.getBucketAdvisor().getPrimary();
if (primary == null) {
/*
@@ -1965,11 +2016,12 @@ public class PRHARedundancyProvider {
* Implements MembershipListener to perform cleanup when a node leaves DistributedSystem.
*/
private class PRMembershipListener implements MembershipListener {
+
@Override
public void memberDeparted(DistributionManager distributionManager,
- final InternalDistributedMember id, final boolean crashed) {
+ InternalDistributedMember id, boolean crashed) {
try {
- DistributedMember member = prRegion.getSystem().getDistributedMember();
+ DistributedMember member = partitionedRegion.getSystem().getDistributedMember();
if (logger.isDebugEnabled()) {
logger.debug(
"MembershipListener invoked on DistributedMember = {} for failed memberId = {}",
@@ -1977,27 +2029,29 @@ public class PRHARedundancyProvider {
id);
}
- if (!prRegion.isCacheClosing() && !prRegion.isDestroyed() && !member.equals(id)) {
+ if (!partitionedRegion.isCacheClosing() && !partitionedRegion.isDestroyed()
+ && !member.equals(id)) {
Runnable postRecoveryTask = null;
// Only schedule redundancy recovery if this is not a fixed PR.
- if (!prRegion.isFixedPartitionedRegion()) {
+ if (!partitionedRegion.isFixedPartitionedRegion()) {
postRecoveryTask = () -> {
// After the metadata has been cleaned, recover redundancy.
scheduleRedundancyRecovery(id);
};
}
// Schedule cleanup of the metadata for the failed member.
- PartitionedRegionHelper.cleanUpMetaDataForRegion(prRegion.getCache(),
- prRegion.getRegionIdentifier(), id, postRecoveryTask);
+ PartitionedRegionHelper.cleanUpMetaDataForRegion(partitionedRegion.getCache(),
+ partitionedRegion.getRegionIdentifier(), id, postRecoveryTask);
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// ignore
}
}
}
private static class ManageBucketRsp {
+
@Immutable
private static final ManageBucketRsp NO = new ManageBucketRsp("NO");
@Immutable
@@ -2044,6 +2098,7 @@ public class PRHARedundancyProvider {
}
private static class ArrayListWithClearState<T> extends ArrayList<T> {
+
private static final long serialVersionUID = 1L;
private volatile boolean wasCleared;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 476bc81..953eada 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -768,7 +768,7 @@ public class PartitionedRegion extends LocalRegion
this.distAdvisor = RegionAdvisor.createRegionAdvisor(this);
senderIdMonitor = createSenderIdMonitor();
// Warning: potential early escape of instance
- this.redundancyProvider = new PRHARedundancyProvider(this);
+ this.redundancyProvider = new PRHARedundancyProvider(this, cache.getInternalResourceManager());
// localCacheEnabled = ra.getPartitionAttributes().isLocalCacheEnabled();
// This is to make sure that local-cache get and put works properly.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index d16a7af..a7bd338 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -15,7 +15,6 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -24,60 +23,76 @@ import static org.mockito.Mockito.when;
import java.util.HashSet;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.quality.Strictness;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
import org.apache.geode.internal.cache.partitioned.LoadProbe;
import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
public class PRHARedundancyProviderTest {
+ private InternalCache cache;
private PartitionedRegion partitionedRegion;
+ private InternalResourceManager resourceManager;
+
private PRHARedundancyProvider prHaRedundancyProvider;
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
+
@Before
public void setUp() {
- partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
- InternalCache cache = mock(InternalCache.class);
- DistributedRegion rootRegion = mock(DistributedRegion.class);
-
- when(partitionedRegion.getCache()).thenReturn(cache);
- when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(rootRegion);
-
- prHaRedundancyProvider = spy(new PRHARedundancyProvider(partitionedRegion));
+ cache = mock(InternalCache.class);
+ partitionedRegion = mock(PartitionedRegion.class);
+ resourceManager = mock(InternalResourceManager.class);
}
@Test
public void waitForPersistentBucketRecoveryProceedsWhenPersistentBucketRecovererLatchIsNotSet() {
- PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
- when(prHaRedundancyProvider.getPersistentBucketRecoverer()).thenReturn(recoverer);
+ prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
+ (a, b) -> mock(PersistentBucketRecoverer.class));
prHaRedundancyProvider.waitForPersistentBucketRecovery();
}
@Test
public void waitForPersistentBucketRecoveryProceedsAfterLatchCountDown() {
- PersistentBucketRecoverer recoverer =
- spy(new PersistentBucketRecoverer(prHaRedundancyProvider, 1));
- when(prHaRedundancyProvider.getPersistentBucketRecoverer()).thenReturn(recoverer);
+ when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true))
+ .thenReturn(mock(DistributedRegion.class));
+ when(partitionedRegion.getCache()).thenReturn(cache);
+ when(partitionedRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+ when(partitionedRegion.getPartitionAttributes()).thenReturn(mock(PartitionAttributes.class));
+ prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
+ (a, b) -> spy(new ThreadlessPersistentBucketRecoverer(a, b)));
+ prHaRedundancyProvider.createPersistentBucketRecoverer(1);
prHaRedundancyProvider.getPersistentBucketRecoverer().countDown();
prHaRedundancyProvider.waitForPersistentBucketRecovery();
- verify(recoverer).await();
+ verify(prHaRedundancyProvider.getPersistentBucketRecoverer()).await();
}
@Test
public void buildPartitionedRegionInfo() {
- when(partitionedRegion.getRegionAdvisor().adviseDataStore()).thenReturn(new HashSet<>());
- when(partitionedRegion.getRegionAdvisor().getProxyBucketArray())
- .thenReturn(new ProxyBucketRegion[] {});
-
- when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(42);
- when(partitionedRegion.getRegionAdvisor().getCreatedBucketsCount()).thenReturn(17);
+ prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager,
+ (a, b) -> mock(PersistentBucketRecoverer.class));
+ when(partitionedRegion.getRedundancyTracker())
+ .thenReturn(mock(PartitionedRegionRedundancyTracker.class));
+ when(partitionedRegion.getRedundancyTracker().getActualRedundancy()).thenReturn(33);
when(partitionedRegion.getRedundancyTracker().getLowRedundancyBuckets()).thenReturn(3);
when(partitionedRegion.getRedundantCopies()).thenReturn(12);
- when(partitionedRegion.getRedundancyTracker().getActualRedundancy()).thenReturn(33);
+ when(partitionedRegion.getRegionAdvisor()).thenReturn(mock(RegionAdvisor.class));
+ when(partitionedRegion.getRegionAdvisor().adviseDataStore()).thenReturn(new HashSet<>());
+ when(partitionedRegion.getRegionAdvisor().getCreatedBucketsCount()).thenReturn(17);
+ when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(42);
InternalPRInfo internalPRInfo =
prHaRedundancyProvider.buildPartitionedRegionInfo(false, mock(LoadProbe.class));
@@ -88,4 +103,17 @@ public class PRHARedundancyProviderTest {
assertThat(internalPRInfo.getConfiguredRedundantCopies()).isEqualTo(12);
assertThat(internalPRInfo.getActualRedundantCopies()).isEqualTo(33);
}
+
+ private static class ThreadlessPersistentBucketRecoverer extends PersistentBucketRecoverer {
+
+ public ThreadlessPersistentBucketRecoverer(
+ PRHARedundancyProvider prhaRedundancyProvider, int proxyBuckets) {
+ super(prhaRedundancyProvider, proxyBuckets);
+ }
+
+ @Override
+ public void startLoggingThread() {
+ // intentionally a no-op so that no logging thread is started
+ }
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
index 12168cd..a45b72b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
@@ -27,21 +27,27 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PRHARedundancyProvider;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
public class PersistentBucketRecovererTest {
+
private PartitionedRegion partitionedRegion;
private InternalCache cache;
private DistributedRegion root;
private PRHARedundancyProvider provider;
+ private InternalResourceManager resourceManager;
@Before
public void setUp() {
partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+ resourceManager = mock(InternalResourceManager.class);
cache = mock(InternalCache.class);
root = mock(DistributedRegion.class);
+
when(partitionedRegion.getCache()).thenReturn(cache);
when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
- provider = new PRHARedundancyProvider(partitionedRegion);
+
+ provider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
}
@Test