You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/10/11 02:57:47 UTC
[1/3] incubator-geode git commit: GEODE-1801: Change the logic to
increment NonSingleHopsCount
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1801 [created] 9b6c10bc2
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b6c10bc/geode-core/src/test/java/org/apache/geode/internal/cache/SingleHopStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleHopStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleHopStatsDUnitTest.java
index a7c7b48..3f960c7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleHopStatsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleHopStatsDUnitTest.java
@@ -162,13 +162,7 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
try {
- p = PoolManager.createFactory().addServer("localhost", port0)
- .addServer("localhost", port1).addServer("localhost", port2)
- .setRetryAttempts(5)
- .setMinConnections(1)
- .setMaxConnections(-1)
- .setSubscriptionEnabled(false)
- .create(Region_Name);
+ p = PoolManager.createFactory().addServer("localhost", port0).addServer("localhost", port1).addServer("localhost", port2).setRetryAttempts(5).setMinConnections(1).setMaxConnections(-1).setSubscriptionEnabled(false).create(Region_Name);
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
@@ -195,58 +189,40 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
attr.setDataPolicy(DataPolicy.REPLICATE);
region = cache.createRegion(Region_Name, attr.create());
assertNotNull(region);
- LogWriterUtils.getLogWriter().info(
- "Distributed Region " + Region_Name + " created Successfully :"
- + region.toString());
+ LogWriterUtils.getLogWriter().info("Distributed Region " + Region_Name + " created Successfully :" + region.toString());
} else {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(
- totalNoofBuckets);
+ paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNoofBuckets);
AttributesFactory attr = new AttributesFactory();
attr.setPartitionAttributes(paf.create());
region = cache.createRegion(Region_Name, attr.create());
assertNotNull(region);
- LogWriterUtils.getLogWriter().info(
- "Partitioned Region " + Region_Name + " created Successfully :"
- + region.toString());
+ LogWriterUtils.getLogWriter().info("Partitioned Region " + Region_Name + " created Successfully :" + region.toString());
}
} else {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(
- totalNoofBuckets).setPartitionResolver(
- new CustomerIDPartitionResolver("CustomerIDPartitio"
- + "nResolver"));
+ paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNoofBuckets).setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitio" + "nResolver"));
AttributesFactory attr = new AttributesFactory();
attr.setPartitionAttributes(paf.create());
Region customerRegion = cache.createRegion(CUSTOMER_REGION_NAME, attr.create());
assertNotNull(customerRegion);
- LogWriterUtils.getLogWriter().info(
- "Partitioned Region CUSTOMER created Successfully :"
- + customerRegion.toString());
+ LogWriterUtils.getLogWriter().info("Partitioned Region CUSTOMER created Successfully :" + customerRegion.toString());
paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(
- totalNoofBuckets).setColocatedWith(CUSTOMER_REGION_NAME).setPartitionResolver(
- new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+ paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNoofBuckets).setColocatedWith(CUSTOMER_REGION_NAME).setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
attr = new AttributesFactory();
attr.setPartitionAttributes(paf.create());
Region orderRegion = cache.createRegion(ORDER_REGION_NAME, attr.create());
assertNotNull(orderRegion);
- LogWriterUtils.getLogWriter().info(
- "Partitioned Region ORDER created Successfully :"
- + orderRegion.toString());
+ LogWriterUtils.getLogWriter().info("Partitioned Region ORDER created Successfully :" + orderRegion.toString());
paf = new PartitionAttributesFactory();
- paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(
- totalNoofBuckets).setColocatedWith(ORDER_REGION_NAME).setPartitionResolver(
- new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+ paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNoofBuckets).setColocatedWith(ORDER_REGION_NAME).setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
attr = new AttributesFactory();
attr.setPartitionAttributes(paf.create());
Region shipmentRegion = cache.createRegion(SHIPMENT_REGION_NAME, attr.create());
assertNotNull(shipmentRegion);
- LogWriterUtils.getLogWriter().info(
- "Partitioned Region SHIPMENT created Successfully :"
- + shipmentRegion.toString());
+ LogWriterUtils.getLogWriter().info("Partitioned Region SHIPMENT created Successfully :" + shipmentRegion.toString());
}
return server.getPort();
}
@@ -260,35 +236,28 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
RegionAttributes attrs = factory.create();
region = cache.createRegion(Region_Name, attrs);
assertNotNull(region);
- LogWriterUtils.getLogWriter().info(
- "Region " + Region_Name + " created Successfully :" + region.toString());
+ LogWriterUtils.getLogWriter().info("Region " + Region_Name + " created Successfully :" + region.toString());
} else {
AttributesFactory factory = new AttributesFactory();
factory.setPoolName(poolName);
RegionAttributes attrs = factory.create();
Region customerRegion = cache.createRegion(CUSTOMER_REGION_NAME, attrs);
assertNotNull(customerRegion);
- LogWriterUtils.getLogWriter().info(
- "Partitioned Region CUSTOMER created Successfully :"
- + customerRegion.toString());
+ LogWriterUtils.getLogWriter().info("Partitioned Region CUSTOMER created Successfully :" + customerRegion.toString());
factory = new AttributesFactory();
factory.setPoolName(poolName);
attrs = factory.create();
Region orderRegion = cache.createRegion(ORDER_REGION_NAME, attrs);
assertNotNull(orderRegion);
- LogWriterUtils.getLogWriter().info(
- "Partitioned Region ORDER created Successfully :"
- + orderRegion.toString());
+ LogWriterUtils.getLogWriter().info("Partitioned Region ORDER created Successfully :" + orderRegion.toString());
factory = new AttributesFactory();
factory.setPoolName(poolName);
attrs = factory.create();
Region shipmentRegion = cache.createRegion("SHIPMENT", attrs);
assertNotNull(shipmentRegion);
- LogWriterUtils.getLogWriter().info(
- "Partitioned Region SHIPMENT created Successfully :"
- + shipmentRegion.toString());
+ LogWriterUtils.getLogWriter().info("Partitioned Region SHIPMENT created Successfully :" + shipmentRegion.toString());
}
}
@@ -304,23 +273,17 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
for (int i = 0; i < 113; i++) {
region.create(new Integer(i), "create" + i);
}
- ClientMetadataService cms = ((GemFireCacheImpl) cache)
- .getClientMetadataService();
- final Map<String, ClientPartitionAdvisor> regionMetaData = cms
- .getClientPRMetadata_TEST_ONLY();
+ ClientMetadataService cms = ((GemFireCacheImpl) cache).getClientMetadataService();
+ final Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
assertEquals(0, regionMetaData.size());
System.out.println("second pass...");
for (int i = 113; i < 226; i++) {
region.create(new Integer(i), "create" + i);
}
- cms = ((GemFireCacheImpl) cache).getClientMetadataService();
// since PR metadata is fetched in a background executor thread
// we need to wait for it to arrive for a bit
- Awaitility.await().timeout(120, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS).until(() -> {
- return regionMetaData.size() == 1;
- });
+ Awaitility.await().timeout(120, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(500, TimeUnit.MILLISECONDS).until(() -> regionMetaData.size() == 1);
assertTrue(regionMetaData.containsKey(region.getFullPath()));
regionMetaData.get(region.getFullPath());
@@ -328,17 +291,15 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
nonSingleHopsCount = ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount();
assertTrue(metaDataRefreshCount != 0); // hops are not predictable
assertTrue(nonSingleHopsCount != 0);
-
+
System.out.println("metadata refresh count after second pass is " + metaDataRefreshCount);
} else {
System.out.println("creating keys in second client");
for (int i = 0; i < 226; i++) {
region.create(new Integer(i), "create" + i);
}
- ClientMetadataService cms = ((GemFireCacheImpl) cache)
- .getClientMetadataService();
- Map<String, ClientPartitionAdvisor> regionMetaData = cms
- .getClientPRMetadata_TEST_ONLY();
+ ClientMetadataService cms = ((GemFireCacheImpl) cache).getClientMetadataService();
+ Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
assertEquals(1, regionMetaData.size());
assertTrue(regionMetaData.containsKey(region.getFullPath()));
@@ -376,9 +337,9 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
}
}
}
+
ClientMetadataService cms = cache.getClientMetadataService();
- Map<String, ClientPartitionAdvisor> regionMetaData = cms
- .getClientPRMetadata_TEST_ONLY();
+ Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
assertEquals(3, regionMetaData.size());
assertTrue(regionMetaData.containsKey(customerRegion.getFullPath()));
regionMetaData.get(customerRegion.getFullPath());
@@ -407,31 +368,15 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
Region orderRegion = cache.getRegion(ORDER_REGION_NAME);
Region shipmentRegion = cache.getRegion("SHIPMENT");
if (colocation.equals("No_Colocation")) {
- if (FromClient.equals("FirstClient")) {
- for (int i = 0; i < 226; i++) {
- region.get(new Integer(i));
- }
- ClientMetadataService cms = ((GemFireCacheImpl) cache)
- .getClientMetadataService();
- Map<String, ClientPartitionAdvisor> regionMetaData = cms
- .getClientPRMetadata_TEST_ONLY();
- assertEquals(1, regionMetaData.size());
- regionMetaData.get(region.getFullPath());
- assertEquals(metaDataRefreshCount, ((LocalRegion) region).getCachePerfStats().getMetaDataRefreshCount());
- assertEquals(nonSingleHopsCount, ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount());
- } else {
- for (int i = 0; i < 226; i++) {
- region.get(new Integer(i));
- }
- ClientMetadataService cms = ((GemFireCacheImpl) cache)
- .getClientMetadataService();
- Map<String, ClientPartitionAdvisor> regionMetaData = cms
- .getClientPRMetadata_TEST_ONLY();
- assertEquals(1, regionMetaData.size());
- regionMetaData.get(region.getFullPath());
- assertEquals(metaDataRefreshCount, ((LocalRegion) region).getCachePerfStats().getMetaDataRefreshCount());
- assertEquals(nonSingleHopsCount, ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount());
+ for (int i = 0; i < 226; i++) {
+ region.get(new Integer(i));
}
+ ClientMetadataService cms = ((GemFireCacheImpl) cache).getClientMetadataService();
+ Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
+ assertEquals(1, regionMetaData.size());
+ regionMetaData.get(region.getFullPath());
+ assertEquals(metaDataRefreshCount, ((LocalRegion) region).getCachePerfStats().getMetaDataRefreshCount());
+ assertEquals(nonSingleHopsCount, ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount());
} else {
for (int i = 0; i <= 20; i++) {
CustId custid = new CustId(i);
@@ -447,10 +392,8 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
}
}
}
- ClientMetadataService cms = ((GemFireCacheImpl) cache)
- .getClientMetadataService();
- Map<String, ClientPartitionAdvisor> regionMetaData = cms
- .getClientPRMetadata_TEST_ONLY();
+ ClientMetadataService cms = ((GemFireCacheImpl) cache).getClientMetadataService();
+ Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
assertEquals(3, regionMetaData.size());
assertTrue(regionMetaData.containsKey(customerRegion.getFullPath()));
regionMetaData.get(customerRegion.getFullPath());
@@ -475,10 +418,8 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
for (int i = 0; i < 226; i++) {
region.put(new Integer(i), "Update" + i);
}
- ClientMetadataService cms = ((GemFireCacheImpl) cache)
- .getClientMetadataService();
- Map<String, ClientPartitionAdvisor> regionMetaData = cms
- .getClientPRMetadata_TEST_ONLY();
+ ClientMetadataService cms = ((GemFireCacheImpl) cache).getClientMetadataService();
+ Map<String, ClientPartitionAdvisor> regionMetaData = cms.getClientPRMetadata_TEST_ONLY();
assertEquals(1, regionMetaData.size());
regionMetaData.get(region.getFullPath());
assertEquals(metaDataRefreshCount, ((LocalRegion) region).getCachePerfStats().getMetaDataRefreshCount());
[3/3] incubator-geode git commit: GEODE-1801: Change the logic to
increment NonSingleHopsCount
Posted by ud...@apache.org.
GEODE-1801: Change the logic to increment NonSingleHopsCount
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9b6c10bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9b6c10bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9b6c10bc
Branch: refs/heads/feature/GEODE-1801
Commit: 9b6c10bc2f433b36546e6f3c22f39d3b20d880b5
Parents: febc634
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Tue Oct 11 13:57:06 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Tue Oct 11 13:57:06 2016 +1100
----------------------------------------------------------------------
.../client/internal/ClientMetadataService.java | 536 +++----
.../geode/internal/cache/GemFireCacheImpl.java | 1499 +++++++++---------
.../internal/cache/SingleHopStatsDUnitTest.java | 127 +-
3 files changed, 980 insertions(+), 1182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b6c10bc/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
index c863d46..325322e 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
@@ -16,94 +16,109 @@
*/
package org.apache.geode.cache.client.internal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.FixedPartitionResolver;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.cache.Region;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.BucketServerLocation66;
+import org.apache.geode.internal.cache.EntryOperationImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
/**
* Maintains {@link ClientPartitionAdvisor} for Partitioned Regions on servers
* Client operations will consult this service to identify the server locations
* on which the data for the client operation is residing
- *
- *
+ *
* @since GemFire 6.5
- *
*/
public final class ClientMetadataService {
private static final Logger logger = LogService.getLogger();
-
+
private final Cache cache;
-
+
private final Set<String> nonPRs = new HashSet<String>();
private boolean HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PoolImpl.honourServerGroupsInPRSingleHop");
public static final int SIZE_BYTES_ARRAY_RECEIVED = 2;
-
+
public static final int INITIAL_VERSION = 0;
-
- /** random number generator used in pruning */
+
+ /**
+ * random number generator used in pruning
+ */
private final Random rand = new Random();
-
+
private volatile boolean isMetadataStable = true;
private boolean isMetadataRefreshed_TEST_ONLY = false;
-
+
private int refreshTaskCount = 0;
-
+
private Set<String> regionsBeingRefreshed = new HashSet<>();
-
+
private final Object fetchTaskCountLock = new Object();
-
+
public ClientMetadataService(Cache cache) {
this.cache = cache;
}
private final Map<String, ClientPartitionAdvisor> clientPRAdvisors = new ConcurrentHashMap<String, ClientPartitionAdvisor>();
private final Map<String, Set<ClientPartitionAdvisor>> colocatedPRAdvisors = new ConcurrentHashMap<String, Set<ClientPartitionAdvisor>>();
-
- private PartitionResolver getResolver(Region r, Object key,
- Object callbackArgument) {
+
+ private PartitionResolver getResolver(Region r, Object key, Object callbackArgument) {
// First choice is one associated with the region
final String regionFullPath = r.getFullPath();
- ClientPartitionAdvisor advisor = this
- .getClientPartitionAdvisor(regionFullPath);
+ ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(regionFullPath);
PartitionResolver result = null;
if (advisor != null) {
result = advisor.getPartitionResolver();
}
-
+
if (result != null) {
return result;
}
// Second is the key
if (key != null && key instanceof PartitionResolver) {
- return (PartitionResolver)key;
+ return (PartitionResolver) key;
}
// Third is the callback argument
- if (callbackArgument != null
- && callbackArgument instanceof PartitionResolver) {
- return (PartitionResolver)callbackArgument;
+ if (callbackArgument != null && callbackArgument instanceof PartitionResolver) {
+ return (PartitionResolver) callbackArgument;
}
// There is no resolver.
return null;
}
- public ServerLocation getBucketServerLocation(Region region,
- Operation operation, Object key, Object value, Object callbackArg) {
- ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region.getFullPath());
+ public ServerLocation getBucketServerLocation(Region region, Operation operation, Object key, Object value, Object callbackArg) {
+ ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region.getFullPath());
if (prAdvisor == null) {
return null;
}
@@ -116,32 +131,23 @@ public final class ClientMetadataService {
// client has not registered PartitionResolver
// Assuming even PR at server side is not using PartitionResolver
resolveKey = key;
- }
- else {
- entryOp = new EntryOperationImpl(region, operation, key,
- value, callbackArg);
+ } else {
+ entryOp = new EntryOperationImpl(region, operation, key, value, callbackArg);
resolveKey = resolver.getRoutingObject(entryOp);
if (resolveKey == null) {
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
- .toLocalizedString());
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());
}
}
int bucketId;
if (resolver instanceof FixedPartitionResolver) {
if (entryOp == null) {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
}
- String partition = ((FixedPartitionResolver)resolver).getPartitionName(
- entryOp, prAdvisor.getFixedPartitionNames());
+ String partition = ((FixedPartitionResolver) resolver).getPartitionName(entryOp, prAdvisor.getFixedPartitionNames());
if (partition == null) {
Object[] prms = new Object[] { region.getName(), resolver };
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
- .toLocalizedString(prms));
- }
- else {
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));
+ } else {
bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
if (bucketId == -1) {
// scheduleGetPRMetaData((LocalRegion)region);
@@ -149,21 +155,19 @@ public final class ClientMetadataService {
}
}
- }else {
+ } else {
bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
}
-
- ServerLocation bucketServerLocation = getServerLocation(region, operation,
- bucketId);
+
+ ServerLocation bucketServerLocation = getServerLocation(region, operation, bucketId);
ServerLocation location = null;
- if (bucketServerLocation != null)
- location = new ServerLocation(bucketServerLocation.getHostName(),
- bucketServerLocation.getPort());
+ if (bucketServerLocation != null) {
+ location = new ServerLocation(bucketServerLocation.getHostName(), bucketServerLocation.getPort());
+ }
return location;
}
- private ServerLocation getServerLocation(Region region, Operation operation,
- int bucketId) {
+ private ServerLocation getServerLocation(Region region, Operation operation, int bucketId) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
if (prAdvisor == null) {
@@ -172,60 +176,41 @@ public final class ClientMetadataService {
}
return null;
}
-
-// if (prAdvisor.getColocatedWith() != null) {
-// prAdvisor = this.getClientPartitionAdvisor(prAdvisor.getColocatedWith());
-// if (prAdvisor == null) {
-// if (this.logger.fineEnabled()) {
-// this.logger.fine(
-// "ClientMetadataService#getServerLocation : Region "
-// + regionFullPath + "prAdvisor does not exist.");
-// }
-// return null;
-// }
-// }
-
+
if (operation.isGet()) {
return prAdvisor.adviseServerLocation(bucketId);
- }
- else {
+ } else {
return prAdvisor.advisePrimaryServerLocation(bucketId);
}
}
- public Map<ServerLocation, HashSet> getServerToFilterMap(
- final Collection routingKeys, final Region region, boolean primaryMembersNeeded
- ) {
- return getServerToFilterMap(routingKeys, region, primaryMembersNeeded, false);
+ public Map<ServerLocation, HashSet> getServerToFilterMap(final Collection routingKeys, final Region region, boolean primaryMembersNeeded) {
+ return getServerToFilterMap(routingKeys, region, primaryMembersNeeded, false);
}
-
- public Map<ServerLocation, HashSet> getServerToFilterMap(
- final Collection routingKeys, final Region region, boolean primaryMembersNeeded,
- boolean bucketsAsFilter) {
+
+ public Map<ServerLocation, HashSet> getServerToFilterMap(final Collection routingKeys, final Region region, boolean primaryMembersNeeded, boolean bucketsAsFilter) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) {
- scheduleGetPRMetaData((LocalRegion)region, false);
+ scheduleGetPRMetaData((LocalRegion) region, false);
return null;
}
- HashMap<Integer, HashSet> bucketToKeysMap = groupByBucketOnClientSide(
- region, prAdvisor, routingKeys, bucketsAsFilter);
+ HashMap<Integer, HashSet> bucketToKeysMap = groupByBucketOnClientSide(region, prAdvisor, routingKeys, bucketsAsFilter);
HashMap<ServerLocation, HashSet> serverToKeysMap = new HashMap<ServerLocation, HashSet>();
- HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = groupByServerToBuckets(
- prAdvisor, bucketToKeysMap.keySet(), primaryMembersNeeded);
-
- if(serverToBuckets == null){
+ HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = groupByServerToBuckets(prAdvisor, bucketToKeysMap.keySet(), primaryMembersNeeded);
+
+ if (serverToBuckets == null) {
return null;
}
-
+
for (Map.Entry entry : serverToBuckets.entrySet()) {
- ServerLocation server = (ServerLocation)entry.getKey();
- HashSet<Integer> buckets = (HashSet)entry.getValue();
+ ServerLocation server = (ServerLocation) entry.getKey();
+ HashSet<Integer> buckets = (HashSet) entry.getValue();
for (Integer bucket : buckets) {
// use LinkedHashSet to maintain the order of keys
// the keys will be iterated several times
- LinkedHashSet keys = (LinkedHashSet)serverToKeysMap.get(server);
+ LinkedHashSet keys = (LinkedHashSet) serverToKeysMap.get(server);
if (keys == null) {
keys = new LinkedHashSet();
}
@@ -239,29 +224,28 @@ public final class ClientMetadataService {
return serverToKeysMap;
}
-
- public HashMap<ServerLocation, HashSet<Integer>> groupByServerToAllBuckets(Region region, boolean primaryOnly){
+
+ public HashMap<ServerLocation, HashSet<Integer>> groupByServerToAllBuckets(Region region, boolean primaryOnly) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) {
- scheduleGetPRMetaData((LocalRegion)region, false);
+ scheduleGetPRMetaData((LocalRegion) region, false);
return null;
}
int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
HashSet<Integer> allBucketIds = new HashSet<Integer>();
- for(int i =0; i < totalNumberOfBuckets; i++){
+ for (int i = 0; i < totalNumberOfBuckets; i++) {
allBucketIds.add(i);
}
return groupByServerToBuckets(prAdvisor, allBucketIds, primaryOnly);
}
+
/**
* This function should make a map of server to buckets it is hosting.
* If for some bucket servers are not available due to mismatch in metadata
* it should fill up a random server for it.
*/
- private HashMap<ServerLocation, HashSet<Integer>> groupByServerToBuckets(
- ClientPartitionAdvisor prAdvisor, Set<Integer> bucketSet,
- boolean primaryOnly) {
+ private HashMap<ServerLocation, HashSet<Integer>> groupByServerToBuckets(ClientPartitionAdvisor prAdvisor, Set<Integer> bucketSet, boolean primaryOnly) {
if (primaryOnly) {
HashMap<ServerLocation, HashSet<Integer>> serverToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
for (Integer bucketId : bucketSet) {
@@ -285,16 +269,14 @@ public final class ClientMetadataService {
}
return serverToBucketsMap;
- }
- else {
+ } else {
return pruneNodes(prAdvisor, bucketSet);
}
}
-
-
- private HashMap<ServerLocation, HashSet<Integer>> pruneNodes(
- ClientPartitionAdvisor prAdvisor, Set<Integer> buckets) {
-
+
+
+ private HashMap<ServerLocation, HashSet<Integer>> pruneNodes(ClientPartitionAdvisor prAdvisor, Set<Integer> buckets) {
+
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("ClientMetadataService: The buckets to be pruned are: {}", buckets);
@@ -303,8 +285,7 @@ public final class ClientMetadataService {
HashMap<ServerLocation, HashSet<Integer>> prunedServerToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
for (Integer bucketId : buckets) {
- List<BucketServerLocation66> serversList = prAdvisor
- .adviseServerLocations(bucketId);
+ List<BucketServerLocation66> serversList = prAdvisor.adviseServerLocations(bucketId);
if (isDebugEnabled) {
logger.debug("ClientMetadataService: For bucketId {} the server list is {}", bucketId, serversList);
}
@@ -314,18 +295,17 @@ public final class ClientMetadataService {
//will cause us to use the non-single hop path.
return null;
}
-
+
if (isDebugEnabled) {
logger.debug("ClientMetadataService: The buckets owners of the bucket: {} are: {}", bucketId, serversList);
}
-
+
for (ServerLocation server : serversList) {
if (serverToBucketsMap.get(server) == null) {
HashSet<Integer> bucketSet = new HashSet<Integer>();
bucketSet.add(bucketId);
serverToBucketsMap.put(server, bucketSet);
- }
- else {
+ } else {
HashSet<Integer> bucketSet = serverToBucketsMap.get(server);
bucketSet.add(bucketId);
serverToBucketsMap.put(server, bucketSet);
@@ -342,10 +322,9 @@ public final class ClientMetadataService {
ServerLocation randomFirstServer = null;
if (serverToBucketsMap.isEmpty()) {
return null;
- }
- else {
+ } else {
int size = serverToBucketsMap.size();
- randomFirstServer = (ServerLocation)serverToBucketsMap.keySet().toArray()[rand.nextInt(size)];
+ randomFirstServer = (ServerLocation) serverToBucketsMap.keySet().toArray()[rand.nextInt(size)];
}
HashSet<Integer> bucketSet = serverToBucketsMap.get(randomFirstServer);
if (isDebugEnabled) {
@@ -356,21 +335,14 @@ public final class ClientMetadataService {
serverToBucketsMap.remove(randomFirstServer);
while (!currentBucketSet.equals(buckets)) {
- ServerLocation server = findNextServer(serverToBucketsMap.entrySet(),
- currentBucketSet);
+ ServerLocation server = findNextServer(serverToBucketsMap.entrySet(), currentBucketSet);
if (server == null) {
-// HashSet<Integer> rBuckets = prunedServerToBucketsMap
-// .get(randomFirstServer);
-// HashSet<Integer> remainingBuckets = new HashSet<Integer>(buckets);
-// remainingBuckets.removeAll(currentBucketSet);
-// rBuckets.addAll(remainingBuckets);
-// prunedServerToBucketsMap.put(randomFirstServer, rBuckets);
break;
}
-
+
HashSet<Integer> bucketSet2 = serverToBucketsMap.get(server);
bucketSet2.removeAll(currentBucketSet);
- if(bucketSet2.isEmpty()) {
+ if (bucketSet2.isEmpty()) {
serverToBucketsMap.remove(server);
continue;
}
@@ -381,22 +353,20 @@ public final class ClientMetadataService {
}
serverToBucketsMap.remove(server);
}
-
+
if (isDebugEnabled) {
logger.debug("ClientMetadataService: The final prunedServerToBucket calculated is : {}", prunedServerToBucketsMap);
}
-
+
return prunedServerToBucketsMap;
}
-
-
- private ServerLocation findNextServer(
- Set<Map.Entry<ServerLocation, HashSet<Integer>>> entrySet,
- HashSet<Integer> currentBucketSet) {
-
+
+
+ private ServerLocation findNextServer(Set<Map.Entry<ServerLocation, HashSet<Integer>>> entrySet, HashSet<Integer> currentBucketSet) {
+
ServerLocation server = null;
int max = -1;
- ArrayList<ServerLocation> nodesOfEqualSize = new ArrayList<ServerLocation>();
+ ArrayList<ServerLocation> nodesOfEqualSize = new ArrayList<ServerLocation>();
for (Map.Entry<ServerLocation, HashSet<Integer>> entry : entrySet) {
HashSet<Integer> buckets = new HashSet<Integer>();
buckets.addAll(entry.getValue());
@@ -407,30 +377,28 @@ public final class ClientMetadataService {
server = entry.getKey();
nodesOfEqualSize.clear();
nodesOfEqualSize.add(server);
- }
- else if (max == buckets.size()){
+ } else if (max == buckets.size()) {
nodesOfEqualSize.add(server);
}
}
-
+
//return node;
Random r = new Random();
- if(nodesOfEqualSize.size() > 0)
+ if (nodesOfEqualSize.size() > 0) {
return nodesOfEqualSize.get(r.nextInt(nodesOfEqualSize.size()));
-
- return null;
+ }
+
+ return null;
}
-
- private HashMap<Integer, HashSet> groupByBucketOnClientSide(Region region,
- ClientPartitionAdvisor prAdvisor, Collection routingKeys, boolean bucketsAsFilter) {
-
+
+ private HashMap<Integer, HashSet> groupByBucketOnClientSide(Region region, ClientPartitionAdvisor prAdvisor, Collection routingKeys, boolean bucketsAsFilter) {
+
HashMap<Integer, HashSet> bucketToKeysMap = new HashMap();
int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
Iterator i = routingKeys.iterator();
while (i.hasNext()) {
- Object key = i.next();
- int bucketId = bucketsAsFilter ? ((Integer)key).intValue() :
- extractBucketID(region, prAdvisor, totalNumberOfBuckets, key);
+ Object key = i.next();
+ int bucketId = bucketsAsFilter ? ((Integer) key).intValue() : extractBucketID(region, prAdvisor, totalNumberOfBuckets, key);
HashSet bucketKeys = bucketToKeysMap.get(bucketId);
if (bucketKeys == null) {
bucketKeys = new HashSet(); // faster if this was an ArrayList
@@ -444,8 +412,7 @@ public final class ClientMetadataService {
return bucketToKeysMap;
}
- private int extractBucketID(Region region, ClientPartitionAdvisor prAdvisor,
- int totalNumberOfBuckets, Object key) {
+ private int extractBucketID(Region region, ClientPartitionAdvisor prAdvisor, int totalNumberOfBuckets, Object key) {
int bucketId = -1;
final PartitionResolver resolver = getResolver(region, key, null);
Object resolveKey;
@@ -454,101 +421,76 @@ public final class ClientMetadataService {
// client has not registered PartitionResolver
// Assuming even PR at server side is not using PartitionResolver
resolveKey = key;
- }
- else {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ } else {
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
resolveKey = resolver.getRoutingObject(entryOp);
if (resolveKey == null) {
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
- .toLocalizedString());
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());
}
}
-
+
if (resolver instanceof FixedPartitionResolver) {
if (entryOp == null) {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
}
- String partition = ((FixedPartitionResolver)resolver).getPartitionName(
- entryOp, prAdvisor.getFixedPartitionNames());
+ String partition = ((FixedPartitionResolver) resolver).getPartitionName(entryOp, prAdvisor.getFixedPartitionNames());
if (partition == null) {
Object[] prms = new Object[] { region.getName(), resolver };
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
- .toLocalizedString(prms));
- }
- else {
- bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));
+ } else {
+ bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
// This bucketid can be -1 in some circumstances where we don't have information about
// all the partition on the server.
// Do proactive scheduling of metadata fetch
- if(bucketId == -1) {
- scheduleGetPRMetaData((LocalRegion)region, true);
+ if (bucketId == -1) {
+ scheduleGetPRMetaData((LocalRegion) region, true);
}
}
- }else{
+ } else {
bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
}
return bucketId;
}
-
-
- public void scheduleGetPRMetaData(final LocalRegion region,
- final boolean isRecursive) {
- if(this.nonPRs.contains(region.getFullPath())){
+
+ public void scheduleGetPRMetaData(final LocalRegion region, final boolean isRecursive) {
+ if (this.nonPRs.contains(region.getFullPath())) {
return;
}
this.setMetadataStable(false);
region.getCachePerfStats().incNonSingleHopsCount();
if (isRecursive) {
- try {
- getClientPRMetadata(region);
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
- }
- else {
- synchronized (fetchTaskCountLock){
+ getClientPRMetadataForRegion(region);
+ } else {
+ synchronized (fetchTaskCountLock) {
refreshTaskCount++;
}
- Runnable fetchTask = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- try {
- getClientPRMetadata(region);
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
- finally {
- synchronized (fetchTaskCountLock){
- refreshTaskCount--;
- }
+ SingleHopClientExecutor.submitTask(() -> {
+ try {
+ getClientPRMetadataForRegion(region);
+ } finally {
+ synchronized (fetchTaskCountLock) {
+ refreshTaskCount--;
}
}
- };
- SingleHopClientExecutor.submitTask(fetchTask);
+ });
}
}
-
+
+ private void getClientPRMetadataForRegion(final LocalRegion region) {
+ try {
+ getClientPRMetadata(region);
+ } catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ } catch (Throwable e) {
+ SystemFailure.checkFailure();
+ if (logger.isDebugEnabled()) {
+ logger.debug("An exception occurred while fetching metadata", e);
+ }
+ }
+ }
+
public final void getClientPRMetadata(LocalRegion region) {
final String regionFullPath = region.getFullPath();
ClientPartitionAdvisor advisor = null;
@@ -558,19 +500,16 @@ public final class ClientMetadataService {
if (region.clientMetaDataLock.tryLock()) {
try {
advisor = this.getClientPartitionAdvisor(regionFullPath);
- if (advisor==null) {
- advisor = GetClientPartitionAttributesOp
- .execute(pool, regionFullPath);
- if(advisor == null){
+ if (advisor == null) {
+ advisor = GetClientPartitionAttributesOp.execute(pool, regionFullPath);
+ if (advisor == null) {
this.nonPRs.add(regionFullPath);
return;
}
addClientPartitionAdvisor(regionFullPath, advisor);
- }
- else {
- if(advisor.getFixedPAMap() != null && !advisor.isFPAAttrsComplete()) {
- ClientPartitionAdvisor newAdvisor = GetClientPartitionAttributesOp
- .execute(pool, regionFullPath);
+ } else {
+ if (advisor.getFixedPAMap() != null && !advisor.isFPAAttrsComplete()) {
+ ClientPartitionAdvisor newAdvisor = GetClientPartitionAttributesOp.execute(pool, regionFullPath);
advisor.updateFixedPAMap(newAdvisor.getFixedPAMap());
}
}
@@ -579,39 +518,33 @@ public final class ClientMetadataService {
isMetadataRefreshed_TEST_ONLY = true;
GetClientPRMetaDataOp.execute(pool, regionFullPath, this);
region.getCachePerfStats().incMetaDataRefreshCount();
- }
- else {
- ClientPartitionAdvisor colocatedAdvisor = this.getClientPartitionAdvisor(colocatedWith);
- LocalRegion leaderRegion = (LocalRegion)region.getCache()
- .getRegion(colocatedWith);
- if (colocatedAdvisor == null) {
+ } else {
+ LocalRegion leaderRegion = (LocalRegion) region.getCache().getRegion(colocatedWith);
+ if (this.getClientPartitionAdvisor(colocatedWith) == null) {
scheduleGetPRMetaData(leaderRegion, true);
return;
- }
- else {
+ } else {
isMetadataRefreshed_TEST_ONLY = true;
GetClientPRMetaDataOp.execute(pool, colocatedWith, this);
leaderRegion.getCachePerfStats().incMetaDataRefreshCount();
}
}
- }
- finally {
+ } finally {
region.clientMetaDataLock.unlock();
}
}
}
-
- public void scheduleGetPRMetaData(final LocalRegion region,
- final boolean isRecursive, byte nwHopType) {
- if(this.nonPRs.contains(region.getFullPath())){
+
+ public void scheduleGetPRMetaData(final LocalRegion region, final boolean isRecursive, byte nwHopType) {
+ if (this.nonPRs.contains(region.getFullPath())) {
return;
}
ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(region.getFullPath());
- if(advisor!= null && advisor.getServerGroup().length()!= 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP){
+ if (advisor != null && advisor.getServerGroup().length() != 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP) {
if (logger.isDebugEnabled()) {
logger.debug("Scheduling metadata refresh: {} region: {}", nwHopType, region.getName());
}
- if( nwHopType == PartitionedRegion.NETWORK_HOP_TO_DIFFERENT_GROUP){
+ if (nwHopType == PartitionedRegion.NETWORK_HOP_TO_DIFFERENT_GROUP) {
return;
}
}
@@ -620,51 +553,27 @@ public final class ClientMetadataService {
return;
}
}
+ region.getCachePerfStats().incNonSingleHopsCount();
if (isRecursive) {
- region.getCachePerfStats().incNonSingleHopsCount();
- try {
- getClientPRMetadata(region);
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- } catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
+ getClientPRMetadataForRegion(region);
} else {
synchronized (fetchTaskCountLock) {
if (regionsBeingRefreshed.contains(region.getFullPath())) {
return;
}
- region.getCachePerfStats().incNonSingleHopsCount();
regionsBeingRefreshed.add(region.getFullPath());
refreshTaskCount++;
}
- Runnable fetchTask = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- try {
- getClientPRMetadata(region);
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- } catch (Throwable e) {
- SystemFailure.checkFailure();
- if (logger.isDebugEnabled()) {
- logger.debug("An exception occurred while fetching metadata", e);
- }
- }
- finally {
- synchronized (fetchTaskCountLock){
- regionsBeingRefreshed.remove(region.getFullPath());
- refreshTaskCount--;
- }
+ SingleHopClientExecutor.submitTask(() -> {
+ try {
+ getClientPRMetadataForRegion(region);
+ } finally {
+ synchronized (fetchTaskCountLock) {
+ regionsBeingRefreshed.remove(region.getFullPath());
+ refreshTaskCount--;
}
}
- };
- SingleHopClientExecutor.submitTask(fetchTask);
+ });
}
}
@@ -676,8 +585,7 @@ public final class ClientMetadataService {
}
if (keys != null) {
for (String regionPath : keys) {
- ClientPartitionAdvisor prAdvisor = this
- .getClientPartitionAdvisor(regionPath);
+ ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionPath);
if (isDebugEnabled) {
logger.debug("ClientMetadataService removing from {}{}", regionPath, prAdvisor);
}
@@ -687,11 +595,9 @@ public final class ClientMetadataService {
}
}
}
-
- public byte getMetaDataVersion(Region region, Operation operation,
- Object key, Object value, Object callbackArg) {
- ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region
- .getFullPath());
+
+ public byte getMetaDataVersion(Region region, Operation operation, Object key, Object value, Object callbackArg) {
+ ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region.getFullPath());
if (prAdvisor == null) {
return 0;
}
@@ -705,41 +611,31 @@ public final class ClientMetadataService {
// client has not registered PartitionResolver
// Assuming even PR at server side is not using PartitionResolver
resolveKey = key;
- }
- else {
- entryOp = new EntryOperationImpl(region, operation, key,
- value, callbackArg);
+ } else {
+ entryOp = new EntryOperationImpl(region, operation, key, value, callbackArg);
resolveKey = resolver.getRoutingObject(entryOp);
if (resolveKey == null) {
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
- .toLocalizedString());
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL.toLocalizedString());
}
}
-
+
int bucketId;
if (resolver instanceof FixedPartitionResolver) {
if (entryOp == null) {
- entryOp = new EntryOperationImpl(region,
- Operation.FUNCTION_EXECUTION, key, null, null);
+ entryOp = new EntryOperationImpl(region, Operation.FUNCTION_EXECUTION, key, null, null);
}
- String partition = ((FixedPartitionResolver)resolver).getPartitionName(
- entryOp, prAdvisor.getFixedPartitionNames());
+ String partition = ((FixedPartitionResolver) resolver).getPartitionName(entryOp, prAdvisor.getFixedPartitionNames());
if (partition == null) {
Object[] prms = new Object[] { region.getName(), resolver };
- throw new IllegalStateException(
- LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
- .toLocalizedString(prms));
- }
- else {
- bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
+ throw new IllegalStateException(LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL.toLocalizedString(prms));
+ } else {
+ bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
}
- }else {
+ } else {
bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
}
-
- BucketServerLocation66 bsl = (BucketServerLocation66)getPrimaryServerLocation(
- region, bucketId);
+
+ BucketServerLocation66 bsl = (BucketServerLocation66) getPrimaryServerLocation(region, bucketId);
if (bsl == null) {
return 0;
}
@@ -767,9 +663,8 @@ public final class ClientMetadataService {
}
return prAdvisor.advisePrimaryServerLocation(bucketId);
}
-
- private void addClientPartitionAdvisor(String regionFullPath,
- ClientPartitionAdvisor advisor) {
+
+ private void addClientPartitionAdvisor(String regionFullPath, ClientPartitionAdvisor advisor) {
if (this.cache.isClosed() || this.clientPRAdvisors == null) {
return;
}
@@ -778,49 +673,44 @@ public final class ClientMetadataService {
if (advisor.getColocatedWith() != null) {
String parentRegionPath = advisor.getColocatedWith();
Set<ClientPartitionAdvisor> colocatedAdvisors = this.colocatedPRAdvisors.get(parentRegionPath);
- if(colocatedAdvisors == null){
+ if (colocatedAdvisors == null) {
colocatedAdvisors = new CopyOnWriteArraySet<ClientPartitionAdvisor>();
this.colocatedPRAdvisors.put(parentRegionPath, colocatedAdvisors);
}
colocatedAdvisors.add(advisor);
}
- }
- catch (Exception npe) {
+ } catch (Exception npe) {
// ignore, shutdown case
}
-
+
}
public ClientPartitionAdvisor getClientPartitionAdvisor(String regionFullPath) {
if (this.cache.isClosed() || this.clientPRAdvisors == null) {
return null;
}
- ClientPartitionAdvisor prAdvisor = null;
try {
- prAdvisor = this.clientPRAdvisors.get(regionFullPath);
- }
- catch (Exception npe) {
+ return this.clientPRAdvisors.get(regionFullPath);
+ } catch (Exception npe) {
return null;
}
- return prAdvisor;
}
-
+
public Set<ClientPartitionAdvisor> getColocatedClientPartitionAdvisor(String regionFullPath) {
if (this.cache.isClosed() || this.clientPRAdvisors == null || this.colocatedPRAdvisors == null) {
return null;
}
return this.colocatedPRAdvisors.get(regionFullPath);
}
-
+
private Set<String> getAllRegionFullPaths() {
if (this.cache.isClosed() || this.clientPRAdvisors == null) {
return null;
}
- Set<String> keys = null;
+ Set<String> keys = null;
try {
keys = this.clientPRAdvisors.keySet();
- }
- catch (Exception npe) {
+ } catch (Exception npe) {
return null;
}
return keys;
@@ -830,7 +720,7 @@ public final class ClientMetadataService {
this.clientPRAdvisors.clear();
this.colocatedPRAdvisors.clear();
}
-
+
public boolean isRefreshMetadataTestOnly() {
return isMetadataRefreshed_TEST_ONLY;
}
@@ -847,10 +737,10 @@ public final class ClientMetadataService {
return clientPRAdvisors;
}
- public boolean honourServerGroup(){
+ public boolean honourServerGroup() {
return HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP;
}
-
+
public boolean isMetadataStable() {
return isMetadataStable;
}
@@ -860,7 +750,7 @@ public final class ClientMetadataService {
}
public int getRefreshTaskCount() {
- synchronized(fetchTaskCountLock) {
+ synchronized (fetchTaskCountLock) {
return refreshTaskCount;
}
}
[2/3] incubator-geode git commit: GEODE-1801: Change the logic to
increment NonSingleHopsCount
Posted by ud...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b6c10bc/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index d166397..bb9bf93 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -233,17 +233,20 @@ import org.apache.geode.pdx.internal.TypeRegistry;
import org.apache.geode.redis.GeodeRedisServer;
// @todo somebody Come up with more reasonable values for {@link #DEFAULT_LOCK_TIMEOUT}, etc.
+
/**
* GemFire's implementation of a distributed {@link org.apache.geode.cache.Cache}.
- *
*/
@SuppressWarnings("deprecation")
public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
+
private static final Logger logger = LogService.getLogger();
-
+
// moved *SERIAL_NUMBER stuff to DistributionAdvisor
- /** The default number of seconds to wait for a distributed lock */
+ /**
+ * The default number of seconds to wait for a distributed lock
+ */
public static final int DEFAULT_LOCK_TIMEOUT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockTimeout", 60).intValue();
/**
@@ -251,10 +254,14 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
*/
public static final int DEFAULT_LOCK_LEASE = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.defaultLockLease", 120).intValue();
- /** The default "copy on read" attribute value */
+ /**
+ * The default "copy on read" attribute value
+ */
public static final boolean DEFAULT_COPY_ON_READ = false;
- /** the last instance of GemFireCache created */
+ /**
+ * the last instance of GemFireCache created
+ */
private static volatile GemFireCacheImpl instance = null;
/**
* Just like instance but is valid for a bit longer so that pdx can still find the cache during a close.
@@ -295,9 +302,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* System property to disable query monitor even if resource manager is in use
*/
- public final boolean QUERY_MONITOR_DISABLED_FOR_LOW_MEM = Boolean
- .getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");
-
+ public final boolean QUERY_MONITOR_DISABLED_FOR_LOW_MEM = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.DISABLE_QUERY_MONITOR_FOR_LOW_MEMORY");
+
/**
* Property set to true if resource manager heap percentage is set and query monitor is required
*/
@@ -309,13 +315,15 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public static final String FIND_REST_ENABLED_SERVERS_FUNCTION_ID = FindRestEnabledServersFunction.class.getName();
/**
- * True if the user is allowed lock when memory resources appear to be overcommitted.
+ * True if the user is allowed lock when memory resources appear to be overcommitted.
*/
public static final boolean ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT");
//time in ms
private static final int FIVE_HOURS = 5 * 60 * 60 * 1000;
- /** To test MAX_QUERY_EXECUTION_TIME option. */
+ /**
+ * To test MAX_QUERY_EXECUTION_TIME option.
+ */
public int TEST_MAX_QUERY_EXECUTION_TIME = -1;
public boolean TEST_MAX_QUERY_EXECUTION_TIME_OVERRIDE_EXCEPTION = false;
@@ -346,21 +354,31 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
protected volatile boolean closingGatewaySendersByShutdownAll = false;
protected volatile boolean closingGatewayReceiversByShutdownAll = false;
- /** Amount of time (in seconds) to wait for a distributed lock */
+ /**
+ * Amount of time (in seconds) to wait for a distributed lock
+ */
private int lockTimeout = DEFAULT_LOCK_TIMEOUT;
- /** Amount of time a lease of a distributed lock lasts */
+ /**
+ * Amount of time a lease of a distributed lock lasts
+ */
private int lockLease = DEFAULT_LOCK_LEASE;
- /** Amount of time to wait for a <code>netSearch</code> to complete */
+ /**
+ * Amount of time to wait for a <code>netSearch</code> to complete
+ */
private int searchTimeout = DEFAULT_SEARCH_TIMEOUT;
private final CachePerfStats cachePerfStats;
- /** Date on which this instances was created */
+ /**
+ * Date on which this instances was created
+ */
private final Date creationDate;
- /** thread pool for event dispatching */
+ /**
+ * thread pool for event dispatching
+ */
private final ThreadPoolExecutor eventThreadPool;
/**
@@ -382,19 +400,19 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* {@link #allGatewaySendersLock}
*/
private volatile Set<GatewaySender> allGatewaySenders = Collections.emptySet();
-
+
/**
- * The list of all async event queues added to the cache.
+ * The list of all async event queues added to the cache.
* CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval operations.
*/
private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<AsyncEventQueue>();
/**
- * The list of all async event queues added to the cache.
+ * The list of all async event queues added to the cache.
* CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval operations.
*/
private volatile Set<AsyncEventQueue> allAsyncEventQueues = new CopyOnWriteArraySet<AsyncEventQueue>();
-
+
/**
* Controls updates to the list of all gateway receivers
*
@@ -408,7 +426,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
*/
private volatile Set<GatewayReceiver> allGatewayReceivers = Collections.emptySet();
- /** PartitionedRegion instances (for required-events notification */
+ /**
+ * PartitionedRegion instances (for required-events notification
+ */
// This is a HashSet because I know that clear() on it does not
// allocate any objects.
private final HashSet<PartitionedRegion> partitionedRegions = new HashSet<PartitionedRegion>();
@@ -422,27 +442,34 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
private final ConcurrentMap<String, DistributedRegion> regionsInDestroy = new ConcurrentHashMap<String, DistributedRegion>();
public final Object allGatewayHubsLock = new Object();
-
+
/**
* conflict resolver for WAN, if any
- * @guarded.By {@link #allGatewayHubsLock}
*/
private GatewayConflictResolver gatewayConflictResolver;
- /** Is this is "server" cache? */
+ /**
+ * Is this is "server" cache?
+ */
private boolean isServer = false;
- /** transaction manager for this cache */
+ /**
+ * transaction manager for this cache
+ */
private final TXManagerImpl txMgr;
private RestAgent restAgent;
-
+
private boolean isRESTServiceRunning = false;
-
- /** Copy on Read feature for all read operations e.g. get */
+
+ /**
+ * Copy on Read feature for all read operations e.g. get
+ */
private volatile boolean copyOnRead = DEFAULT_COPY_ON_READ;
-
- /** The named region attributes registered with this cache. */
+
+ /**
+ * The named region attributes registered with this cache.
+ */
private final Map namedRegionAttributes = Collections.synchronizedMap(new HashMap());
/**
@@ -456,7 +483,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
*/
protected volatile Throwable disconnectCause = null;
- /** context where this cache was created -- for debugging, really... */
+ /**
+ * context where this cache was created -- for debugging, really...
+ */
public Exception creationStack = null;
/**
@@ -469,8 +498,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* DistributedLockService for PartitionedRegions. Remains null until the first PartitionedRegion is created. Destroyed
* by GemFireCache when closing the cache. Protected by synchronization on this GemFireCache.
- *
- * @guarded.By prLockServiceLock
*/
private DistributedLockService prLockService;
@@ -478,15 +505,14 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* lock used to access prLockService
*/
private final Object prLockServiceLock = new Object();
-
+
/**
* DistributedLockService for GatewaySenders. Remains null until the
* first GatewaySender is created. Destroyed by GemFireCache when closing
* the cache.
- * @guarded.By gatewayLockServiceLock
*/
private volatile DistributedLockService gatewayLockService;
-
+
/**
* Lock used to access gatewayLockService
*/
@@ -497,11 +523,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
private final AtomicReference<BackupManager> backupManager = new AtomicReference<BackupManager>();
private HeapEvictor heapEvictor = null;
-
+
private OffHeapEvictor offHeapEvictor = null;
private final Object heapEvictorLock = new Object();
-
+
private final Object offHeapEvictorLock = new Object();
private ResourceEventsListener listener;
@@ -533,13 +559,15 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
private final TXEntryStateFactory txEntryStateFactory;
private final CacheConfig cacheConfig;
-
+
private final DiskStoreMonitor diskMonitor;
-
+
// Stores the properties used to initialize declarables.
private final Map<Declarable, Properties> declarablePropertiesMap = new ConcurrentHashMap<Declarable, Properties>();
- /** {@link PropertyResolver} to resolve ${} type property strings */
+ /**
+ * {@link PropertyResolver} to resolve ${} type property strings
+ */
protected static PropertyResolver resolver;
protected static boolean xmlParameterizationEnabled = !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "xml.parameterization.disabled");
@@ -553,24 +581,25 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* is specified
*/
private GemFireMemcachedServer memcachedServer;
-
+
/**
* Redis server is started when {@link DistributionConfig#getRedisPort()} is set
*/
private GeodeRedisServer redisServer;
-
+
/**
* {@link ExtensionPoint} support.
+ *
* @since GemFire 8.1
*/
private SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<Cache>(this, this);
-
+
private final CqService cqService;
-
+
private final Set<RegionListener> regionListeners = new ConcurrentHashSet<RegionListener>();
-
+
private final Map<Class<? extends CacheService>, CacheService> services = new HashMap<Class<? extends CacheService>, CacheService>();
-
+
public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0;
private static int clientFunctionTimeout;
@@ -589,25 +618,22 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
/**
- * Invokes mlockall(). Locks all pages mapped into the address space of the
- * calling process. This includes the pages of the code, data and stack segment,
- * as well as shared libraries, user space kernel data, shared memory, and
- * memory-mapped files. All mapped pages are guaranteed to be resident in RAM
- * when the call returns successfully; the pages are guaranteed to stay in RAM
+ * Invokes mlockall(). Locks all pages mapped into the address space of the
+ * calling process. This includes the pages of the code, data and stack segment,
+ * as well as shared libraries, user space kernel data, shared memory, and
+ * memory-mapped files. All mapped pages are guaranteed to be resident in RAM
+ * when the call returns successfully; the pages are guaranteed to stay in RAM
* until later unlocked.
- *
- * @param flags
- * MCL_CURRENT 1 - Lock all pages which are currently mapped into the
- * address space of the process.
- *
- * MCL_FUTURE 2 - Lock all pages which will become mapped into the address
- * space of the process in the future. These could be for instance new
- * pages required by a growing heap and stack as well as new memory mapped
- * files or shared memory regions.
- *
- * @return
- * 0 if success, non-zero if error and errno set
- *
+ *
+ * @param flags MCL_CURRENT 1 - Lock all pages which are currently mapped into the
+ * address space of the process.
+ * <p>
+ * MCL_FUTURE 2 - Lock all pages which will become mapped into the address
+ * space of the process in the future. These could be for instance new
+ * pages required by a growing heap and stack as well as new memory mapped
+ * files or shared memory regions.
+ *
+ * @return 0 if success, non-zero if error and errno set
*/
private static native int mlockall(int flags);
@@ -626,13 +652,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
int errno = Native.getLastError();
String msg = "mlockall failed: " + errno;
if (errno == 1 || errno == 12) { // EPERM || ENOMEM
- msg = "Unable to lock memory due to insufficient free space or privileges. "
- + "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and "
- + "increase the available memory if needed";
+ msg = "Unable to lock memory due to insufficient free space or privileges. " + "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and " + "increase the available memory if needed";
}
throw new IllegalStateException(msg);
}
-
+
/**
* This is for debugging cache-open issues (esp. {@link org.apache.geode.cache.CacheExistsException})
*/
@@ -669,28 +693,32 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// ////////////////////// Constructors /////////////////////////
- /** Map of Futures used to track Regions that are being reinitialized */
+ /**
+ * Map of Futures used to track Regions that are being reinitialized
+ */
private final ConcurrentMap reinitializingRegions = new ConcurrentHashMap();
- /** Returns the last created instance of GemFireCache */
+ /**
+ * Returns the last created instance of GemFireCache
+ */
public static GemFireCacheImpl getInstance() {
return instance;
}
-
+
/* Used for testing, retain the old instance in the test and re-set the value when test completes*/
public static GemFireCacheImpl setInstanceForTests(GemFireCacheImpl cache) {
GemFireCacheImpl oldInstance = instance;
- instance = cache;
- return oldInstance;
+ instance = cache;
+ return oldInstance;
}
/**
* Returns an existing instance. If a cache does not exist
* throws a cache closed exception.
- *
+ *
* @return the existing cache
- * @throws CacheClosedException
- * if an existing cache can not be found.
+ *
+ * @throws CacheClosedException if an existing cache can not be found.
*/
public static final GemFireCacheImpl getExisting() {
final GemFireCacheImpl result = instance;
@@ -698,21 +726,19 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
return result;
}
if (result != null) {
- throw result.getCacheClosedException(LocalizedStrings
- .CacheFactory_THE_CACHE_HAS_BEEN_CLOSED.toLocalizedString(), null);
+ throw result.getCacheClosedException(LocalizedStrings.CacheFactory_THE_CACHE_HAS_BEEN_CLOSED.toLocalizedString(), null);
}
- throw new CacheClosedException(LocalizedStrings
- .CacheFactory_A_CACHE_HAS_NOT_YET_BEEN_CREATED.toLocalizedString());
+ throw new CacheClosedException(LocalizedStrings.CacheFactory_A_CACHE_HAS_NOT_YET_BEEN_CREATED.toLocalizedString());
}
/**
* Returns an existing instance. If a cache does not exist throws an exception.
*
- * @param reason
- * the reason an existing cache is being requested.
+ * @param reason the reason an existing cache is being requested.
+ *
* @return the existing cache
- * @throws CacheClosedException
- * if an existing cache can not be found.
+ *
+ * @throws CacheClosedException if an existing cache can not be found.
*/
public static GemFireCacheImpl getExisting(String reason) {
GemFireCacheImpl result = getInstance();
@@ -753,16 +779,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public static GemFireCacheImpl createWithAsyncEventListeners(DistributedSystem system, CacheConfig cacheConfig, TypeRegistry typeRegistry) {
return basicCreate(system, true, cacheConfig, null, false, true, typeRegistry);
}
-
- public static Cache create(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig) {
+
+ public static Cache create(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig) {
return basicCreate(system, existingOk, cacheConfig, null, false, ASYNC_EVENT_LISTENERS, null);
}
- private static GemFireCacheImpl basicCreate(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig, PoolFactory pf, boolean isClient, boolean asyncEventListeners, TypeRegistry typeRegistry)
- throws CacheExistsException, TimeoutException, CacheWriterException,
- GatewayException,
- RegionExistsException
- {
+ private static GemFireCacheImpl basicCreate(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig, PoolFactory pf, boolean isClient, boolean asyncEventListeners, TypeRegistry typeRegistry) throws CacheExistsException, TimeoutException, CacheWriterException, GatewayException, RegionExistsException {
try {
synchronized (GemFireCacheImpl.class) {
GemFireCacheImpl instance = checkExistingCache(existingOk, cacheConfig);
@@ -799,6 +821,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* Creates a new instance of GemFireCache and populates it according to the <code>cache.xml</code>, if appropriate.
+ *
* @param typeRegistry: currently only unit tests set this parameter to a non-null value
*/
private GemFireCacheImpl(boolean isClient, PoolFactory pf, DistributedSystem system, CacheConfig cacheConfig, boolean asyncEventListeners, TypeRegistry typeRegistry) {
@@ -810,7 +833,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// Synchronized to prevent a new cache from being created
// before an old one has finished closing
synchronized (GemFireCacheImpl.class) {
-
+
// start JTA transaction manager within this synchronized block
// to prevent race with cache close. fixes bug 43987
JNDIInvoker.mapTransactions(system);
@@ -834,13 +857,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
DM dm = this.system.getDistributionManager();
if (dm instanceof DistributionManager) {
if (((DistributionManager) dm).getDMType() == DistributionManager.ADMIN_ONLY_DM_TYPE) {
- throw new IllegalStateException(LocalizedStrings.GemFireCache_CANNOT_CREATE_A_CACHE_IN_AN_ADMINONLY_VM
- .toLocalizedString());
+ throw new IllegalStateException(LocalizedStrings.GemFireCache_CANNOT_CREATE_A_CACHE_IN_AN_ADMINONLY_VM.toLocalizedString());
}
}
this.rootRegions = new HashMap();
-
+
this.cqService = CqServiceProvider.create(this);
initReliableMessageQueueFactory();
@@ -857,7 +879,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
this.persistentMemberManager = new PersistentMemberManager();
if (asyncEventListeners) {
- final ThreadGroup group = LoggingThreadGroup.createThreadGroup("Message Event Threads",logger);
+ final ThreadGroup group = LoggingThreadGroup.createThreadGroup("Message Event Threads", logger);
ThreadFactory tf = new ThreadFactory() {
public Thread newThread(final Runnable command) {
final Runnable r = new Runnable() {
@@ -881,7 +903,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
this.resourceAdvisor = ResourceAdvisor.createResourceAdvisor(this);
// Initialize the advisor here, but wait to exchange profiles until cache is fully built
this.jmxAdvisor = JmxManagerAdvisor.createJmxManagerAdvisor(new JmxManagerAdvisee(this));
-
+
resourceManager = InternalResourceManager.createResourceManager(this);
this.serialNumber = DistributionAdvisor.createSerialNumber();
@@ -890,10 +912,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/*
* Only bother creating an off-heap evictor if we have off-heap memory enabled.
*/
- if(null != getOffHeapStore()) {
+ if (null != getOffHeapStore()) {
getResourceManager().addResourceListener(ResourceType.OFFHEAP_MEMORY, getOffHeapEvictor());
}
-
+
recordedEventSweeper = EventTracker.startTrackerServices(this);
tombstoneService = TombstoneService.initialize(this);
@@ -925,13 +947,13 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
resolver = new CacheXmlPropertyResolver(false, PropertyResolver.NO_SYSTEM_PROPERTIES_OVERRIDE, null);
}
}
-
+
SystemFailure.signalCacheCreate();
-
+
diskMonitor = new DiskStoreMonitor();
} // synchronized
}
-
+
public boolean isRESTServiceRunning() {
return isRESTServiceRunning;
}
@@ -941,13 +963,14 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
/**
- * Used by Hydra tests to get handle of Rest Agent
+ * Used by Hydra tests to get handle of Rest Agent
+ *
* @return RestAgent
*/
public RestAgent getRestAgent() {
return restAgent;
}
-
+
/*****
* Request the shared configuration from the locator(s) which have the Cluster config service running
*/
@@ -955,20 +978,20 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
//Request the shared configuration from the locator(s)
final DistributionConfig config = this.system.getConfig();
- if (!(dm instanceof DistributionManager))
+ if (!(dm instanceof DistributionManager)) {
return null;
+ }
// do nothing if this vm is/has locator or this is a client
- if( ((DistributionManager)dm).getDMType() == DistributionManager.LOCATOR_DM_TYPE
- || isClient
- || Locator.getLocator() !=null )
+ if (((DistributionManager) dm).getDMType() == DistributionManager.LOCATOR_DM_TYPE || isClient || Locator.getLocator() != null) {
return null;
+ }
Map<InternalDistributedMember, Collection<String>> scl = this.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
//If there are no locators with Shared configuration, that means the system has been started without shared configuration
//then do not make requests to the locators
- if(scl.isEmpty()) {
+ if (scl.isEmpty()) {
logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_NO_LOCATORS_FOUND_WITH_SHARED_CONFIGURATION));
return null;
}
@@ -985,10 +1008,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
logger.info(response.describeConfig());
Configuration clusterConfig = response.getRequestedConfiguration().get(SharedConfiguration.CLUSTER_CONFIG);
- Properties clusterSecProperties = (clusterConfig==null) ? new Properties():clusterConfig.getGemfireProperties();
+ Properties clusterSecProperties = (clusterConfig == null) ? new Properties() : clusterConfig.getGemfireProperties();
// If not using shared configuration, return null or throw an exception is locator is secured
- if(!config.getUseSharedConfiguration()){
+ if (!config.getUseSharedConfiguration()) {
if (clusterSecProperties.containsKey(ConfigurationProperties.SECURITY_MANAGER)) {
throw new GemFireConfigException(LocalizedStrings.GEMFIRE_CACHE_SECURITY_MISCONFIGURATION_2.toLocalizedString());
} else {
@@ -999,8 +1022,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
Properties serverSecProperties = config.getSecurityProps();
//check for possible mis-configuration
- if (isMisConfigured(clusterSecProperties, serverSecProperties, ConfigurationProperties.SECURITY_MANAGER)
- || isMisConfigured(clusterSecProperties, serverSecProperties, ConfigurationProperties.SECURITY_POST_PROCESSOR)) {
+ if (isMisConfigured(clusterSecProperties, serverSecProperties, ConfigurationProperties.SECURITY_MANAGER) || isMisConfigured(clusterSecProperties, serverSecProperties, ConfigurationProperties.SECURITY_POST_PROCESSOR)) {
throw new GemFireConfigException(LocalizedStrings.GEMFIRE_CACHE_SECURITY_MISCONFIGURATION.toLocalizedString());
}
return response;
@@ -1012,8 +1034,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- public void deployJarsRecevedFromClusterConfiguration(ConfigurationResponse response){
- try{
+ public void deployJarsRecevedFromClusterConfiguration(ConfigurationResponse response) {
+ try {
ClusterConfigurationLoader.deployJarsReceivedFromClusterConfiguration(this, response);
} catch (IOException e) {
throw new GemFireConfigException(LocalizedStrings.GemFireCache_EXCEPTION_OCCURED_WHILE_DEPLOYING_JARS_FROM_SHARED_CONDFIGURATION.toLocalizedString(), e);
@@ -1024,17 +1046,19 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// When called, clusterProps and serverProps and key could not be null
- public static boolean isMisConfigured(Properties clusterProps, Properties serverProps, String key){
+ public static boolean isMisConfigured(Properties clusterProps, Properties serverProps, String key) {
String clusterPropValue = clusterProps.getProperty(key);
String serverPropValue = serverProps.getProperty(key);
// if this server prop is not specified, this is always OK.
- if(StringUtils.isBlank(serverPropValue))
+ if (StringUtils.isBlank(serverPropValue)) {
return false;
+ }
// server props is not blank, but cluster props is blank, NOT OK.
- if(StringUtils.isBlank(clusterPropValue))
+ if (StringUtils.isBlank(clusterPropValue)) {
return true;
+ }
// at this point check for eqality
return !clusterPropValue.equals(serverPropValue);
@@ -1042,18 +1066,18 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public List<String> getSharedConfigLocatorConnectionStringList() {
List<String> locatorConnectionStringList = new ArrayList<String>();
-
+
Map<InternalDistributedMember, Collection<String>> scl = this.getDistributionManager().getAllHostedLocatorsWithSharedConfiguration();
//If there are no locators with Shared configuration, that means the system has been started without shared configuration
//then do not make requests to the locators
if (!scl.isEmpty()) {
- Set<Entry<InternalDistributedMember, Collection<String>>> locs = scl.entrySet();
-
+ Set<Entry<InternalDistributedMember, Collection<String>>> locs = scl.entrySet();
+
for (Entry<InternalDistributedMember, Collection<String>> loc : locs) {
Collection<String> locStrings = loc.getValue();
Iterator<String> locStringIter = locStrings.iterator();
-
+
while (locStringIter.hasNext()) {
locatorConnectionStringList.add(locStringIter.next());
}
@@ -1061,11 +1085,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
return locatorConnectionStringList;
}
-
-
-
-
+
/**
* Used by unit tests to force cache creation to use a test generated cache.xml
*/
@@ -1073,6 +1094,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* @return true if cache is created using a ClientCacheFactory
+ *
* @see #hasPool()
*/
public boolean isClient() {
@@ -1091,7 +1113,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
private Collection<Pool> getAllPools() {
Collection<Pool> pools = PoolManagerImpl.getPMI().getMap().values();
- for (Iterator<Pool> itr = pools.iterator(); itr.hasNext();) {
+ for (Iterator<Pool> itr = pools.iterator(); itr.hasNext(); ) {
PoolImpl pool = (PoolImpl) itr.next();
if (pool.isUsedByGateway()) {
itr.remove();
@@ -1121,14 +1143,14 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
GemFireCacheImpl.instance = this;
GemFireCacheImpl.pdxInstance = this;
-
+
MinimumSystemRequirements.checkAndLog();
-
- for (Iterator<CacheLifecycleListener> iter = cacheLifecycleListeners.iterator(); iter.hasNext();) {
+
+ for (Iterator<CacheLifecycleListener> iter = cacheLifecycleListeners.iterator(); iter.hasNext(); ) {
CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
listener.cacheCreated(this);
}
-
+
ClassPathLoader.setLatestToDefault();
//request and check cluster configuration
@@ -1138,10 +1160,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// apply the cluster's properties configuration and initialize security using that configuration
ClusterConfigurationLoader.applyClusterPropertiesConfiguration(this, configurationResponse, system.getConfig());
securityService.initSecurity(system.getConfig().getSecurityProps());
-
+
SystemMemberCacheEventProcessor.send(this, Operation.CACHE_CREATE);
this.resourceAdvisor.initializationGate();
-
+
//Register function that we need to execute to fetch available REST service endpoints in DS
FunctionService.registerFunction(new FindRestEnabledServersFunction());
@@ -1161,7 +1183,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
boolean completedCacheXml = false;
initializeServices();
-
+
try {
//Deploy all the jars from the deploy working dir.
new JarDeployer(this.system.getConfig().getDeployWorkingDir()).loadPreviouslyDeployedJars();
@@ -1179,19 +1201,18 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
}
-
+
this.clientpf = null;
-
+
startColocatedJmxManagerLocator();
-
+
startMemcachedServer();
-
+
startRedisServer();
-
+
startRestAgentServer(this);
- int time = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "CLIENT_FUNCTION_TIMEOUT",
- DEFAULT_CLIENT_FUNCTION_TIMEOUT);
+ int time = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "CLIENT_FUNCTION_TIMEOUT", DEFAULT_CLIENT_FUNCTION_TIMEOUT);
clientFunctionTimeout = time >= 0 ? time : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
isInitialized = true;
@@ -1203,34 +1224,30 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
*/
private void initializeServices() {
ServiceLoader<CacheService> loader = ServiceLoader.load(CacheService.class);
- for(CacheService service : loader) {
+ for (CacheService service : loader) {
service.init(this);
this.services.put(service.getInterface(), service);
system.handleResourceEvent(ResourceEvent.CACHE_SERVICE_CREATE, service);
}
}
- private boolean isNotJmxManager(){
+ private boolean isNotJmxManager() {
return (this.system.getConfig().getJmxManagerStart() != true);
}
-
- private boolean isServerNode(){
- return (this.system.getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE
- && this.system.getDistributedMember().getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE
- && !isClient());
+
+ private boolean isServerNode() {
+ return (this.system.getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE && this.system.getDistributedMember().getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE && !isClient());
}
-
+
private void startRestAgentServer(GemFireCacheImpl cache) {
- if (this.system.getConfig().getStartDevRestApi()
- && isNotJmxManager()
- && isServerNode()) {
+ if (this.system.getConfig().getStartDevRestApi() && isNotJmxManager() && isServerNode()) {
this.restAgent = new RestAgent(this.system.getConfig());
restAgent.start(cache);
} else {
this.restAgent = null;
}
}
-
+
private void startMemcachedServer() {
int port = system.getConfig().getMemcachedPort();
if (port != 0) {
@@ -1239,28 +1256,24 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
String bindAddress = system.getConfig().getMemcachedBindAddress();
assert bindAddress != null;
if (bindAddress.equals(DistributionConfig.DEFAULT_MEMCACHED_BIND_ADDRESS)) {
- logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_MEMCACHED_SERVER_ON_PORT_0_FOR_1_PROTOCOL,
- new Object[] { port, protocol }));
+ logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_MEMCACHED_SERVER_ON_PORT_0_FOR_1_PROTOCOL, new Object[] { port, protocol }));
} else {
- logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_MEMCACHED_SERVER_ON_BIND_ADDRESS_0_PORT_1_FOR_2_PROTOCOL,
- new Object[] { bindAddress, port, protocol }));
+ logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_MEMCACHED_SERVER_ON_BIND_ADDRESS_0_PORT_1_FOR_2_PROTOCOL, new Object[] { bindAddress, port, protocol }));
}
this.memcachedServer = new GemFireMemcachedServer(bindAddress, port, Protocol.valueOf(protocol.toUpperCase()));
this.memcachedServer.start();
}
}
-
+
private void startRedisServer() {
int port = system.getConfig().getRedisPort();
if (port != 0) {
String bindAddress = system.getConfig().getRedisBindAddress();
assert bindAddress != null;
if (bindAddress.equals(DistributionConfig.DEFAULT_REDIS_BIND_ADDRESS)) {
- getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_PORT_0,
- new Object[] { port });
+ getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_PORT_0, new Object[] { port });
} else {
- getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_BIND_ADDRESS_0_PORT_1,
- new Object[] { bindAddress, port });
+ getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_BIND_ADDRESS_0_PORT_1, new Object[] { bindAddress, port });
}
this.redisServer = new GeodeRedisServer(bindAddress, port);
this.redisServer.start();
@@ -1293,17 +1306,15 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
try {
url = xmlFile.toURL();
} catch (IOException ex) {
- throw new CacheXmlException(
- LocalizedStrings.GemFireCache_COULD_NOT_CONVERT_XML_FILE_0_TO_AN_URL.toLocalizedString(xmlFile), ex);
+ throw new CacheXmlException(LocalizedStrings.GemFireCache_COULD_NOT_CONVERT_XML_FILE_0_TO_AN_URL.toLocalizedString(xmlFile), ex);
}
}
if (url == null) {
File defaultFile = DistributionConfig.DEFAULT_CACHE_XML_FILE;
if (!xmlFile.equals(defaultFile)) {
if (!xmlFile.exists()) {
- throw new CacheXmlException(LocalizedStrings.GemFireCache_DECLARATIVE_CACHE_XML_FILERESOURCE_0_DOES_NOT_EXIST
- .toLocalizedString(xmlFile));
- } else /* if (!xmlFile.isFile()) */{
+ throw new CacheXmlException(LocalizedStrings.GemFireCache_DECLARATIVE_CACHE_XML_FILERESOURCE_0_DOES_NOT_EXIST.toLocalizedString(xmlFile));
+ } else /* if (!xmlFile.isFile()) */ {
throw new CacheXmlException(LocalizedStrings.GemFireCache_DECLARATIVE_XML_FILE_0_IS_NOT_A_FILE.toLocalizedString(xmlFile));
}
}
@@ -1317,17 +1328,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* given <code>DistributedSystem</code>. Note that this operation cannot be performed in the constructor because
* creating regions in the cache, etc. uses the cache itself (which isn't initialized until the constructor returns).
*
- * @throws CacheXmlException
- * If something goes wrong while parsing the declarative caching XML file.
- * @throws TimeoutException
- * If a {@link org.apache.geode.cache.Region#put(Object, Object)}times out while initializing the cache.
- * @throws CacheWriterException
- * If a <code>CacheWriterException</code> is thrown while initializing the cache.
- * @throws RegionExistsException
- * If the declarative caching XML file desribes a region that already exists (including the root region).
- * @throws GatewayException
- * If a <code>GatewayException</code> is thrown while initializing the cache.
- *
+ * @throws CacheXmlException If something goes wrong while parsing the declarative caching XML file.
+ * @throws TimeoutException If a {@link org.apache.geode.cache.Region#put(Object, Object)}times out while initializing the cache.
+ * @throws CacheWriterException If a <code>CacheWriterException</code> is thrown while initializing the cache.
+ * @throws RegionExistsException If the declarative caching XML file desribes a region that already exists (including the root region).
+ * @throws GatewayException If a <code>GatewayException</code> is thrown while initializing the cache.
* @see #loadCacheXml
*/
private void initializeDeclarativeCache() throws TimeoutException, CacheWriterException, GatewayException, RegionExistsException {
@@ -1362,12 +1367,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
} catch (IOException ignore) {
}
} catch (IOException ex) {
- throw new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_OPENING_CACHE_XML_0_THE_FOLLOWING_ERROR_OCCURRED_1
- .toLocalizedString(new Object[] { url.toString(), ex }));
+ throw new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_OPENING_CACHE_XML_0_THE_FOLLOWING_ERROR_OCCURRED_1.toLocalizedString(new Object[] { url.toString(), ex }));
} catch (CacheXmlException ex) {
- CacheXmlException newEx = new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_READING_CACHE_XML_0_1
- .toLocalizedString(new Object[] { url, ex.getMessage() }));
+ CacheXmlException newEx = new CacheXmlException(LocalizedStrings.GemFireCache_WHILE_READING_CACHE_XML_0_1.toLocalizedString(new Object[] { url, ex.getMessage() }));
newEx.setStackTrace(ex.getStackTrace());
newEx.initCause(ex.getCause());
throw newEx;
@@ -1390,11 +1393,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
br.close();
} catch (IOException ignore) {
}
- logger.info(LocalizedMessage.create(
- LocalizedStrings.GemFireCache_INITIALIZING_CACHE_USING__0__1, new Object[]{url.toString(), sb.toString()}));
+ logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_INITIALIZING_CACHE_USING__0__1, new Object[] { url.toString(), sb.toString() }));
} else {
- logger.info(LocalizedMessage.create(
- LocalizedStrings.GemFireCache_INITIALIZING_CACHE_USING__0__1, new Object[] {"generated description from old cache", cacheXmlDescription}));
+ logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCache_INITIALIZING_CACHE_USING__0__1, new Object[] { "generated description from old cache", cacheXmlDescription }));
}
}
@@ -1509,12 +1510,16 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
return stopper;
}
- /** return true if the cache was closed due to being shunned by other members */
+ /**
+ * return true if the cache was closed due to being shunned by other members
+ */
public boolean forcedDisconnect() {
return this.forcedDisconnect || this.system.forcedDisconnect();
}
- /** return a CacheClosedException with the given reason */
+ /**
+ * return a CacheClosedException with the given reason
+ */
public CacheClosedException getCacheClosedException(String reason, Throwable cause) {
CacheClosedException result;
if (cause != null) {
@@ -1527,7 +1532,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
return result;
}
- /** if the cache was forcibly closed this exception will reflect the cause */
+ /**
+ * if the cache was forcibly closed this exception will reflect the cause
+ */
public Throwable getDisconnectCause() {
return this.disconnectCause;
}
@@ -1559,8 +1566,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* @see SystemFailure#loadEmergencyClasses()
*/
static public void loadEmergencyClasses() {
- if (emergencyClassesLoaded)
+ if (emergencyClassesLoaded) {
return;
+ }
emergencyClassesLoaded = true;
InternalDistributedSystem.loadEmergencyClasses();
AcceptorImpl.loadEmergencyClasses();
@@ -1624,7 +1632,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
if (DEBUG) {
System.err.println("DEBUG: closing gateway hubs");
}
-
+
// These are synchronized sets -- avoid potential deadlocks
// instance.pathToRegion.clear(); // garbage collection
// instance.gatewayHubs.clear();
@@ -1729,101 +1737,98 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
pr.acquireDestroyLock();
acquiredLock = true;
- synchronized(pr.getRedundancyProvider()) {
- if (pr.isDataStore() && pr.getDataStore() != null && pr.getDataPolicy() == DataPolicy.PERSISTENT_PARTITION) {
- int numBuckets = pr.getTotalNumberOfBuckets();
- Map<InternalDistributedMember, PersistentMemberID> bucketMaps[] = new Map[numBuckets];
- PartitionedRegionDataStore prds = pr.getDataStore();
-
- // lock all the primary buckets
- Set<Entry<Integer, BucketRegion>> bucketEntries = prds.getAllLocalBuckets();
- for (Map.Entry e : bucketEntries) {
- BucketRegion br = (BucketRegion) e.getValue();
- if (br == null || br.isDestroyed) {
- // bucket region could be destroyed in race condition
- continue;
- }
- br.getBucketAdvisor().tryLockIfPrimary();
+ synchronized (pr.getRedundancyProvider()) {
+ if (pr.isDataStore() && pr.getDataStore() != null && pr.getDataPolicy() == DataPolicy.PERSISTENT_PARTITION) {
+ int numBuckets = pr.getTotalNumberOfBuckets();
+ Map<InternalDistributedMember, PersistentMemberID> bucketMaps[] = new Map[numBuckets];
+ PartitionedRegionDataStore prds = pr.getDataStore();
+
+ // lock all the primary buckets
+ Set<Entry<Integer, BucketRegion>> bucketEntries = prds.getAllLocalBuckets();
+ for (Map.Entry e : bucketEntries) {
+ BucketRegion br = (BucketRegion) e.getValue();
+ if (br == null || br.isDestroyed) {
+ // bucket region could be destroyed in race condition
+ continue;
+ }
+ br.getBucketAdvisor().tryLockIfPrimary();
- // get map <InternalDistriutedMemeber, persistentID> for this bucket's
- // remote members
- bucketMaps[br.getId()] = br.getBucketAdvisor().adviseInitializedPersistentMembers();
+ // get map <InternalDistriutedMemeber, persistentID> for this bucket's
+ // remote members
+ bucketMaps[br.getId()] = br.getBucketAdvisor().adviseInitializedPersistentMembers();
+ if (logger.isDebugEnabled()) {
+ logger.debug("shutDownAll: PR {}: initialized persistent members for {}:{}", pr.getName(), br.getId(), bucketMaps[br.getId()]);
+ }
+ }
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: initialized persistent members for {}:{}", pr.getName(), br.getId(), bucketMaps[br.getId()]);
+ logger.debug("shutDownAll: All buckets for PR {} are locked.", pr.getName());
}
- }
- if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: All buckets for PR {} are locked.", pr.getName());
- }
-
- // send lock profile update to other members
- pr.setShutDownAllStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
- new UpdateAttributesProcessor(pr).distribute(false);
- pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
- if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: all bucketlock profiles received.", pr.getName());
- }
- // if async write, do flush
- if (!pr.getAttributes().isDiskSynchronous()) {
- // several PRs might share the same diskstore, we will only flush once
- // even flush is called several times.
- pr.getDiskStore().forceFlush();
- // send flush profile update to other members
- pr.setShutDownAllStatus(PartitionedRegion.DISK_STORE_FLUSHED);
+ // send lock profile update to other members
+ pr.setShutDownAllStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
new UpdateAttributesProcessor(pr).distribute(false);
- pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.DISK_STORE_FLUSHED);
+ pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.PRIMARY_BUCKETS_LOCKED);
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: all flush profiles received.", pr.getName());
- }
- } // async write
-
- // persist other members to OFFLINE_EQUAL for each bucket region
- // iterate through all the bucketMaps and exclude the items whose
- // idm is no longer online
- Set<InternalDistributedMember> membersToPersistOfflineEqual = pr.getRegionAdvisor().adviseDataStore();
- for (Map.Entry e : bucketEntries) {
- BucketRegion br = (BucketRegion) e.getValue();
- if (br == null || br.isDestroyed) {
- // bucket region could be destroyed in race condition
- continue;
+ logger.debug("shutDownAll: PR {}: all bucketlock profiles received.", pr.getName());
}
- Map<InternalDistributedMember, PersistentMemberID> persistMap = getSubMapForLiveMembers(pr, membersToPersistOfflineEqual,
- bucketMaps[br.getId()]);
- if (persistMap != null) {
- br.getPersistenceAdvisor().persistMembersOfflineAndEqual(persistMap);
+
+ // if async write, do flush
+ if (!pr.getAttributes().isDiskSynchronous()) {
+ // several PRs might share the same diskstore, we will only flush once
+ // even flush is called several times.
+ pr.getDiskStore().forceFlush();
+ // send flush profile update to other members
+ pr.setShutDownAllStatus(PartitionedRegion.DISK_STORE_FLUSHED);
+ new UpdateAttributesProcessor(pr).distribute(false);
+ pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.DISK_STORE_FLUSHED);
if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: pesisting bucket {}:{}", pr.getName(), br.getId(), persistMap);
+ logger.debug("shutDownAll: PR {}: all flush profiles received.", pr.getName());
+ }
+ } // async write
+
+ // persist other members to OFFLINE_EQUAL for each bucket region
+ // iterate through all the bucketMaps and exclude the items whose
+ // idm is no longer online
+ Set<InternalDistributedMember> membersToPersistOfflineEqual = pr.getRegionAdvisor().adviseDataStore();
+ for (Map.Entry e : bucketEntries) {
+ BucketRegion br = (BucketRegion) e.getValue();
+ if (br == null || br.isDestroyed) {
+ // bucket region could be destroyed in race condition
+ continue;
+ }
+ Map<InternalDistributedMember, PersistentMemberID> persistMap = getSubMapForLiveMembers(pr, membersToPersistOfflineEqual, bucketMaps[br.getId()]);
+ if (persistMap != null) {
+ br.getPersistenceAdvisor().persistMembersOfflineAndEqual(persistMap);
+ if (logger.isDebugEnabled()) {
+ logger.debug("shutDownAll: PR {}: pesisting bucket {}:{}", pr.getName(), br.getId(), persistMap);
+ }
}
}
- }
- // send persited profile update to other members, let all members to persist
- // before close the region
- pr.setShutDownAllStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
- new UpdateAttributesProcessor(pr).distribute(false);
- pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
- if (logger.isDebugEnabled()) {
- logger.debug("shutDownAll: PR {}: all offline_equal profiles received.", pr.getName());
- }
- } // datastore
+ // send persited profile update to other members, let all members to persist
+ // before close the region
+ pr.setShutDownAllStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
+ new UpdateAttributesProcessor(pr).distribute(false);
+ pr.getRegionAdvisor().waitForProfileStatus(PartitionedRegion.OFFLINE_EQUAL_PERSISTED);
+ if (logger.isDebugEnabled()) {
+ logger.debug("shutDownAll: PR {}: all offline_equal profiles received.", pr.getName());
+ }
+ } // datastore
- // after done all steps for buckets, close pr
- // close accessor directly
- RegionEventImpl event = new RegionEventImpl(pr, Operation.REGION_CLOSE, null, false, getMyId(), true);
- try {
- // not to acquire lock
- pr.basicDestroyRegion(event, false, false, true);
- } catch (CacheWriterException e) {
- // not possible with local operation, CacheWriter not called
- throw new Error(LocalizedStrings.LocalRegion_CACHEWRITEREXCEPTION_SHOULD_NOT_BE_THROWN_IN_LOCALDESTROYREGION
- .toLocalizedString(), e);
- } catch (TimeoutException e) {
- // not possible with local operation, no distributed locks possible
- throw new Error(LocalizedStrings.LocalRegion_TIMEOUTEXCEPTION_SHOULD_NOT_BE_THROWN_IN_LOCALDESTROYREGION
- .toLocalizedString(), e);
- }
- // pr.close();
+ // after done all steps for buckets, close pr
+ // close accessor directly
+ RegionEventImpl event = new RegionEventImpl(pr, Operation.REGION_CLOSE, null, false, getMyId(), true);
+ try {
+ // not to acquire lock
+ pr.basicDestroyRegion(event, false, false, true);
+ } catch (CacheWriterException e) {
+ // not possible with local operation, CacheWriter not called
+ throw new Error(LocalizedStrings.LocalRegion_CACHEWRITEREXCEPTION_SHOULD_NOT_BE_THROWN_IN_LOCALDESTROYREGION.toLocalizedString(), e);
+ } catch (TimeoutException e) {
+ // not possible with local operation, no distributed locks possible
+ throw new Error(LocalizedStrings.LocalRegion_TIMEOUTEXCEPTION_SHOULD_NOT_BE_THROWN_IN_LOCALDESTROYREGION.toLocalizedString(), e);
+ }
+ // pr.close();
} // synchronized
} catch (CacheClosedException cce) {
logger.debug("Encounter CacheClosedException when shutDownAll is closing PR: {}:{}", pr.getFullPath(), cce.getMessage());
@@ -1838,8 +1843,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- private Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(PartitionedRegion pr,
- Set<InternalDistributedMember> membersToPersistOfflineEqual, Map<InternalDistributedMember, PersistentMemberID> bucketMap) {
+ private Map<InternalDistributedMember, PersistentMemberID> getSubMapForLiveMembers(PartitionedRegion pr, Set<InternalDistributedMember> membersToPersistOfflineEqual, Map<InternalDistributedMember, PersistentMemberID> bucketMap) {
if (bucketMap == null) {
return null;
}
@@ -1877,8 +1881,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
stopper.checkCancelInProgress(null);
if (this.prLockService == null) {
try {
- this.prLockService = DLockService.create(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME, getDistributedSystem(),
- true /* distributed */, true /* destroyOnDisconnect */, true /* automateFreeResources */);
+ this.prLockService = DLockService.create(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME, getDistributedSystem(), true /* distributed */, true /* destroyOnDisconnect */, true /* automateFreeResources */);
} catch (IllegalArgumentException e) {
this.prLockService = DistributedLockService.getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
if (this.prLockService == null) {
@@ -1892,6 +1895,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* Gets or lazily creates the GatewaySender distributed lock service.
+ *
* @return the GatewaySender distributed lock service
*/
public DistributedLockService getGatewaySenderLockService() {
@@ -1900,16 +1904,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
stopper.checkCancelInProgress(null);
if (this.gatewayLockService == null) {
try {
- this.gatewayLockService = DLockService.create(
- AbstractGatewaySender.LOCK_SERVICE_NAME,
- getDistributedSystem(),
- true /*distributed*/,
- true /*destroyOnDisconnect*/,
- true /*automateFreeResources*/);
- }
- catch (IllegalArgumentException e) {
- this.gatewayLockService = DistributedLockService.getServiceNamed(
- AbstractGatewaySender.LOCK_SERVICE_NAME);
+ this.gatewayLockService = DLockService.create(AbstractGatewaySender.LOCK_SERVICE_NAME, getDistributedSystem(), true /*distributed*/, true /*destroyOnDisconnect*/, true /*automateFreeResources*/);
+ } catch (IllegalArgumentException e) {
+ this.gatewayLockService = DistributedLockService.getServiceNamed(AbstractGatewaySender.LOCK_SERVICE_NAME);
if (this.gatewayLockService == null) {
throw e; // AbstractGatewaySender.LOCK_SERVICE_NAME must be illegal!
}
@@ -1937,12 +1934,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* the cache. Caller must be synchronized on this GemFireCache.
*/
private void destroyGatewaySenderLockService() {
- if (DistributedLockService
- .getServiceNamed(AbstractGatewaySender.LOCK_SERVICE_NAME) != null) {
+ if (DistributedLockService.getServiceNamed(AbstractGatewaySender.LOCK_SERVICE_NAME) != null) {
try {
DistributedLockService.destroy(AbstractGatewaySender.LOCK_SERVICE_NAME);
- }
- catch (IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
// DistributedSystem.disconnect may have already destroyed the DLS
}
}
@@ -1965,9 +1960,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
this.offHeapEvictor = new OffHeapEvictor(this);
}
return this.offHeapEvictor;
- }
+ }
}
-
+
public PersistentMemberManager getPersistentMemberManager() {
return persistentMemberManager;
}
@@ -1978,8 +1973,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
if (this.clientMetadatService == null) {
this.clientMetadatService = new ClientMetadataService(this);
}
- return this.clientMetadatService;
}
+ return this.clientMetadatService;
}
private final boolean DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE");
@@ -1987,12 +1982,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* close the cache
*
- * @param reason
- * the reason the cache is being closed
- * @param systemFailureCause
- * whether this member was ejected from the distributed system
- * @param keepalive
- * whoever added this should javadoc it
+ * @param reason the reason the cache is being closed
+ * @param systemFailureCause whether this member was ejected from the distributed system
+ * @param keepalive whoever added this should javadoc it
*/
public void close(String reason, Throwable systemFailureCause, boolean keepalive) {
close(reason, systemFailureCause, keepalive, false);
@@ -2062,7 +2054,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
this.resourceAdvisor.close();
} catch (CancelException e) {
// ignore
- }
+ }
try {
this.jmxAdvisor.close();
} catch (CancelException e) {
@@ -2098,11 +2090,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* IMPORTANT: any operation during shut down that can time out (create a CancelException) must be inside of this
* try block. If all else fails, we *must* ensure that the cache gets closed!
*/
- try {
+ try {
this.stopServers();
stopMemcachedServer();
-
+
stopRedisServer();
// no need to track PR instances since we won't create any more
@@ -2138,8 +2130,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
LocalRegion prRoot = null;
-
- for (Iterator itr = rootRegionValues.iterator(); itr.hasNext();) {
+
+ for (Iterator itr = rootRegionValues.iterator(); itr.hasNext(); ) {
LocalRegion lr = (LocalRegion) itr.next();
if (isDebugEnabled) {
logger.debug("{}: processing region {}", this, lr.getFullPath());
@@ -2147,7 +2139,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
if (PartitionedRegionHelper.PR_ROOT_REGION_NAME.equals(lr.getName())) {
prRoot = lr;
} else {
- if(lr.getName().contains(ParallelGatewaySenderQueue.QSTRING)){
+ if (lr.getName().contains(ParallelGatewaySenderQueue.QSTRING)) {
continue; //this region will be closed internally by parent region
}
if (isDebugEnabled) {
@@ -2157,8 +2149,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
lr.handleCacheClose(op);
} catch (Exception e) {
if (isDebugEnabled || !forcedDisconnect) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_CLOSING_REGION_1,
- new Object[] { this, lr.getFullPath() }), e);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_CLOSING_REGION_1, new Object[] { this, lr.getFullPath() }), e);
}
}
}
@@ -2174,15 +2165,14 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
prRoot.handleCacheClose(op);
}
} catch (CancelException e) {
- logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_IN_LAST_STAGE_OF_PARTITIONEDREGION_CACHE_CLOSE,
- this), e);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_IN_LAST_STAGE_OF_PARTITIONEDREGION_CACHE_CLOSE, this), e);
}
destroyPartitionedRegionLockService();
}
closeDiskStores();
diskMonitor.close();
-
+
// Close the CqService Handle.
try {
if (isDebugEnabled) {
@@ -2309,7 +2299,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
if (this.queryMonitor != null) {
this.queryMonitor.stopMonitoring();
}
- stopDiskStoreTaskPool();
+ stopDiskStoreTaskPool();
} finally {
// NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
@@ -2324,7 +2314,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
// Added to close the TransactionManager's cleanup thread
TransactionManagerImpl.refresh();
-
+
if (!keepDS) {
// keepDS is used by ShutdownAll. It will override DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE
if (!DISABLE_DISCONNECT_DS_ON_CACHE_CLOSE) {
@@ -2334,20 +2324,20 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
TypeRegistry.close();
// do this late to prevent 43412
TypeRegistry.setPdxSerializer(null);
-
- for (Iterator iter = cacheLifecycleListeners.iterator(); iter.hasNext();) {
+
+ for (Iterator iter = cacheLifecycleListeners.iterator(); iter.hasNext(); ) {
CacheLifecycleListener listener = (CacheLifecycleListener) iter.next();
listener.cacheClosed(this);
- }
+ }
stopRestAgentServer();
// Fix for #49856
SequenceLoggerImpl.signalCacheClose();
SystemFailure.signalCacheClose();
-
+
} // static synchronization on GemFireCache.class
}
-
+
// see Cache.isReconnecting()
public boolean isReconnecting() {
return this.system.isReconnecting();
@@ -2355,7 +2345,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// see Cache.waitUntilReconnected(long, TimeUnit)
public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException {
- boolean systemReconnected = this.system.waitUntilReconnected(time, units);
+ boolean systemReconnected = this.system.waitUntilReconnected(time, units);
if (!systemReconnected) {
return false;
}
@@ -2365,12 +2355,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
return true;
}
-
+
// see Cache.stopReconnecting()
public void stopReconnecting() {
this.system.stopReconnecting();
}
-
+
// see Cache.getReconnectedCache()
public Cache getReconnectedCache() {
GemFireCacheImpl c = GemFireCacheImpl.getInstance();
@@ -2382,21 +2372,20 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
private void stopMemcachedServer() {
if (this.memcachedServer != null) {
- logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_MEMCACHED_SERVER_ON_PORT_0_IS_SHUTTING_DOWN,
- new Object[] { this.system.getConfig().getMemcachedPort() }));
+ logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_MEMCACHED_SERVER_ON_PORT_0_IS_SHUTTING_DOWN, new Object[] { this.system.getConfig().getMemcachedPort() }));
this.memcachedServer.shutdown();
}
}
-
+
private void stopRedisServer() {
- if (redisServer != null)
+ if (redisServer != null) {
this.redisServer.shutdown();
+ }
}
-
+
private void stopRestAgentServer() {
- if ( this.restAgent != null) {
- logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_REST_SERVER_ON_PORT_0_IS_SHUTTING_DOWN,
- new Object[] { this.system.getConfig().getHttpServicePort() }));
+ if (this.restAgent != null) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_REST_SERVER_ON_PORT_0_IS_SHUTTING_DOWN, new Object[] { this.system.getConfig().getHttpServicePort() }));
this.restAgent.stop();
}
}
@@ -2437,14 +2426,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
};*/
final ThreadFactory compactThreadFactory = GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor");
- this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 1, TimeUnit.SECONDS,
- new LinkedBlockingQueue(),
- compactThreadFactory);
+ this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(), compactThreadFactory);
}
private final ConcurrentMap<String, DiskStoreImpl> diskStores = new ConcurrentHashMap<String, DiskStoreImpl>();
private final ConcurrentMap<String, DiskStoreImpl> regionOwnedDiskStores = new ConcurrentHashMap<String, DiskStoreImpl>();
-
+
public void addDiskStore(DiskStoreImpl dsi) {
this.diskStores.put(dsi.getName(), dsi);
if (!dsi.isOffline()) {
@@ -2456,8 +2443,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
this.diskStores.remove(dsi.getName());
this.regionOwnedDiskStores.remove(dsi.getName());
/** Added for M&M **/
- if(!dsi.getOwnedByRegion())
- system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
+ if (!dsi.getOwnedByRegion()) {
+ system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
+ }
}
public void addRegionOwnedDiskStore(DiskStoreImpl dsi) {
@@ -2607,7 +2595,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public int stopGatewaySenders(boolean byShutdownAll) {
final boolean isDebugEnabled = logger.isDebugEnabled();
-
+
int cnt = 0;
closingGatewaySendersByShutdownAll = byShutdownAll;
synchronized (allGatewaySendersLock) {
@@ -2620,7 +2608,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
try {
sender.stop();
- advisor = ((AbstractGatewaySender)sender).getSenderAdvisor();
+ advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
if (advisor != null) {
if (isDebugEnabled) {
logger.debug("Stopping the GatewaySender advisor");
@@ -2628,8 +2616,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
advisor.close();
}
cnt++;
- }
- catch (CancelException e) {
+ } catch (CancelException e) {
if (isDebugEnabled) {
logger.debug("Ignored cache closure while closing sender {}", sender, e);
}
@@ -2638,13 +2625,13 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
} // synchronized
destroyGatewaySenderLockService();
-
+
if (isDebugEnabled) {
logger.debug("{}: finished stopping {} gateway sender(s), total is {}", this, cnt, allGatewaySenders.size());
}
return cnt;
}
-
+
public int stopGatewayReceivers(boolean byShutdownAll) {
int cnt = 0;
closingGatewayReceiversByShutdownAll = byShutdownAll;
@@ -2658,8 +2645,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
try {
receiver.stop();
cnt++;
- }
- catch (CancelException e) {
+ } catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("Ignored cache closure while closing receiver {}", receiver, e);
}
@@ -2676,7 +2662,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
void stopServers() {
final boolean isDebugEnabled = logger.isDebugEnabled();
-
+
if (isDebugEnabled) {
logger.debug("{}: stopping cache servers...", this);
}
@@ -2701,7 +2687,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// now that all the cache servers have stopped empty the static pool of commBuffers it might have used.
ServerConnection.emptyCommBufferPool();
}
-
+
// stop HA services if they had been started
if (isDebugEnabled) {
logger.debug("{}: stopping HA services...", this);
@@ -2816,11 +2802,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public LogWriterI18n getLoggerI18n() {
return this.system.getInternalLogWriter();
}
-
+
public LogWriterI18n getSecurityLoggerI18n() {
return this.system.getSecurityInternalLogWriter();
}
-
+
public InternalLogWriter getInternalLogWriter() {
return this.system.getInternalLogWriter();
}
@@ -2848,8 +2834,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* Get the list of all instances of properties for Declarables with the given class name.
- *
+ *
* @param className Class name of the declarable
+ *
* @return List of all instances of properties found for the given declarable
*/
public List<Properties> getDeclarableProperties(final String className) {
@@ -2863,17 +2850,18 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
return propertiesList;
}
-
+
/**
* Get the properties for the given declarable.
- *
+ *
* @param declarable The declarable
+ *
* @return Properties found for the given declarable
*/
public Properties getDeclarableProperties(final Declarable declarable) {
return this.declarablePropertiesMap.get(declarable);
}
-
+
/**
* Returns the date and time that this cache was created.
*
@@ -2896,7 +2884,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* All entry and region operations should be using this time rather than
* System.currentTimeMillis(). Specially all version stamps/tags must be
* populated with this timestamp.
- *
+ *
* @return distributed cache time.
*/
@Override
@@ -3058,8 +3046,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public Region basicCreateRegion(String name, RegionAttributes attrs) throws RegionExistsException, TimeoutException {
try {
- InternalRegionArguments ira = new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
- .setSnapshotInputStream(null).setImageTarget(null);
+ InternalRegionArguments ira = new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false).setSnapshotInputStream(null).setImageTarget(null);
if (attrs instanceof UserSpecifiedRegionAttributes) {
ira.setIndexes(((UserSpecifiedRegionAttributes) attrs).getIndexes());
@@ -3073,8 +3060,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> p_attrs, InternalRegionArguments internalRegionArgs)
- throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException {
+ public <K, V> Region<K, V> createVMRegion(String name, RegionAttributes<K, V> p_attrs, InternalRegionArguments internalRegionArgs) throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException {
if (getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
if (!internalRegionArgs.isUsedForMetaRegion() && internalRegionArgs.getInternalMetaRegion() == null) {
throw new IllegalStateException("Regions can not be created in a locator.");
@@ -3087,7 +3073,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
if (attrs == null) {
throw new IllegalArgumentException(LocalizedStrings.GemFireCache_ATTRIBUTES_MUST_NOT_BE_NULL.toLocalizedString());
}
-
+
LocalRegion rgn = null;
// final boolean getDestroyLock = attrs.getDestroyLockFlag();
final InputStream snapshotInputStream = internalRegionArgs.getSnapshotInputStream();
@@ -3100,7 +3086,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
final String regionPath = LocalRegion.calcFullPath(name, null);
try {
- for (;;) {
+ for (; ; ) {
getCancelCriterion().checkCancelInProgress(null);
Future future = null;
@@ -3157,11 +3143,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
} catch (CancellationException e) {
// future was cancelled
} finally {
- if (interrupted)
+ if (interrupted) {
Thread.currentThread().interrupt();
+ }
}
} // for
-
+
boolean success = false;
try {
setRegionByPath(rgn.getFullPath(), rgn);
@@ -3174,8 +3161,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// don't log this
throw e;
} catch (final RuntimeException validationException) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.GemFireCache_INITIALIZATION_FAILED_FOR_REGION_0, rgn.getFullPath()), validationException);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_INITIALIZATION_FAILED_FOR_REGION_0, rgn.getFullPath()), validationException);
throw validationException;
} finally {
if (!success) {
@@ -3189,10 +3175,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
} catch (Throwable t) {
SystemFailure.checkFailure();
stopper.checkCancelInProgress(t);
-
+
// bug #44672 - log the failure but don't override the original exception
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.GemFireCache_INIT_CLEANUP_FAILED_FOR_REGION_0, rgn.getFullPath()), t);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_INIT_CLEANUP_FAILED_FOR_REGION_0, rgn.getFullPath()), t);
} finally {
// clean up if initialize fails for any reason
@@ -3207,8 +3192,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
} // success
}
-
-
+
rgn.postCreateRegion();
} catch (RegionExistsException ex) {
// outside of sync make sure region is initialized to fix bug 37563
@@ -3228,16 +3212,15 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
return rgn;
}
- public RegionAttributes invokeRegionBefore(LocalRegion parent,
- String name, RegionAttributes attrs, InternalRegionArguments internalRegionArgs) {
- for(RegionListener listener : regionListeners) {
+ public RegionAttributes invokeRegionBefore(LocalRegion parent, String name, RegionAttributes attrs, InternalRegionArguments internalRegionArgs) {
+ for (RegionListener listener : regionListeners) {
attrs = listener.beforeCreate(parent, name, attrs, internalRegionArgs);
}
return attrs;
}
-
+
public void invokeRegionAfter(LocalRegion region) {
- for(RegionListener listener : regionListeners) {
+ for (RegionListener listener : regionListeners) {
listener.afterCreate(region);
}
}
@@ -3279,8 +3262,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
synchronized (this.rootRegions) {
for (Object r : this.rootRegions.values()) {
LocalRegion rgn = (LocalRegion) r;
- if (rgn.isSecret() || rgn.isUsedForMetaRegion() || rgn instanceof HARegion || rgn.isUsedForPartitionedRegionAdmin()
- || rgn.isInternalRegion()/* rgn.isUsedForPartitionedRegionBucket() */) {
+ if (rgn.isSecret() || rgn.isUsedForMetaRegion() || rgn instanceof HARegion || rgn.isUsedForPartitionedRegionAdmin() || rgn.isInternalRegion()/* rgn.isUsedForPartitionedRegionBucket() */) {
continue; // Skip administrative PartitionedRegions
}
result.add(rgn);
@@ -3299,8 +3281,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
/**
- * @throws IllegalArgumentException
- * if path is not valid
+ * @throws IllegalArgumentException if path is not valid
*/
private static void validatePath(String path) {
if (path == null) {
@@ -3338,8 +3319,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
LocalRegion root;
synchronized (this.rootRegions) {
root = (LocalRegion) this.rootRegions.get(pathParts[0]);
- if (root == null)
+ if (root == null) {
return null;
+ }
}
if (logger.isDebugEnabled()) {
logger.debug("GemFireCache.getRegion, calling getSubregion on root({}): {}", pathParts[0], pathParts[1]);
@@ -3353,8 +3335,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
/**
- * @param returnDestroyedRegion
- * if true, okay to return a destroyed region
+ * @param returnDestroyedRegion if true, okay to return a destroyed region
*/
public Region getRegion(String path, boolean returnDestroyedRegion) {
stopper.checkCancelInProgress(null);
@@ -3396,8 +3377,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
/**
- * @param returnDestroyedRegion
- * if true, okay to return a destroyed partitioned region
+ * @param returnDestroyedRegion if true, okay to return a destroyed partitioned region
*/
public final Region getPartitionedRegion(String path, boolean returnDestroyedRegion) {
stopper.checkCancelInProgress(null);
@@ -3412,7 +3392,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
}
-
+
String[] pathParts = parsePath(path);
LocalRegion root;
LogWriterI18n logger = getLoggerI18n();
@@ -3441,7 +3421,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- /** Return true if this region is initializing */
+ /**
+ * Return true if this region is initializing
+ */
boolean isGlobalRegionInitializing(String fullPath) {
stopper.checkCancelInProgress(null);
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); // go through
@@ -3454,7 +3436,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- /** Return true if this region is initializing */
+ /**
+ * Return true if this region is initializing
+ */
boolean isGlobalRegionInitializing(LocalRegion region) {
boolean result = region != null && region.scope.isGlobal() && !region.isInitialized();
if (result) {
@@ -3477,18 +3461,17 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
stopper.checkCancelInProgress(null);
Set regions = new HashSet();
synchronized (this.rootRegions) {
- for (Iterator itr = this.rootRegions.values().iterator(); itr.hasNext();) {
+ for (Iterator itr = this.rootRegions.values().iterator(); itr.hasNext(); ) {
LocalRegion r = (LocalRegion) itr.next();
// If this is an internal meta-region, don't return it to end user
- if (r.isSecret() || r.isUsedForMetaRegion() || r instanceof HARegion || !includePRAdminRegions
- && (r.isUsedForPartitionedRegionAdmin() || r.isUsedForPartitionedRegionBucket())) {
+ if (r.isSecret() || r.isUsedForMetaRegion() || r instanceof HARegion || !includePRAdminRegions && (r.isUsedForPartitionedRegionAdmin() || r.isUsedForPartitionedRegionBucket())) {
continue; // Skip administrative PartitionedRegions
}
regions.add(r);
}
}
if (waitForInit) {
- for (Iterator r = regions.iterator(); r.hasNext();) {
+ for (Iterator r = regions.iterator(); r.hasNext(); ) {
LocalRegion lr = (LocalRegion) r.next();
// lr.waitOnInitialization();
if (!lr.checkForInitialization()) {
@@ -3506,8 +3489,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
*/
public void cleanupForClient(CacheClientNotifier ccn, ClientProxyMembershipID client) {
try {
- if (isClosed())
+ if (isClosed()) {
return;
+ }
Iterator it = rootRegions(false, false).iterator();
while (it.hasNext()) {
LocalRegion lr = (LocalRegion) it.next();
@@ -3571,8 +3555,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
stopper.checkCancelInProgress(null);
if (seconds < 0) {
- throw new IllegalArgumentException(
- LocalizedStrings.GemFireCache_THE_MESSAGESYNCINTERVAL_PROPERTY_FOR_CACHE_CANNOT_BE_NEGATIVE.toLocalizedString());
+ throw new IllegalArgumentException(LocalizedStrings.GemFireCache_THE_MESSAGESYNCINTERVAL_PROPERTY_FOR_CACHE_CANNOT_BE_NEGATIVE.toLocalizedString());
}
HARegionQueue.setMessageSyncInterval(seconds);
}
@@ -3609,29 +3592,25 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* Register the specified region name as reinitializing, creating and adding a Future for it to the map.
*
- * @throws IllegalStateException
- * if there is already a region by that name registered.
+ * @throws IllegalStateException if there is already a region by that name registered.
*/
void regionReinitializing(String fullPath) {
Object old = this.reinitializingRegions.putIfAbsent(fullPath, new FutureResult(this.stopper));
if (old != null) {
- throw new IllegalStateException(LocalizedStrings.GemFireCache_FOUND_AN_EXISTING_REINITALIZING_REGION_NAMED_0
- .toLocalizedString(fullPath));
+ throw new IllegalStateException(LocalizedStrings.GemFireCache_FOUND_AN_EXISTING_REINITALIZING_REGION_NAMED_0.toLocalizedString(fullPath));
}
}
/**
* Set the reinitialized region and unregister it as reinitializing.
*
- * @throws IllegalStateException
- * if there is no region by that name registered as reinitializing.
+ * @throws IllegalStateException if there is no region by that name registered as reinitializing.
*/
void regionReinitialized(Region region) {
String regionName = region.getFullPath();
FutureResult future = (FutureResult) this.reinitializingRegions.get(regionName);
if (future == null) {
- throw new IllegalStateException(LocalizedStrings.GemFireCache_COULD_NOT_FIND_A_REINITIALIZING_REGION_NAMED_0
- .toLocalizedString(regionName));
+ throw new IllegalStateException(LocalizedStrings.GemFireCache_COULD_NOT_FIND_A_REINITIALIZING_REGION_NAMED_0.toLocalizedString(regionName));
}
future.set(region);
unregisterReinitializingRegion(regionName);
@@ -3640,11 +3619,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* Clear a reinitializing region, e.g. reinitialization failed.
*
- * @throws IllegalStateException
- * if cannot find reinitializing region registered by that name.
+ * @throws IllegalStateException if cannot find reinitializing region registered by that name.
*/
void unregisterReinitializingRegion(String fullPath) {
- /* Object previous = */this.reinitializingRegions.remove(fullPath);
+ /* Object previous = */
+ this.reinitializingRegions.remove(fullPath);
// if (previous == null) {
// throw new IllegalStateException("Could not find a reinitializing region
// named " +
@@ -3684,8 +3663,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* Remove the specified root region
*
- * @param rootRgn
- * the region to be removed
+ * @param rootRgn the region to be removed
+ *
* @return true if root region was removed, false if not found
*/
boolean removeRoot(LocalRegion rootRgn) {
@@ -3696,14 +3675,15 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
LocalRegion previous = (LocalRegion) this.rootRegions.remove(rgnName);
Assert.assertTrue(previous == rootRgn);
return true;
- } else
+ } else {
return false;
+ }
}
}
/**
* @return array of two Strings, the root name and the relative path from root If there is no relative path from root,
- * then String[1] will be an empty string
+ * then String[1] will be an empty string
*/
static String[] parsePath(String p_path) {
String path = p_path;
@@ -3743,15 +3723,15 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
return cacheLifecycleListeners.remove(l);
}
}
-
- public void addRegionListener(RegionListener l ) {
+
+ public void addRegionListener(RegionListener l) {
this.regionListeners.add(l);
}
-
- public void removeRegionListener(RegionListener l ) {
+
+ public void removeRegionListener(RegionListener l) {
this.regionListeners.remove(l);
}
-
+
@SuppressWarnings("unchecked")
public <T extends CacheService> T getService(Class<T> clazz) {
return (T) services.get(clazz);
@@ -3770,7 +3750,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
/**
* @see CacheClientProxy
- * @guarded.By {@link #ccpTimerMutex}
*/
private SystemTimer ccpTimer;
@@ -3841,7 +3820,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
sendAddCacheServerProfileMessage();
return bridge;
}
-
+
public void addGatewaySender(GatewaySender sender) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
@@ -3859,8 +3838,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
tmp.add(sender);
this.allGatewaySenders = Collections.unmodifiableSet(tmp);
} else {
- throw new IllegalStateException(LocalizedStrings.GemFireCache_A_GATEWAYSENDER_WITH_ID_0_IS_ALREADY_DEFINED_IN_THIS_CACHE
- .toLocalizedString(sender.getId()));
+ throw new IllegalStateException(LocalizedStrings.GemFireCache_A_GATEWAYSENDER_WITH_ID_0_IS_ALREADY_DEFINED_IN_THIS_CACHE.toLocalizedString(sender.getId()));
}
}
@@ -3874,12 +3852,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- if(!sender.isParallel()) {
- Region dynamicMetaRegion = getRegion(DynamicRegionFactory.dynamicRegionListName);
- if(dynamicMetaRegion == null) {
- if(logger.isDebugEnabled()) {
- logger.debug(" The dynamic region is null. ");
- }
+ if (!sender.isParallel()) {
+ Region dynamicMetaRegion = getRegion(DynamicRegionFactory.dynamicRegionListName);
+ if (dynamicMetaRegion == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" The dynamic region is null. ");
+ }
} else {
dynamicMetaRegion.getAttributesMutator().addGatewaySenderId(sender.getId());
}
@@ -3888,12 +3866,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_CREATE, sender);
}
}
-
+
public void removeGatewaySender(GatewaySender sender) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
}
-
+
stopper.checkCancelInProgress(null);
synchronized (allGatewaySendersLock) {
@@ -3926,22 +3904,21 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) {
this.allAsyncEventQueues.add(asyncQueue);
- if(!asyncQueue.
<TRUNCATED>