You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/10/11 02:57:49 UTC
[3/3] incubator-geode git commit: GEODE-1801: Change the logic to
increment NonSingleHopsCount
GEODE-1801: Change the logic to increment NonSingleHopsCount
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9b6c10bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9b6c10bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9b6c10bc
Branch: refs/heads/feature/GEODE-1801
Commit: 9b6c10bc2f433b36546e6f3c22f39d3b20d880b5
Parents: febc634
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Tue Oct 11 13:57:06 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Tue Oct 11 13:57:06 2016 +1100
----------------------------------------------------------------------
.../client/internal/ClientMetadataService.java | 536 +++----
.../geode/internal/cache/GemFireCacheImpl.java | 1499 +++++++++---------
.../internal/cache/SingleHopStatsDUnitTest.java | 127 +-
3 files changed, 980 insertions(+), 1182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b6c10bc/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
index c863d46..325322e 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
@@ -16,94 +16,109 @@
*/
package org.apache.geode.cache.client.internal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.FixedPartitionResolver;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.cache.Region;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.BucketServerLocation66;
+import org.apache.geode.internal.cache.EntryOperationImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
/**
* Maintains {@link ClientPartitionAdvisor} for Partitioned Regions on servers
* Client operations will consult this service to identify the server locations
* on which the data for the client operation is residing
- *
- *
+ *
* @since GemFire 6.5
- *
*/
public final class ClientMetadataService {
private static final Logger logger = LogService.getLogger();
-
+
private final Cache cache;
-
+
private final Set<String> nonPRs = new HashSet<String>();
private boolean HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PoolImpl.honourServerGroupsInPRSingleHop");
public static final int SIZE_BYTES_ARRAY_RECEIVED = 2;
-
+
public static final int INITIAL_VERSION = 0;
-
- /** random number generator used in pruning */
+
+ /**
+ * random number generator used in pruning
+ */
private final Random rand = new Random();
-
+
private volatile boolean isMetadataStable = true;
private boolean isMetadataRefreshed_TEST_ONLY = false;
-
+
private int refreshTaskCount = 0;
-
+
private Set<String> regionsBeingRefreshed = new HashSet<>();
-
+
private final Object fetchTaskCountLock = new Object();
-
+
public ClientMetadataService(Cache cache) {
this.cache = cache;
}
private final Map<String, ClientPartitionAdvisor> clientPRAdvisors = new ConcurrentHashMap<String, ClientPartitionAdvisor>();
private final Map<String, Set<ClientPartitionAdvisor>> colocatedPRAdvisors = new ConcurrentHashMap<String, Set<ClientPartitionAdvisor>>();
-
- private PartitionResolver getResolver(Region r, Object key,
- Object callbackArgument) {
+
+ private PartitionResolver getResolver(Region r, Object key, Object callbackArgument) {
// First choice is one associated with the region
final String regionFullPath = r.getFullPath();
- ClientPartitionAdvisor advisor = this
- .getClientPartitionAdvisor(regionFullPath);
+ ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(regionFullPath);
PartitionResolver result = null;
if (advisor != null) {
result = advisor.getPartitionResolver();
}
-
+
if (result != null) {
return result;
}
// Second is the key
if (key != null && key instanceof PartitionResolver) {
- return (PartitionResolver)key;
+ return (PartitionResolver) key;
}
// Third is the callback argument
- if (callbackArgument != null
- && callbackArgument instanceof PartitionResolver) {
- return (PartitionResolver)callbackArgument;
+ if (callbackArgument != null && callbackArgument instanceof PartitionResolver) {
+ return (PartitionResolver) callbackArgument;
}
// There is no resolver.
return null;
}
- public ServerLocation getBucketServerLocation(Region region,
- Operation operation, Object key, Object value, Object callbackArg) {
- ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region.getFullPath());
+ public ServerLocation getBucketServerLocation(Region region, Operation operation, Object key, Object value, Object callbackArg) {
+ ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region.getFullPath());
if (prAdvisor == null) {
return null;
}
@@ -116,32 +131,23 @@ public final class ClientMetadataService {
// client has not registered PartitionResolver
// Assuming even PR at server side is not using PartitionResolver
resolveKey = key;
- }
- else {
- entryOp = new EntryOperationImpl(region, operation, key,
- value, callbackArg);
+ } else {
+ entryOp = new EntryOperationImpl(region, operation, key, value, callbackArg);
resolveKey = resolver.getRoutingObject(entryOp);
if (resolveKey == null) {
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
- .toLocalizedString());
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());
}
}
int bucketId;
if (resolver instanceof FixedPartitionResolver) {
if (entryOp == null) {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
}
- String partition = ((FixedPartitionResolver)resolver).getPartitionName(
- entryOp, prAdvisor.getFixedPartitionNames());
+ String partition = ((FixedPartitionResolver) resolver).getPartitionName(entryOp, prAdvisor.getFixedPartitionNames());
if (partition == null) {
Object[] prms = new Object[] { region.getName(), resolver };
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
- .toLocalizedString(prms));
- }
- else {
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));
+ } else {
bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
if (bucketId == -1) {
// scheduleGetPRMetaData((LocalRegion)region);
@@ -149,21 +155,19 @@ public final class ClientMetadataService {
}
}
- }else {
+ } else {
bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
}
-
- ServerLocation bucketServerLocation = getServerLocation(region, operation,
- bucketId);
+
+ ServerLocation bucketServerLocation = getServerLocation(region, operation, bucketId);
ServerLocation location = null;
- if (bucketServerLocation != null)
- location = new ServerLocation(bucketServerLocation.getHostName(),
- bucketServerLocation.getPort());
+ if (bucketServerLocation != null) {
+ location = new ServerLocation(bucketServerLocation.getHostName(), bucketServerLocation.getPort());
+ }
return location;
}
- private ServerLocation getServerLocation(Region region, Operation operation,
- int bucketId) {
+ private ServerLocation getServerLocation(Region region, Operation operation, int bucketId) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
if (prAdvisor == null) {
@@ -172,60 +176,41 @@ public final class ClientMetadataService {
}
return null;
}
-
-// if (prAdvisor.getColocatedWith() != null) {
-// prAdvisor = this.getClientPartitionAdvisor(prAdvisor.getColocatedWith());
-// if (prAdvisor == null) {
-// if (this.logger.fineEnabled()) {
-// this.logger.fine(
-// "ClientMetadataService#getServerLocation : Region "
-// + regionFullPath + "prAdvisor does not exist.");
-// }
-// return null;
-// }
-// }
-
+
if (operation.isGet()) {
return prAdvisor.adviseServerLocation(bucketId);
- }
- else {
+ } else {
return prAdvisor.advisePrimaryServerLocation(bucketId);
}
}
- public Map<ServerLocation, HashSet> getServerToFilterMap(
- final Collection routingKeys, final Region region, boolean primaryMembersNeeded
- ) {
- return getServerToFilterMap(routingKeys, region, primaryMembersNeeded, false);
+ public Map<ServerLocation, HashSet> getServerToFilterMap(final Collection routingKeys, final Region region, boolean primaryMembersNeeded) {
+ return getServerToFilterMap(routingKeys, region, primaryMembersNeeded, false);
}
-
- public Map<ServerLocation, HashSet> getServerToFilterMap(
- final Collection routingKeys, final Region region, boolean primaryMembersNeeded,
- boolean bucketsAsFilter) {
+
+ public Map<ServerLocation, HashSet> getServerToFilterMap(final Collection routingKeys, final Region region, boolean primaryMembersNeeded, boolean bucketsAsFilter) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) {
- scheduleGetPRMetaData((LocalRegion)region, false);
+ scheduleGetPRMetaData((LocalRegion) region, false);
return null;
}
- HashMap<Integer, HashSet> bucketToKeysMap = groupByBucketOnClientSide(
- region, prAdvisor, routingKeys, bucketsAsFilter);
+ HashMap<Integer, HashSet> bucketToKeysMap = groupByBucketOnClientSide(region, prAdvisor, routingKeys, bucketsAsFilter);
HashMap<ServerLocation, HashSet> serverToKeysMap = new HashMap<ServerLocation, HashSet>();
- HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = groupByServerToBuckets(
- prAdvisor, bucketToKeysMap.keySet(), primaryMembersNeeded);
-
- if(serverToBuckets == null){
+ HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = groupByServerToBuckets(prAdvisor, bucketToKeysMap.keySet(), primaryMembersNeeded);
+
+ if (serverToBuckets == null) {
return null;
}
-
+
for (Map.Entry entry : serverToBuckets.entrySet()) {
- ServerLocation server = (ServerLocation)entry.getKey();
- HashSet<Integer> buckets = (HashSet)entry.getValue();
+ ServerLocation server = (ServerLocation) entry.getKey();
+ HashSet<Integer> buckets = (HashSet) entry.getValue();
for (Integer bucket : buckets) {
// use LinkedHashSet to maintain the order of keys
// the keys will be iterated several times
- LinkedHashSet keys = (LinkedHashSet)serverToKeysMap.get(server);
+ LinkedHashSet keys = (LinkedHashSet) serverToKeysMap.get(server);
if (keys == null) {
keys = new LinkedHashSet();
}
@@ -239,29 +224,28 @@ public final class ClientMetadataService {
return serverToKeysMap;
}
-
- public HashMap<ServerLocation, HashSet<Integer>> groupByServerToAllBuckets(Region region, boolean primaryOnly){
+
+ public HashMap<ServerLocation, HashSet<Integer>> groupByServerToAllBuckets(Region region, boolean primaryOnly) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) {
- scheduleGetPRMetaData((LocalRegion)region, false);
+ scheduleGetPRMetaData((LocalRegion) region, false);
return null;
}
int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
HashSet<Integer> allBucketIds = new HashSet<Integer>();
- for(int i =0; i < totalNumberOfBuckets; i++){
+ for (int i = 0; i < totalNumberOfBuckets; i++) {
allBucketIds.add(i);
}
return groupByServerToBuckets(prAdvisor, allBucketIds, primaryOnly);
}
+
/**
* This function should make a map of server to buckets it is hosting.
* If for some bucket servers are not available due to mismatch in metadata
* it should fill up a random server for it.
*/
- private HashMap<ServerLocation, HashSet<Integer>> groupByServerToBuckets(
- ClientPartitionAdvisor prAdvisor, Set<Integer> bucketSet,
- boolean primaryOnly) {
+ private HashMap<ServerLocation, HashSet<Integer>> groupByServerToBuckets(ClientPartitionAdvisor prAdvisor, Set<Integer> bucketSet, boolean primaryOnly) {
if (primaryOnly) {
HashMap<ServerLocation, HashSet<Integer>> serverToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
for (Integer bucketId : bucketSet) {
@@ -285,16 +269,14 @@ public final class ClientMetadataService {
}
return serverToBucketsMap;
- }
- else {
+ } else {
return pruneNodes(prAdvisor, bucketSet);
}
}
-
-
- private HashMap<ServerLocation, HashSet<Integer>> pruneNodes(
- ClientPartitionAdvisor prAdvisor, Set<Integer> buckets) {
-
+
+
+ private HashMap<ServerLocation, HashSet<Integer>> pruneNodes(ClientPartitionAdvisor prAdvisor, Set<Integer> buckets) {
+
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("ClientMetadataService: The buckets to be pruned are: {}", buckets);
@@ -303,8 +285,7 @@ public final class ClientMetadataService {
HashMap<ServerLocation, HashSet<Integer>> prunedServerToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
for (Integer bucketId : buckets) {
- List<BucketServerLocation66> serversList = prAdvisor
- .adviseServerLocations(bucketId);
+ List<BucketServerLocation66> serversList = prAdvisor.adviseServerLocations(bucketId);
if (isDebugEnabled) {
logger.debug("ClientMetadataService: For bucketId {} the server list is {}", bucketId, serversList);
}
@@ -314,18 +295,17 @@ public final class ClientMetadataService {
//will cause us to use the non-single hop path.
return null;
}
-
+
if (isDebugEnabled) {
logger.debug("ClientMetadataService: The buckets owners of the bucket: {} are: {}", bucketId, serversList);
}
-
+
for (ServerLocation server : serversList) {
if (serverToBucketsMap.get(server) == null) {
HashSet<Integer> bucketSet = new HashSet<Integer>();
bucketSet.add(bucketId);
serverToBucketsMap.put(server, bucketSet);
- }
- else {
+ } else {
HashSet<Integer> bucketSet = serverToBucketsMap.get(server);
bucketSet.add(bucketId);
serverToBucketsMap.put(server, bucketSet);
@@ -342,10 +322,9 @@ public final class ClientMetadataService {
ServerLocation randomFirstServer = null;
if (serverToBucketsMap.isEmpty()) {
return null;
- }
- else {
+ } else {
int size = serverToBucketsMap.size();
- randomFirstServer = (ServerLocation)serverToBucketsMap.keySet().toArray()[rand.nextInt(size)];
+ randomFirstServer = (ServerLocation) serverToBucketsMap.keySet().toArray()[rand.nextInt(size)];
}
HashSet<Integer> bucketSet = serverToBucketsMap.get(randomFirstServer);
if (isDebugEnabled) {
@@ -356,21 +335,14 @@ public final class ClientMetadataService {
serverToBucketsMap.remove(randomFirstServer);
while (!currentBucketSet.equals(buckets)) {
- ServerLocation server = findNextServer(serverToBucketsMap.entrySet(),
- currentBucketSet);
+ ServerLocation server = findNextServer(serverToBucketsMap.entrySet(), currentBucketSet);
if (server == null) {
-// HashSet<Integer> rBuckets = prunedServerToBucketsMap
-// .get(randomFirstServer);
-// HashSet<Integer> remainingBuckets = new HashSet<Integer>(buckets);
-// remainingBuckets.removeAll(currentBucketSet);
-// rBuckets.addAll(remainingBuckets);
-// prunedServerToBucketsMap.put(randomFirstServer, rBuckets);
break;
}
-
+
HashSet<Integer> bucketSet2 = serverToBucketsMap.get(server);
bucketSet2.removeAll(currentBucketSet);
- if(bucketSet2.isEmpty()) {
+ if (bucketSet2.isEmpty()) {
serverToBucketsMap.remove(server);
continue;
}
@@ -381,22 +353,20 @@ public final class ClientMetadataService {
}
serverToBucketsMap.remove(server);
}
-
+
if (isDebugEnabled) {
logger.debug("ClientMetadataService: The final prunedServerToBucket calculated is : {}", prunedServerToBucketsMap);
}
-
+
return prunedServerToBucketsMap;
}
-
-
- private ServerLocation findNextServer(
- Set<Map.Entry<ServerLocation, HashSet<Integer>>> entrySet,
- HashSet<Integer> currentBucketSet) {
-
+
+
+ private ServerLocation findNextServer(Set<Map.Entry<ServerLocation, HashSet<Integer>>> entrySet, HashSet<Integer> currentBucketSet) {
+
ServerLocation server = null;
int max = -1;
- ArrayList<ServerLocation> nodesOfEqualSize = new ArrayList<ServerLocation>();
+ ArrayList<ServerLocation> nodesOfEqualSize = new ArrayList<ServerLocation>();
for (Map.Entry<ServerLocation, HashSet<Integer>> entry : entrySet) {
HashSet<Integer> buckets = new HashSet<Integer>();
buckets.addAll(entry.getValue());
@@ -407,30 +377,28 @@ public final class ClientMetadataService {
server = entry.getKey();
nodesOfEqualSize.clear();
nodesOfEqualSize.add(server);
- }
- else if (max == buckets.size()){
+ } else if (max == buckets.size()) {
nodesOfEqualSize.add(server);
}
}
-
+
//return node;
Random r = new Random();
- if(nodesOfEqualSize.size() > 0)
+ if (nodesOfEqualSize.size() > 0) {
return nodesOfEqualSize.get(r.nextInt(nodesOfEqualSize.size()));
-
- return null;
+ }
+
+ return null;
}
-
- private HashMap<Integer, HashSet> groupByBucketOnClientSide(Region region,
- ClientPartitionAdvisor prAdvisor, Collection routingKeys, boolean bucketsAsFilter) {
-
+
+ private HashMap<Integer, HashSet> groupByBucketOnClientSide(Region region, ClientPartitionAdvisor prAdvisor, Collection routingKeys, boolean bucketsAsFilter) {
+
HashMap<Integer, HashSet> bucketToKeysMap = new HashMap();
int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
Iterator i = routingKeys.iterator();
while (i.hasNext()) {
- Object key = i.next();
- int bucketId = bucketsAsFilter ? ((Integer)key).intValue() :
- extractBucketID(region, prAdvisor, totalNumberOfBuckets, key);
+ Object key = i.next();
+ int bucketId = bucketsAsFilter ? ((Integer) key).intValue() : extractBucketID(region, prAdvisor, totalNumberOfBuckets, key);
HashSet bucketKeys = bucketToKeysMap.get(bucketId);
if (bucketKeys == null) {
bucketKeys = new HashSet(); // faster if this was an ArrayList
@@ -444,8 +412,7 @@ public final class ClientMetadataService {
return bucketToKeysMap;
}
- private int extractBucketID(Region region, ClientPartitionAdvisor prAdvisor,
- int totalNumberOfBuckets, Object key) {
+ private int extractBucketID(Region region, ClientPartitionAdvisor prAdvisor, int totalNumberOfBuckets, Object key) {
int bucketId = -1;
final PartitionResolver resolver = getResolver(region, key, null);
Object resolveKey;
@@ -454,101 +421,76 @@ public final class ClientMetadataService {
// client has not registered PartitionResolver
// Assuming even PR at server side is not using PartitionResolver
resolveKey = key;
- }
- else {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ } else {
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
resolveKey = resolver.getRoutingObject(entryOp);
if (resolveKey == null) {
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
- .toLocalizedString());
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());
}
}
-
+
if (resolver instanceof FixedPartitionResolver) {
if (entryOp == null) {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
}
- String partition = ((FixedPartitionResolver)resolver).getPartitionName(
- entryOp, prAdvisor.getFixedPartitionNames());
+ String partition = ((FixedPartitionResolver) resolver).getPartitionName(entryOp, prAdvisor.getFixedPartitionNames());
if (partition == null) {
Object[] prms = new Object[] { region.getName(), resolver };
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
- .toLocalizedString(prms));
- }
- else {
- bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));
+ } else {
+ bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
// This bucketid can be -1 in some circumstances where we don't have information about
// all the partition on the server.
// Do proactive scheduling of metadata fetch
- if(bucketId == -1) {
- scheduleGetPRMetaData((LocalRegion)region, true);
+ if (bucketId == -1) {
+ scheduleGetPRMetaData((LocalRegion) region, true);
}
}
- }else{
+ } else {
bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
}
return bucketId;
}
-
-
- public void scheduleGetPRMetaData(final LocalRegion region,
- final boolean isRecursive) {
- if(this.nonPRs.contains(region.getFullPath())){
+
+ public void scheduleGetPRMetaData(final LocalRegion region, final boolean isRecursive) {
+ if (this.nonPRs.contains(region.getFullPath())) {
return;
}
this.setMetadataStable(false);
region.getCachePerfStats().incNonSingleHopsCount();
if (isRecursive) {
- try {
- getClientPRMetadata(region);
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
- }
- else {
- synchronized (fetchTaskCountLock){
+ getClientPRMetadataForRegion(region);
+ } else {
+ synchronized (fetchTaskCountLock) {
refreshTaskCount++;
}
- Runnable fetchTask = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- try {
- getClientPRMetadata(region);
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
- finally {
- synchronized (fetchTaskCountLock){
- refreshTaskCount--;
- }
+ SingleHopClientExecutor.submitTask(() -> {
+ try {
+ getClientPRMetadataForRegion(region);
+ } finally {
+ synchronized (fetchTaskCountLock) {
+ refreshTaskCount--;
}
}
- };
- SingleHopClientExecutor.submitTask(fetchTask);
+ });
}
}
-
+
+ private void getClientPRMetadataForRegion(final LocalRegion region) {
+ try {
+ getClientPRMetadata(region);
+ } catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ } catch (Throwable e) {
+ SystemFailure.checkFailure();
+ if (logger.isDebugEnabled()) {
+ logger.debug("An exception occurred while fetching metadata", e);
+ }
+ }
+ }
+
public final void getClientPRMetadata(LocalRegion region) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor advisor = null;
@@ -558,19 +500,16 @@ public final class ClientMetadataService {
if (region.clientMetaDataLock.tryLock()) {
try {
advisor = this.getClientPartitionAdvisor(regionFullPath);
- if (advisor==null) {
- advisor = GetClientPartitionAttributesOp
- .execute(pool, regionFullPath);
- if(advisor == null){
+ if (advisor == null) {
+ advisor = GetClientPartitionAttributesOp.execute(pool, regionFullPath);
+ if (advisor == null) {
this.nonPRs.add(regionFullPath);
return;
}
addClientPartitionAdvisor(regionFullPath, advisor);
- }
- else {
- if(advisor.getFixedPAMap() != null && !advisor.isFPAAttrsComplete()) {
- ClientPartitionAdvisor newAdvisor = GetClientPartitionAttributesOp
- .execute(pool, regionFullPath);
+ } else {
+ if (advisor.getFixedPAMap() != null && !advisor.isFPAAttrsComplete()) {
+ ClientPartitionAdvisor newAdvisor = GetClientPartitionAttributesOp.execute(pool, regionFullPath);
advisor.updateFixedPAMap(newAdvisor.getFixedPAMap());
}
}
@@ -579,39 +518,33 @@ public final class ClientMetadataService {
isMetadataRefreshed_TEST_ONLY = true;
GetClientPRMetaDataOp.execute(pool, regionFullPath, this);
region.getCachePerfStats().incMetaDataRefreshCount();
- }
- else {
- ClientPartitionAdvisor colocatedAdvisor = this.getClientPartitionAdvisor(colocatedWith);
- LocalRegion leaderRegion = (LocalRegion)region.getCache()
- .getRegion(colocatedWith);
- if (colocatedAdvisor == null) {
+ } else {
+ LocalRegion leaderRegion = (LocalRegion) region.getCache().getRegion(colocatedWith);
+ if (this.getClientPartitionAdvisor(colocatedWith) == null) {
scheduleGetPRMetaData(leaderRegion, true);
return;
- }
- else {
+ } else {
isMetadataRefreshed_TEST_ONLY = true;
GetClientPRMetaDataOp.execute(pool, colocatedWith, this);
leaderRegion.getCachePerfStats().incMetaDataRefreshCount();
}
}
- }
- finally {
+ } finally {
region.clientMetaDataLock.unlock();
}
}
}
-
- public void scheduleGetPRMetaData(final LocalRegion region,
- final boolean isRecursive, byte nwHopType) {
- if(this.nonPRs.contains(region.getFullPath())){
+
+ public void scheduleGetPRMetaData(final LocalRegion region, final boolean isRecursive, byte nwHopType) {
+ if (this.nonPRs.contains(region.getFullPath())) {
return;
}
ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(region.getFullPath());
- if(advisor!= null && advisor.getServerGroup().length()!= 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP){
+ if (advisor != null && advisor.getServerGroup().length() != 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP) {
if (logger.isDebugEnabled()) {
logger.debug("Scheduling metadata refresh: {} region: {}", nwHopType, region.getName());
}
- if( nwHopType == PartitionedRegion.NETWORK_HOP_TO_DIFFERENT_GROUP){
+ if (nwHopType == PartitionedRegion.NETWORK_HOP_TO_DIFFERENT_GROUP) {
return;
}
}
@@ -620,51 +553,27 @@ public final class ClientMetadataService {
return;
}
}
+ region.getCachePerfStats().incNonSingleHopsCount();
if (isRecursive) {
- region.getCachePerfStats().incNonSingleHopsCount();
- try {
- getClientPRMetadata(region);
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- } catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
+ getClientPRMetadataForRegion(region);
} else {
synchronized (fetchTaskCountLock) {
if (regionsBeingRefreshed.contains(region.getFullPath())) {
return;
}
- region.getCachePerfStats().incNonSingleHopsCount();
regionsBeingRefreshed.add(region.getFullPath());
refreshTaskCount++;
}
- Runnable fetchTask = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- try {
- getClientPRMetadata(region);
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- } catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
- finally {
- synchronized (fetchTaskCountLock){
- regionsBeingRefreshed.remove(region.getFullPath());
- refreshTaskCount--;
- }
+ SingleHopClientExecutor.submitTask(() -> {
+ try {
+ getClientPRMetadataForRegion(region);
+ } finally {
+ synchronized (fetchTaskCountLock) {
+ regionsBeingRefreshed.remove(region.getFullPath());
+ refreshTaskCount--;
}
}
- };
- SingleHopClientExecutor.submitTask(fetchTask);
+ });
}
}
@@ -676,8 +585,7 @@ public final class ClientMetadataService {
}
if (keys != null) {
for (String regionPath : keys) {
- ClientPartitionAdvisor prAdvisor = this
- .getClientPartitionAdvisor(regionPath);
+ ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionPath);
if (isDebugEnabled) {
logger.debug("ClientMetadataService removing from {}{}", regionPath, prAdvisor);
}
@@ -687,11 +595,9 @@ public final class ClientMetadataService {
}
}
}
-
- public byte getMetaDataVersion(Region region, Operation operation,
- Object key, Object value, Object callbackArg) {
- ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region
- .getFullPath());
+
+ public byte getMetaDataVersion(Region region, Operation operation, Object key, Object value, Object callbackArg) {
+ ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region.getFullPath());
if (prAdvisor == null) {
return 0;
}
@@ -705,41 +611,31 @@ public final class ClientMetadataService {
// client has not registered PartitionResolver
// Assuming even PR at server side is not using PartitionResolver
resolveKey = key;
- }
- else {
- entryOp = new EntryOperationImpl(region, operation, key,
- value, callbackArg);
+ } else {
+ entryOp = new EntryOperationImpl(region, operation, key, value, callbackArg);
resolveKey = resolver.getRoutingObject(entryOp);
if (resolveKey == null) {
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
- .toLocalizedString());
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());
}
}
-
+
int bucketId;
if (resolver instanceof FixedPartitionResolver) {
if (entryOp == null) {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
}
- String partition = ((FixedPartitionResolver)resolver).getPartitionName(
- entryOp, prAdvisor.getFixedPartitionNames());
+ String partition = ((FixedPartitionResolver) resolver).getPartitionName(entryOp, prAdvisor.getFixedPartitionNames());
if (partition == null) {
Object[] prms = new Object[] { region.getName(), resolver };
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
- .toLocalizedString(prms));
- }
- else {
- bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));
+ } else {
+ bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
}
- }else {
+ } else {
bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
}
-
- BucketServerLocation66 bsl = (BucketServerLocation66)getPrimaryServerLocation(
- region, bucketId);
+
+ BucketServerLocation66 bsl = (BucketServerLocation66) getPrimaryServerLocation(region, bucketId);
if (bsl == null) {
return 0;
}
@@ -767,9 +663,8 @@ public final class ClientMetadataService {
}
return prAdvisor.advisePrimaryServerLocation(bucketId);
}
-
- private void addClientPartitionAdvisor(String regionFullPath,
- ClientPartitionAdvisor advisor) {
+
+ private void addClientPartitionAdvisor(String regionFullPath, ClientPartitionAdvisor advisor) {
if (this.cache.isClosed() || this.clientPRAdvisors == null) {
return;
}
@@ -778,49 +673,44 @@ public final class ClientMetadataService {
if (advisor.getColocatedWith() != null) {
String parentRegionPath = advisor.getColocatedWith();
Set<ClientPartitionAdvisor> colocatedAdvisors = this.colocatedPRAdvisors.get(parentRegionPath);
- if(colocatedAdvisors == null){
+ if (colocatedAdvisors == null) {
colocatedAdvisors = new CopyOnWriteArraySet<ClientPartitionAdvisor>();
this.colocatedPRAdvisors.put(parentRegionPath, colocatedAdvisors);
}
colocatedAdvisors.add(advisor);
}
- }
- catch (Exception npe) {
+ } catch (Exception npe) {
// ignore, shutdown case
}
-
+
}
public ClientPartitionAdvisor getClientPartitionAdvisor(String regionFullPath) {
if (this.cache.isClosed() || this.clientPRAdvisors == null) {
return null;
}
- ClientPartitionAdvisor prAdvisor = null;
try {
- prAdvisor = this.clientPRAdvisors.get(regionFullPath);
- }
- catch (Exception npe) {
+ return this.clientPRAdvisors.get(regionFullPath);
+ } catch (Exception npe) {
return null;
}
- return prAdvisor;
}
-
+
public Set<ClientPartitionAdvisor> getColocatedClientPartitionAdvisor(String regionFullPath) {
if (this.cache.isClosed() || this.clientPRAdvisors == null || this.colocatedPRAdvisors == null) {
return null;
}
return this.colocatedPRAdvisors.get(regionFullPath);
}
-
+
private Set<String> getAllRegionFullPaths() {
if (this.cache.isClosed() || this.clientPRAdvisors == null) {
return null;
}
- Set<String> keys = null;
+ Set<String> keys = null;
try {
keys = this.clientPRAdvisors.keySet();
- }
- catch (Exception npe) {
+ } catch (Exception npe) {
return null;
}
return keys;
@@ -830,7 +720,7 @@ public final class ClientMetadataService {
this.clientPRAdvisors.clear();
this.colocatedPRAdvisors.clear();
}
-
+
public boolean isRefreshMetadataTestOnly() {
return isMetadataRefreshed_TEST_ONLY;
}
@@ -847,10 +737,10 @@ public final class ClientMetadataService {
return clientPRAdvisors;
}
- public boolean honourServerGroup(){
+ public boolean honourServerGroup() {
return HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP;
}
-
+
public boolean isMetadataStable() {
return isMetadataStable;
}
@@ -860,7 +750,7 @@ public final class ClientMetadataService {
}
public int getRefreshTaskCount() {
- synchronized(fetchTaskCountLock) {
+ synchronized (fetchTaskCountLock) {
return refreshTaskCount;
}
}