You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/11 18:01:32 UTC
incubator-geode git commit: Fix up HADispatcherDUnitTest
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1376 c3906859a -> fde0c8cd9
Fix up HADispatcherDUnitTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fde0c8cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fde0c8cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fde0c8cd
Branch: refs/heads/feature/GEODE-1376
Commit: fde0c8cd91cfeb58034c1281b153df6f5fb48cf7
Parents: c390685
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 11 11:01:17 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 11 11:01:17 2016 -0700
----------------------------------------------------------------------
.../cache/ha/HADispatcherDUnitTest.java | 608 ++++++-------------
1 file changed, 193 insertions(+), 415 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fde0c8cd/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
index aaf8b6f..fd7c559 100755
--- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
@@ -16,11 +16,20 @@
*/
package com.gemstone.gemfire.internal.cache.ha;
-import hydra.Log;
-
+import static com.gemstone.gemfire.internal.AvailablePort.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+import static com.gemstone.gemfire.test.dunit.Host.*;
+import static com.gemstone.gemfire.test.dunit.LogWriterUtils.*;
+import static com.gemstone.gemfire.test.dunit.NetworkUtils.*;
+import static com.gemstone.gemfire.test.dunit.Wait.*;
+
+import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
@@ -36,16 +45,18 @@ import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqExistsException;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryTestListener;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
-import com.gemstone.gemfire.cache30.ClientServerTestCase;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.CacheServerImpl;
import com.gemstone.gemfire.internal.cache.Conflatable;
import com.gemstone.gemfire.internal.cache.HARegion;
@@ -54,13 +65,10 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
import com.gemstone.gemfire.internal.cache.tier.sockets.ConflationDUnitTest;
import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.NetworkUtils;
import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
/**
* This Dunit test is to verify that when the dispatcher of CS dispatches the
@@ -76,118 +84,136 @@ import com.gemstone.gemfire.test.dunit.WaitCriterion;
* 7. Again the entry in the regionque of client2 on server2.It should not be present.
* 8. close client1 and client2
* 9. close server1 and server2
- *
*/
-
-public class HADispatcherDUnitTest extends DistributedTestCase
-{
-
- VM server1 = null;
-
- VM server2 = null;
-
- VM client1 = null;
-
- VM client2 = null;
-
- public int PORT1;
-
- public int PORT2;
-
- private static final String REGION_NAME = "HADispatcherDUnitTest_region";
-
- protected static Cache cache = null;
-
- public static final Object dummyObj = "dummyObject";
-
- static volatile boolean isObjectPresent = false;
-
- final static String KEY1 = "KEY1";
-
- final static String VALUE1 = "VALUE1";
-
- final static String KEY2 = "KEY2";
-
- final static String VALUE2 = "VALUE2";
-
- static volatile boolean waitFlag = true;
-
- public HADispatcherDUnitTest(String name) {
- super(name);
- }
+@Category(DistributedTest.class)
+public class HADispatcherDUnitTest extends JUnit4DistributedTestCase {
+
+ private static final String REGION_NAME = HADispatcherDUnitTest.class.getSimpleName() + "_region";
+ private static final Object dummyObj = "dummyObject";
+ private static final String KEY1 = "KEY1";
+ private static final String VALUE1 = "VALUE1";
+ private static final String KEY2 = "KEY2";
+ private static final String VALUE2 = "VALUE2";
+
+ private static Cache cache = null;
+ private static volatile boolean isObjectPresent = false;
+ private static volatile boolean waitFlag = true;
+
+ private VM server1 = null;
+ private VM server2 = null;
+ private VM client1 = null;
+ private VM client2 = null;
+ private int PORT1;
+ private int PORT2;
@Override
public final void postSetUp() throws Exception {
- final Host host = Host.getHost(0);
-
+ String serverHostName = getServerHostName(getHost(0));
+
// Server1 VM
- server1 = host.getVM(0);
+ server1 = getHost(0).getVM(0);
// Server2 VM
- server2 = host.getVM(1);
+ server2 = getHost(0).getVM(1);
// Client 1 VM
- client1 = host.getVM(2);
+ client1 = getHost(0).getVM(2);
// client 2 VM
- client2 = host.getVM(3);
+ client2 = getHost(0).getVM(3);
+
+ PORT1 = ((Integer) server1.invoke(() -> createServerCache(new Boolean(false)))).intValue();
- PORT1 = ((Integer)server1.invoke(() -> HADispatcherDUnitTest.createServerCache( new Boolean(false) ))).intValue();
server1.invoke(() -> ConflationDUnitTest.setIsSlowStart());
- server1.invoke(() -> HADispatcherDUnitTest.makeDispatcherSlow());
- server1.invoke(() -> HADispatcherDUnitTest.setQRMslow());
- PORT2 = ((Integer)server2.invoke(() -> HADispatcherDUnitTest.createServerCache( new Boolean(true) ))).intValue();
+ server1.invoke(() -> makeDispatcherSlow());
+ server1.invoke(() -> setQRMslow());
+
+ PORT2 = ((Integer) server2.invoke(() -> createServerCache(new Boolean(true)))).intValue();
client1.invoke(() -> CacheServerTestUtil.disableShufflingOfEndpoints());
client2.invoke(() -> CacheServerTestUtil.disableShufflingOfEndpoints());
- client1.invoke(() -> HADispatcherDUnitTest.createClientCache(
- NetworkUtils.getServerHostName(host),
- new Integer(PORT1), new Integer(PORT2),
- new Boolean(false) ));
- client2.invoke(() -> HADispatcherDUnitTest.createClientCache(
- NetworkUtils.getServerHostName(host),
- new Integer(PORT1), new Integer(PORT2),
- new Boolean(true) ));
+ client1.invoke(() -> createClientCache(serverHostName, new Integer(PORT1), new Integer(PORT2), new Boolean(false)));
+ client2.invoke(() -> createClientCache(serverHostName, new Integer(PORT1), new Integer(PORT2), new Boolean(true)));
}
@Override
public final void preTearDown() throws Exception {
- client1.invoke(() -> HADispatcherDUnitTest.closeCache());
- client2.invoke(() -> HADispatcherDUnitTest.closeCache());
+ client1.invoke(() -> closeCache());
+ client2.invoke(() -> closeCache());
// close server
- server1.invoke(() -> HADispatcherDUnitTest.resetQRMslow());
- server1.invoke(() -> HADispatcherDUnitTest.closeCache());
- server2.invoke(() -> HADispatcherDUnitTest.closeCache());
+ server1.invoke(() -> resetQRMslow());
+ server1.invoke(() -> closeCache());
+ server2.invoke(() -> closeCache());
+ }
+
+ @Test
+ public void testDispatcher() throws Exception {
+ clientPut(client1, KEY1, VALUE1);
+ // Waiting in the client2 till it receives the event for the key.
+ checkFromClient(client2);
+
+ // performing check in the regionqueue of the server2
+ checkFromServer(server2, KEY1);
+
+ // For CQ Only.
+ // performing put from the client1
+ clientPut(client1, KEY2, VALUE2);
+ checkFromClient(client2);
+ checkFromServer(server2, KEY2);
+
+ getLogWriter().info("testDispatcher() completed successfully");
}
- public static void closeCache()
- {
+ /**
+ * This is to test the serialization mechanism of ClientUpdateMessage.
+ * Added after CQ support.
+ * This could be done in a different way, by overflowing the HARegion queue.
+ */
+ @Test
+ public void testClientUpdateMessageSerialization() throws Exception {
+ // Update Value.
+ clientPut(client1, KEY1, VALUE1);
+ getLogWriter().fine(">>>>>>>> after clientPut(c1, k1, v1)");
+ // Waiting in the client2 till it receives the event for the key.
+ checkFromClient(client2);
+ getLogWriter().fine("after checkFromClient(c2)");
+
+ // performing check in the regionqueue of the server2
+ checkFromServer(server2, KEY1);
+ getLogWriter().fine("after checkFromServer(s2, k1)");
+
+ // UPDATE.
+ clientPut(client1, KEY1, VALUE1);
+ getLogWriter().fine("after clientPut 2 (c1, k1, v1)");
+ // Waiting in the client2 till it receives the event for the key.
+ checkFromClient(client2);
+ getLogWriter().fine("after checkFromClient 2 (c2)");
+
+ // performing check in the regionqueue of the server2
+ checkFromServer(server2, KEY1);
+ getLogWriter().fine("after checkFromServer 2 (s2, k1)");
+
+ getLogWriter().info("testClientUpdateMessageSerialization() completed successfully");
+ }
+
+ private void closeCache() {
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
- public static void setQRMslow()
- {
+ private void setQRMslow() throws InterruptedException {
int oldMessageSyncInterval = cache.getMessageSyncInterval();
cache.setMessageSyncInterval(6);
- try {
- Thread.sleep((oldMessageSyncInterval + 1)*1000);
- }
- catch (InterruptedException e) {
- fail("Unexcepted InterruptedException Occurred");
- }
+ Thread.sleep((oldMessageSyncInterval + 1) * 1000);
}
- public static void resetQRMslow()
- {
+ private void resetQRMslow() {
cache.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
}
-
- public static void makeDispatcherSlow()
- {
+ private void makeDispatcherSlow() {
System.setProperty("slowStartTimeForTesting", "5000");
}
@@ -195,8 +221,7 @@ public class HADispatcherDUnitTest extends DistributedTestCase
// performing put from the client1
vm.invoke(new CacheSerializableRunnable("putFromClient") {
@Override
- public void run2() throws CacheException
- {
+ public void run2() throws CacheException {
Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(region);
region.put(key, value);
@@ -208,18 +233,16 @@ public class HADispatcherDUnitTest extends DistributedTestCase
// Waiting in the client till it receives the event for the key.
vm.invoke(new CacheSerializableRunnable("checkFromClient") {
@Override
- public void run2() throws CacheException
- {
+ public void run2() throws CacheException {
Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
assertNotNull(region);
- cache.getLogger().fine("starting the wait");
+ cache.getLogger().fine("starting the wait");
synchronized (dummyObj) {
while (waitFlag) {
try {
dummyObj.wait(30000);
- }
- catch (InterruptedException e) {
- fail("interrupted");
+ } catch (InterruptedException e) {
+ fail("interrupted", e);
}
}
}
@@ -231,117 +254,41 @@ public class HADispatcherDUnitTest extends DistributedTestCase
}
private void checkFromServer(VM vm, final Object key) {
- // Thread.sleep(10000); // why sleep if the invoke will retry?
- // performing check in the regionqueue of the server2
vm.invoke(new CacheSerializableRunnable("checkFromServer") {
@Override
- public void run2() throws CacheException
- {
+ public void run2() throws CacheException {
Iterator iter = cache.getCacheServers().iterator();
- CacheServerImpl server = (CacheServerImpl)iter.next();
- Iterator iter_prox = server.getAcceptor().getCacheClientNotifier()
- .getClientProxies().iterator();
+ CacheServerImpl server = (CacheServerImpl) iter.next();
+ Iterator iter_prox = server.getAcceptor().getCacheClientNotifier().getClientProxies().iterator();
isObjectPresent = false;
+
while (iter_prox.hasNext()) {
- final CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
-// ClientProxyMembershipID proxyID = proxy.getProxyID();
-/* Conflict from CQ branch ------------------------------------------------------
- Region regionqueue = cache.getRegion(Region.SEPARATOR
- + HARegionQueue.createRegionName(proxyID.toString()));
- assertNotNull(regionqueue);
- Iterator itr = regionqueue.values().iterator();
- while (itr.hasNext()) {
- Object obj = itr.next();
- if (obj
- .getClass()
- .getName()
- .equals(
- "com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage")) {
- Conflatable confObj = (Conflatable)obj;
- Log.getLogWriter().info("value of the object ");
- if (key.equals(confObj.getKeyToConflate()))
- isObjectPresent = true;
--------------------------------------------------------------------------------*/
- //HARegion region = (HARegion)cache.getRegion(Region.SEPARATOR
- // + HAHelper.getRegionQueueName(proxyID.toString()));
- //assertNotNull(region);
- HARegion region = (HARegion) proxy.getHARegion();
- assertNotNull(region);
- final HARegionQueue regionQueue = region.getOwner();
-
- WaitCriterion wc = new WaitCriterion() {
-
- public boolean done() {
- int sz = regionQueue.size();
- cache.getLogger().fine("regionQueue.size()::"+ sz);
- return sz == 0 || !proxy.isConnected();
- }
+ final CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
+ HARegion region = (HARegion) proxy.getHARegion();
+ assertNotNull(region);
+ final HARegionQueue regionQueue = region.getOwner();
+
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ int sz = regionQueue.size();
+ cache.getLogger().fine("regionQueue.size()::" + sz);
+ return sz == 0 || !proxy.isConnected();
+ }
+ @Override
+ public String description() {
+ return "regionQueue not empty with size " + regionQueue.size() + " for proxy " + proxy;
+ }
+ };
+ waitForCriterion(wc, 60 * 1000, 1000, true);
- public String description() {
- return "regionQueue not empty with size " + regionQueue.size()
- + " for proxy " + proxy;
- }
- };
- Wait.waitForCriterion(wc, 60 * 1000, 1000, true);
- cache.getLogger().fine("processed a proxy");
+ cache.getLogger().fine("processed a proxy");
}
}
});
- }
-
- public void testDispatcher() throws Exception
- {
- clientPut(client1, KEY1, VALUE1);
- // Waiting in the client2 till it receives the event for the key.
- checkFromClient(client2);
-
- // performing check in the regionqueue of the server2
- checkFromServer(server2, KEY1);
-
- // For CQ Only.
- // performing put from the client1
- clientPut(client1, KEY2, VALUE2);
- checkFromClient(client2);
- checkFromServer(server2, KEY2);
-
- Log.getLogWriter().info("testDispatcher() completed successfully");
}
- /*
- * This is to test the serialization mechanism of ClientUpdateMessage.
- * Added after CQ support.
- * This could be done in different way, by overflowing the HARegion queue.
- *
- */
- public void /*test*/ClientUpdateMessageSerialization() throws Exception
- {
- // Update Value.
- clientPut(client1, KEY1, VALUE1);
- Log.getLogWriter().fine(">>>>>>>> after clientPut(c1, k1, v1)");
- // Waiting in the client2 till it receives the event for the key.
- checkFromClient(client2);
- Log.getLogWriter().fine("after checkFromClient(c2)");
-
- // performing check in the regionqueue of the server2
- checkFromServer(server2, KEY1);
- Log.getLogWriter().fine("after checkFromServer(s2, k1)");
-
- // UPDATE.
- clientPut(client1, KEY1, VALUE1);
- Log.getLogWriter().fine("after clientPut 2 (c1, k1, v1)");
- // Waiting in the client2 till it receives the event for the key.
- checkFromClient(client2);
- Log.getLogWriter().fine("after checkFromClient 2 (c2)");
-
- // performing check in the regionqueue of the server2
- checkFromServer(server2, KEY1);
- Log.getLogWriter().fine("after checkFromServer 2 (s2, k1)");
-
- Log.getLogWriter().info("testClientUpdateMessageSerialization() completed successfully");
- }
-
- private void createCache(Properties props) throws Exception
- {
+ private void createCache(Properties props) {
DistributedSystem ds = getSystem(props);
assertNotNull(ds);
ds.disconnect();
@@ -350,10 +297,8 @@ public class HADispatcherDUnitTest extends DistributedTestCase
assertNotNull(cache);
}
- public static Integer createServerCache(Boolean isListenerPresent)
- throws Exception
- {
- new HADispatcherDUnitTest("temp").createCache(new Properties());
+ private Integer createServerCache(Boolean isListenerPresent) throws IOException {
+ createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
@@ -363,27 +308,25 @@ public class HADispatcherDUnitTest extends DistributedTestCase
}
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
- CacheServerImpl server = (CacheServerImpl)cache.addCacheServer();
+ CacheServerImpl server = (CacheServerImpl) cache.addCacheServer();
assertNotNull(server);
- int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ int port = getRandomAvailablePort(SOCKET);
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
- public static void createClientCache(String hostName, Integer port1, Integer port2,
- Boolean isListenerPresent) throws Exception
- {
+ private void createClientCache(String hostName, Integer port1, Integer port2, Boolean isListenerPresent) throws CqException, CqExistsException, RegionNotFoundException {
int PORT1 = port1.intValue();
int PORT2 = port2.intValue();
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, "");
- new HADispatcherDUnitTest("temp").createCache(props);
+ createCache(props);
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
- ClientServerTestCase.configureConnectionPool(factory, hostName, new int[] {PORT1,PORT2}, true, -1, 2, null);
+ ClientServerTestCase.configureConnectionPool(factory, hostName, new int[]{PORT1, PORT2}, true, -1, 2, null);
if (isListenerPresent.booleanValue() == true) {
CacheListener clientListener = new HAClientListener();
factory.setCacheListener(clientListener);
@@ -394,40 +337,42 @@ public class HADispatcherDUnitTest extends DistributedTestCase
assertNotNull(region);
{
- LocalRegion lr = (LocalRegion)region;
- final PoolImpl pool = (PoolImpl)(lr.getServerProxy().getPool());
+ LocalRegion lr = (LocalRegion) region;
+ final PoolImpl pool = (PoolImpl) (lr.getServerProxy().getPool());
WaitCriterion ev = new WaitCriterion() {
public boolean done() {
return pool.getPrimary() != null;
}
+
public String description() {
return null;
}
};
- Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+ waitForCriterion(ev, 30 * 1000, 200, true);
ev = new WaitCriterion() {
public boolean done() {
return pool.getRedundants().size() >= 1;
}
+
public String description() {
return null;
}
};
- Wait.waitForCriterion(ev, 30 * 1000, 200, true);
-
+ waitForCriterion(ev, 30 * 1000, 200, true);
+
assertNotNull(pool.getPrimary());
- assertTrue("backups="+pool.getRedundants() + " expected=" + 1,
- pool.getRedundants().size() >= 1);
+ assertTrue("backups=" + pool.getRedundants() + " expected=" + 1,
+ pool.getRedundants().size() >= 1);
assertEquals(PORT1, pool.getPrimaryPort());
}
region.registerInterest(KEY1);
// Register CQ.
- createCQ();
+ createCQ();
}
- private static void createCQ(){
+ private void createCQ() throws CqException, CqExistsException, RegionNotFoundException {
QueryService cqService = null;
try {
cqService = cache.getQueryService();
@@ -435,253 +380,86 @@ public class HADispatcherDUnitTest extends DistributedTestCase
cqe.printStackTrace();
fail("Failed to getCQService.");
}
-
+
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
- CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
+ CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())};
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
-
+
String cqName = "CQForHARegionQueueTest";
String queryStr = "Select * from " + Region.SEPARATOR + REGION_NAME;
-
+
// Create CQ.
- try {
- CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
- cq1.execute();
- } catch (Exception ex){
- LogWriterUtils.getLogWriter().info("CQService is :" + cqService);
- ex.printStackTrace();
- AssertionError err = new AssertionError("Failed to create/execute CQ " + cqName + " . ");
- err.initCause(ex);
- throw err;
- }
+ CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
+ cq1.execute();
}
-
/**
* This is the client listener which notifies the waiting thread when it
* receives the event.
*/
- protected static class HAClientListener extends CacheListenerAdapter implements Declarable {
- public void afterCreate(EntryEvent event)
- {
- synchronized (HADispatcherDUnitTest.dummyObj) {
+ private static class HAClientListener extends CacheListenerAdapter implements Declarable {
+
+ @Override
+ public void afterCreate(EntryEvent event) {
+ synchronized (dummyObj) {
try {
Object value = event.getNewValue();
- if (value.equals(HADispatcherDUnitTest.VALUE1)) {
- HADispatcherDUnitTest.waitFlag = false;
- HADispatcherDUnitTest.dummyObj.notifyAll();
+ if (value.equals(VALUE1)) {
+ waitFlag = false;
+ dummyObj.notifyAll();
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
}
- public void afterUpdate(EntryEvent event)
- {
-
- }
-
- public void afterInvalidate(EntryEvent event)
- {
-
- }
-
- public void afterDestroy(EntryEvent event)
- {
-
- }
-
- public void afterRegionInvalidate(RegionEvent event)
- {
-
- }
-
- public void afterRegionDestroy(RegionEvent event)
- {
-
- }
-
- public void close()
- {
-
- }
-
- public void init(Properties props)
- {
-
- }
- public void afterRegionCreate(RegionEvent event)
- {
- // TODO Auto-generated method stub
-
- }
- public void afterRegionClear(RegionEvent event)
- {
- // TODO Auto-generated method stub
-
- }
- public void afterRegionLive(RegionEvent event)
- {
- // TODO NOT Auto-generated method stub, added by vrao
-
+ @Override
+ public void init(Properties props) {
}
}
/**
* This is the server listener which ensures that regionqueue is properly populated
*/
- protected static class HAServerListener extends CacheListenerAdapter {
+ private static class HAServerListener extends CacheListenerAdapter {
@Override
- public void afterCreate(EntryEvent event)
- {
+ public void afterCreate(EntryEvent event) {
Cache cache = event.getRegion().getCache();
Iterator iter = cache.getCacheServers().iterator();
- CacheServerImpl server = (CacheServerImpl)iter.next();
- HADispatcherDUnitTest.isObjectPresent = false;
+ CacheServerImpl server = (CacheServerImpl) iter.next();
+ isObjectPresent = false;
// The event not be there in the region first time; try couple of time.
// This should have been replaced by listener on the HARegion and doing wait for event arrival in that.
- while (true) {
- Iterator iter_prox = server.getAcceptor().getCacheClientNotifier()
- .getClientProxies().iterator();
- while (iter_prox.hasNext()) {
- CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
-// ClientProxyMembershipID proxyID = proxy.getProxyID();
- HARegion regionForQueue = (HARegion)proxy.getHARegion();
+ while (true) {
+ for (Iterator iter_prox = server.getAcceptor().getCacheClientNotifier().getClientProxies().iterator(); iter_prox.hasNext();) {
+ CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
+ HARegion regionForQueue = (HARegion) proxy.getHARegion();
- Iterator itr = regionForQueue.values().iterator();
- while (itr.hasNext()) {
+ for (Iterator itr = regionForQueue.values().iterator(); itr.hasNext();) {
Object obj = itr.next();
if (obj instanceof HAEventWrapper) {
- Conflatable confObj = (Conflatable)obj;
- if ((HADispatcherDUnitTest.KEY1).equals(confObj.getKeyToConflate()) ||
- (HADispatcherDUnitTest.KEY2).equals(confObj.getKeyToConflate())) {
- HADispatcherDUnitTest.isObjectPresent = true;
+ Conflatable confObj = (Conflatable) obj;
+ if (KEY1.equals(confObj.getKeyToConflate()) || KEY2.equals(confObj.getKeyToConflate())) {
+ isObjectPresent = true;
}
}
}
}
- if (HADispatcherDUnitTest.isObjectPresent == true) {
+
+ if (isObjectPresent) {
break; // From while.
}
+
try {
Thread.sleep(10);
- } catch (InterruptedException ex) {fail("interrupted");}
+ } catch (InterruptedException e) {
+ fail("interrupted", e);
+ }
}
}
-
- // this test is no longer needed since these
- // messages are not longer Externalizable
-// /*
-// * This is for testing ClientUpdateMessage's serialization code.
-// */
-// public void afterUpdate(EntryEvent event)
-// {
-// Log.getLogWriter().info("In HAServerListener::AfterUpdate::Event=" + event);
-// Cache cache = event.getRegion().getCache();
-// Iterator iter = cache.getCacheServers().iterator();
-// BridgeServerImpl server = (BridgeServerImpl)iter.next();
-// HADispatcherDUnitTest.isObjectPresent = false;
-
-// // The event not be there in the region first time; try couple of time.
-// // This should have been replaced by listener on the HARegion and doing wait for event arrival in that.
-// while (true) {
-
-// Iterator iter_prox = server.getAcceptor().getCacheClientNotifier()
-// .getClientProxies().iterator();
-// while (iter_prox.hasNext()) {
-// CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
-// ClientProxyMembershipID proxyID = proxy.getProxyID();
-// HARegion regionForQueue = (HARegion)cache.getRegion(Region.SEPARATOR
-// + HARegionQueue.createRegionName(proxyID.toString()));
-// if (regionForQueue == null) {
-// // I observed dunit throwing an NPE here.
-// // I changed it to just keep retrying which caused dunit to hang.
-// // The queue is gone we are shutting down so just return
-// return;
-// }
-// Iterator itr = regionForQueue.values().iterator();
-// while (itr.hasNext()) {
-// Object obj = itr.next();
-// if (obj.getClass().getName().equals(
-// "com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage")) {
-// com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage clientUpdateMessage =
-// (com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage)obj;
-// try{
-// // Test for readExternal(), writeExternal().
-// ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-// ObjectOutputStream out = new ObjectOutputStream(outStream);
-// clientUpdateMessage.writeExternal(out);
-// out.flush();
-// ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(outStream.toByteArray()));
-
-// com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage newClientUpdateMessage =
-// new com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl();
-
-// newClientUpdateMessage.readExternal(in);
-
-// Log.getLogWriter().info("Newly constructed ClientUpdateMessage is :" + newClientUpdateMessage.toString());
-// if (!newClientUpdateMessage.hasCqs() ||
-// ((ClientUpdateMessageImpl)newClientUpdateMessage).getClientCqs() == null ||
-// ((ClientUpdateMessageImpl)newClientUpdateMessage).getClientCqs().size() != 2){
-// throw new Exception("CQ Info not present");
-// }
-// HashMap clientCQs = ((ClientUpdateMessageImpl)newClientUpdateMessage).getClientCqs();
-
-// // Try to print CQ details - debug.
-// for (Iterator cciter = clientCQs.keySet().iterator(); cciter.hasNext();) {
-// ClientProxyMembershipID proxyId = (ClientProxyMembershipID)cciter.next();
-
-// HashMap cqs = (HashMap)clientCQs.get(proxyId);
-// Log.getLogWriter().info("Client ID is :" + proxyId);
-
-// for (Iterator cqIter = cqs.keySet().iterator();cqIter.hasNext();){
-// // Add CQ Name.
-// String cq = (String)cqIter.next();
-// // Add CQ Op.
-// Log.getLogWriter().info("CQ Name :" + cq + " CQ OP :" + ((Integer)cqs.get(cq)).intValue());
-// }
-
-// }
-
-// // Test for toData(), fromData().
-// ByteArrayOutputStream dataOutStream = new ByteArrayOutputStream();
-// DataOutputStream dataout = new DataOutputStream(dataOutStream);
-// clientUpdateMessage.toData(dataout);
-// dataOutStream.flush();
-// DataInputStream datain = new DataInputStream(new ByteArrayInputStream(dataOutStream.toByteArray()));
-
-// com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage newClientUpdateMessage2 =
-// new com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl();
-
-// newClientUpdateMessage2.fromData(datain);
-
-// Log.getLogWriter().info("Newly constructed ClientUpdateMessage is :" + newClientUpdateMessage2.toString());
-// if (!newClientUpdateMessage2.hasCqs() ||
-// ((ClientUpdateMessageImpl)newClientUpdateMessage2).getClientCqs() == null ||
-// ((ClientUpdateMessageImpl)newClientUpdateMessage2).getClientCqs().size() != 2){
-// throw new Exception("CQ Info not present");
-// }
-
-// } catch (Exception ex) {
-// Log.getLogWriter().info("Exception while serializing ClientUpdateMessage.", ex);
-// return;
-// }
-// HADispatcherDUnitTest.isObjectPresent = true;
-// }
-// }
-// }
-// if (HADispatcherDUnitTest.isObjectPresent == true) {
-// break; // From while.
-// }
-// try {
-// Thread.sleep(10);
-// } catch (Exception ex) {}
-// }
-// }
}
}