You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/05 19:39:14 UTC
[6/8] incubator-geode git commit: GEODE-920: lazily create
HaContainer for cache server
GEODE-920: lazily create HaContainer for cache server
The CacheClientNotifier instance might have been created by gateway receicver.
Then not to create the HaContainer until a cache server starts and amend
the CacheClientNotifier instance by initialize the HaContainer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/49e3f523
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/49e3f523
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/49e3f523
Branch: refs/heads/feature/GEODE-1162
Commit: 49e3f523d16389f5e84847c6dcfd6ab45427f8c2
Parents: 066a9d5
Author: zhouxh <gz...@pivotal.io>
Authored: Fri Mar 25 10:04:35 2016 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Mon Apr 4 15:48:57 2016 -0700
----------------------------------------------------------------------
.../internal/cache/InitialImageOperation.java | 15 +-
.../gemfire/internal/cache/LocalRegion.java | 3 +
.../internal/cache/ha/HAContainerRegion.java | 5 +
.../cache/tier/sockets/CacheClientNotifier.java | 58 +++--
.../cache/wan/CacheClientNotifierDUnitTest.java | 225 +++++++++++++++++++
5 files changed, 279 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 9bd3faf..a72ca8e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -2179,9 +2179,11 @@ public class InitialImageOperation {
if (logger.isDebugEnabled()) {
try {
CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- CacheClientProxy proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy(
- region.getName());
- logger.debug("Processing FilterInfo for proxy: {} : {}", proxy, msg);
+ if (ccn != null && ccn.getHaContainer() != null) {
+ CacheClientProxy proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy(
+ region.getName());
+ logger.debug("Processing FilterInfo for proxy: {} : {}", proxy, msg);
+ }
} catch (Exception ex) {
// Ignore.
}
@@ -3705,8 +3707,13 @@ public class InitialImageOperation {
*/
public void registerFilters(LocalRegion region) {
CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+
CacheClientProxy proxy;
try {
+ if (ccn == null || ccn.getHaContainer() == null) {
+ logger.info("Found null cache client notifier. Failed to register Filters during HARegion GII. Region :{}", region.getName());
+ return;
+ }
proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy(
region.getName());
} catch (Exception ex) {
@@ -3716,7 +3723,7 @@ public class InitialImageOperation {
}
if (proxy == null) {
- logger.info("Found null client proxy. Failed to register Filters during HARegion GII. Region :{}", region.getName());
+ logger.info("Found null client proxy. Failed to register Filters during HARegion GII. Region :{}, HaContainer :{}", region.getName(), ccn.getHaContainer());
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 8e30a7a..3ff48bb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -4576,6 +4576,9 @@ public class LocalRegion extends AbstractRegion
public void refreshEntriesFromServerKeys(Connection con, List serverKeys,
InterestResultPolicy pol)
{
+ if (serverKeys == null) {
+ return;
+ }
ServerRegionProxy proxy = getServerProxy();
if (logger.isDebugEnabled()) {
logKeys(serverKeys, pol);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
index 3bed769..fe0c112 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
@@ -55,6 +55,11 @@ public class HAContainerRegion implements HAContainerWrapper {
}
}
+ public Region getMapForTest() {
+ Region region = (Region)map;
+ return region;
+ }
+
public Object putProxy(String haName, CacheClientProxy proxy) {
return haRegionNameToProxy.put(haName, proxy);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 6ac4690..3178b8d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -162,6 +162,12 @@ public class CacheClientNotifier {
messageTimeToLive, transactionTimeToLive,
listener, overflowAttributesList, isGatewayReceiver);
}
+
+ if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
+ // Gateway receiver might have create CCN instance without HaContainer
+ // In this case, the HaContainer should be lazily created here
+ ccnSingleton.initHaContainer(overflowAttributesList);
+ }
// else {
// ccnSingleton.acceptorStats = acceptorStats;
// ccnSingleton.maximumMessageCount = maximumMessageCount;
@@ -1671,9 +1677,11 @@ public class CacheClientNotifier {
if (noActiveServer() && ccnSingleton != null){
ccnSingleton = null;
- haContainer.cleanUp();
- if (isDebugEnabled) {
- logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
+ if (haContainer != null) {
+ haContainer.cleanUp();
+ if (isDebugEnabled) {
+ logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
+ }
}
this.clearCompiledQueries();
blackListedClients.clear();
@@ -2147,25 +2155,6 @@ public class CacheClientNotifier {
// Set the security LogWriter
this.securityLogWriter = (InternalLogWriter)cache.getSecurityLogger();
- // Create the overflow artifacts
- if (overflowAttributesList != null
- && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList
- .get(0))) {
- haContainer = new HAContainerRegion(cache.getRegion(Region.SEPARATOR
- + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl)cache,
- (String)overflowAttributesList.get(0),
- ((Integer)overflowAttributesList.get(1)).intValue(),
- ((Integer)overflowAttributesList.get(2)).intValue(),
- (String)overflowAttributesList.get(3),
- (Boolean)overflowAttributesList.get(4))));
- }
- else {
- haContainer = new HAContainerMap(new HashMap());
- }
- if (logger.isDebugEnabled()) {
- logger.debug("ha container ({}) has been created.", haContainer.getName());
- }
-
this.maximumMessageCount = maximumMessageCount;
this.messageTimeToLive = messageTimeToLive;
this.transactionTimeToLive = transactionTimeToLive;
@@ -2608,7 +2597,7 @@ public class CacheClientNotifier {
* (in case of eviction policy "none"). In both the cases, it'll store
* HAEventWrapper as its key and ClientUpdateMessage as its value.
*/
- private final HAContainerWrapper haContainer;
+ private HAContainerWrapper haContainer;
// /**
// * The singleton <code>CacheClientNotifier</code> instance
@@ -2691,6 +2680,29 @@ public class CacheClientNotifier {
public Map getHaContainer() {
return haContainer;
}
+
+ public void initHaContainer(List overflowAttributesList) {
+ // lazily initialize haContainer in case this CCN instance was created by a gateway receiver
+ if (overflowAttributesList != null
+ && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList
+ .get(0))) {
+ haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR
+ + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl)_cache,
+ (String)overflowAttributesList.get(0),
+ ((Integer)overflowAttributesList.get(1)).intValue(),
+ ((Integer)overflowAttributesList.get(2)).intValue(),
+ (String)overflowAttributesList.get(3),
+ (Boolean)overflowAttributesList.get(4))));
+ }
+ else {
+ haContainer = new HAContainerMap(new HashMap());
+ }
+ assert haContainer != null;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("ha container ({}) has been created.", haContainer.getName());
+ }
+ }
private final Set blackListedClients = new CopyOnWriteArraySet();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
new file mode 100755
index 0000000..9557f0d
--- /dev/null
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.UserSpecifiedRegionAttributes;
+import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
+import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+public class CacheClientNotifierDUnitTest extends WANTestBase {
+ private static final int NUM_KEYS = 10;
+
+ public CacheClientNotifierDUnitTest(String name) {
+ super(name);
+ // TODO Auto-generated constructor stub
+ }
+
+ private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int capacity,
+ final String policy, final String diskStoreName) {
+ final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+ SerializableRunnable createCacheServer = new SerializableRunnable() {
+ @Override
+ public void run() throws Exception {
+ CacheServerImpl server = (CacheServerImpl)cache.addCacheServer();
+ server.setPort(serverPort);
+ if (withCSC) {
+ if (diskStoreName != null) {
+ DiskStore ds = cache.findDiskStore(diskStoreName);
+ if(ds == null) {
+ ds = cache.createDiskStoreFactory().create(diskStoreName);
+ }
+ }
+ ClientSubscriptionConfig csc = server.getClientSubscriptionConfig();
+ csc.setCapacity(capacity);
+ csc.setEvictionPolicy(policy);
+ csc.setDiskStoreName(diskStoreName);
+ server.setHostnameForClients("localhost");
+ //server.setGroups(new String[]{"serv"});
+ }
+ try {
+ server.start();
+ } catch (IOException e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e);
+ }
+ }
+ };
+ vm.invoke(createCacheServer);
+ return serverPort;
+ }
+
+ private void checkCacheServer(VM vm, final int serverPort, final boolean withCSC, final int capacity) {
+ SerializableRunnable checkCacheServer = new SerializableRunnable() {
+
+ @Override
+ public void run() throws Exception {
+ List<CacheServer> cacheServers = ((GemFireCacheImpl)cache).getCacheServersAndGatewayReceiver();
+ CacheServerImpl server = null;
+ for (CacheServer cs:cacheServers) {
+ if (cs.getPort() == serverPort) {
+ server = (CacheServerImpl)cs;
+ break;
+ }
+ }
+ assertNotNull(server);
+ CacheClientNotifier ccn = server.getAcceptor().getCacheClientNotifier();
+ HAContainerRegion haContainer = (HAContainerRegion)ccn.getHaContainer();
+ if (server.getAcceptor().isGatewayReceiver()) {
+ assertNull(haContainer);
+ return;
+ }
+ Region internalRegion = haContainer.getMapForTest();
+ RegionAttributes ra = internalRegion.getAttributes();
+ EvictionAttributes ea = ra.getEvictionAttributes();
+ if (withCSC) {
+ assertNotNull(ea);
+ assertEquals(capacity, ea.getMaximum());
+ assertEquals(EvictionAction.OVERFLOW_TO_DISK, ea.getAction());
+ } else {
+ assertNull(ea);
+ }
+ }
+ };
+ vm.invoke(checkCacheServer);
+ }
+
+ private void closeCacheServer(VM vm, final int serverPort) {
+ SerializableRunnable stopCacheServer = new SerializableRunnable() {
+
+ @Override
+ public void run() throws Exception {
+ List<CacheServer> cacheServers = cache.getCacheServers();
+ CacheServerImpl server = null;
+ for (CacheServer cs:cacheServers) {
+ if (cs.getPort() == serverPort) {
+ server = (CacheServerImpl)cs;
+ break;
+ }
+ }
+ assertNotNull(server);
+ server.stop();
+ }
+ };
+ vm.invoke(stopCacheServer);
+ }
+
+ private void verifyRegionSize(VM vm, final int expect) {
+ SerializableRunnable verifyRegionSize = new SerializableRunnable() {
+ @Override
+ public void run() throws Exception {
+ final Region region = cache.getRegion(getTestMethodName() + "_PR");
+
+ Wait.waitForCriterion(new WaitCriterion() {
+ public boolean done() {
+ return region.size() == expect;
+ }
+ public String description() {
+ return null;
+ }
+ }, 60000, 100, false);
+ assertEquals(expect, region.size());
+ }
+ };
+ vm.invoke(verifyRegionSize);
+ }
+
+ /*
+ * The test will start several cache servers, including gateway receivers.
+ * Shutdown them and verify the CacheClientNofifier for each server is correct
+ */
+ public void testMultipleCacheServer() throws Exception {
+ /* test senario: */
+ /* create 1 GatewaySender on vm0 */
+ /* create 1 GatewayReceiver on vm1 */
+ /* create 2 cache servers on vm1, one with overflow. */
+ /* verify if the cache server2 still has the overflow attributes */
+ /* create 1 cache client1 on vm2 to register interest on cache server1 */
+ /* create 1 cache client2 on vm3 to register interest on cache server1 */
+ /* do some puts to GatewaySender on vm0 */
+
+ // create sender at ln
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+ // create recever and cache servers will be at ny
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ vm1.invoke(() -> WANTestBase.createCache( nyPort ));
+ int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ checkCacheServer(vm1, receiverPort, false, 0);
+
+ // create PR for receiver
+ vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+ // create cache server1 with overflow
+ int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT");
+ checkCacheServer(vm1, serverPort, true, 3);
+
+ // create cache server 2
+ final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null);
+ // Currently, only the first cache server's overflow attributes will take effect
+ // It will be enhanced in GEODE-1102
+ checkCacheServer(vm1, serverPort2, true, 3);
+ LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2);
+
+ vm2.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR" ));
+ vm3.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR" ));
+
+ vm0.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, false, false, null, true ));
+ vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm0.invoke(() -> WANTestBase.startSender( "ln" ));
+ vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS ));
+
+ /* verify */
+ verifyRegionSize(vm0, NUM_KEYS);
+ verifyRegionSize(vm1, NUM_KEYS);
+ verifyRegionSize(vm2, NUM_KEYS);
+ verifyRegionSize(vm3, NUM_KEYS);
+
+ // close a cache server, then re-test
+ closeCacheServer(vm1, serverPort2);
+
+ vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS*2 ));
+
+ /* verify */
+ verifyRegionSize(vm0, NUM_KEYS*2);
+ verifyRegionSize(vm1, NUM_KEYS*2);
+ verifyRegionSize(vm2, NUM_KEYS*2);
+ verifyRegionSize(vm3, NUM_KEYS*2);
+ }
+
+}