You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/11 18:01:32 UTC

incubator-geode git commit: Fix up HADispatcherDUnitTest

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1376 c3906859a -> fde0c8cd9


Fix up HADispatcherDUnitTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fde0c8cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fde0c8cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fde0c8cd

Branch: refs/heads/feature/GEODE-1376
Commit: fde0c8cd91cfeb58034c1281b153df6f5fb48cf7
Parents: c390685
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 11 11:01:17 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 11 11:01:17 2016 -0700

----------------------------------------------------------------------
 .../cache/ha/HADispatcherDUnitTest.java         | 608 ++++++-------------
 1 file changed, 193 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fde0c8cd/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
index aaf8b6f..fd7c559 100755
--- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/ha/HADispatcherDUnitTest.java
@@ -16,11 +16,20 @@
  */
 package com.gemstone.gemfire.internal.cache.ha;
 
-import hydra.Log;
-
+import static com.gemstone.gemfire.internal.AvailablePort.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+import static com.gemstone.gemfire.test.dunit.Host.*;
+import static com.gemstone.gemfire.test.dunit.LogWriterUtils.*;
+import static com.gemstone.gemfire.test.dunit.NetworkUtils.*;
+import static com.gemstone.gemfire.test.dunit.Wait.*;
+
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Properties;
 
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheException;
@@ -36,16 +45,18 @@ import com.gemstone.gemfire.cache.Scope;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.query.CqAttributes;
 import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqExistsException;
 import com.gemstone.gemfire.cache.query.CqListener;
 import com.gemstone.gemfire.cache.query.CqQuery;
 import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
 import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryTestListener;
 import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
-import com.gemstone.gemfire.cache30.ClientServerTestCase;
 import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.AvailablePort;
 import com.gemstone.gemfire.internal.cache.CacheServerImpl;
 import com.gemstone.gemfire.internal.cache.Conflatable;
 import com.gemstone.gemfire.internal.cache.HARegion;
@@ -54,13 +65,10 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
 import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ConflationDUnitTest;
 import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.NetworkUtils;
 import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
 import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 
 /**
  * This Dunit test is to verify that when the dispatcher of CS dispatches the
@@ -76,118 +84,136 @@ import com.gemstone.gemfire.test.dunit.WaitCriterion;
  * 7. Again the entry in the regionque of client2 on server2.It should not be present.
  * 8. close client1 and client2
  * 9. close server1 and server2
- *
  */
-
-public class HADispatcherDUnitTest extends DistributedTestCase
-{
-
-  VM server1 = null;
-
-  VM server2 = null;
-
-  VM client1 = null;
-
-  VM client2 = null;
-
-  public int PORT1;
-
-  public int PORT2;
-
-  private static final String REGION_NAME = "HADispatcherDUnitTest_region";
-
-  protected static Cache cache = null;
-
-  public static final Object dummyObj = "dummyObject";
-
-  static volatile boolean isObjectPresent = false;
-
-  final static String KEY1 = "KEY1";
-
-  final static String VALUE1 = "VALUE1";
-
-  final static String KEY2 = "KEY2";
-
-  final static String VALUE2 = "VALUE2";
-
-  static volatile boolean waitFlag = true;
-
-  public HADispatcherDUnitTest(String name) {
-    super(name);
-  }
+@Category(DistributedTest.class)
+public class HADispatcherDUnitTest extends JUnit4DistributedTestCase {
+
+  private static final String REGION_NAME = HADispatcherDUnitTest.class.getSimpleName() + "_region";
+  private static final Object dummyObj = "dummyObject";
+  private static final String KEY1 = "KEY1";
+  private static final String VALUE1 = "VALUE1";
+  private static final String KEY2 = "KEY2";
+  private static final String VALUE2 = "VALUE2";
+
+  private static Cache cache = null;
+  private static volatile boolean isObjectPresent = false;
+  private static volatile boolean waitFlag = true;
+
+  private VM server1 = null;
+  private VM server2 = null;
+  private VM client1 = null;
+  private VM client2 = null;
+  private int PORT1;
+  private int PORT2;
 
   @Override
   public final void postSetUp() throws Exception {
-    final Host host = Host.getHost(0);
-    
+    String serverHostName = getServerHostName(getHost(0));
+
     // Server1 VM
-    server1 = host.getVM(0);
+    server1 = getHost(0).getVM(0);
 
     // Server2 VM
-    server2 = host.getVM(1);
+    server2 = getHost(0).getVM(1);
 
     // Client 1 VM
-    client1 = host.getVM(2);
+    client1 = getHost(0).getVM(2);
 
     // client 2 VM
-    client2 = host.getVM(3);
+    client2 = getHost(0).getVM(3);
+
+    PORT1 = ((Integer) server1.invoke(() -> createServerCache(new Boolean(false)))).intValue();
 
-    PORT1 = ((Integer)server1.invoke(() -> HADispatcherDUnitTest.createServerCache( new Boolean(false) ))).intValue();
     server1.invoke(() -> ConflationDUnitTest.setIsSlowStart());
-    server1.invoke(() -> HADispatcherDUnitTest.makeDispatcherSlow());
-    server1.invoke(() -> HADispatcherDUnitTest.setQRMslow());
-    PORT2 = ((Integer)server2.invoke(() -> HADispatcherDUnitTest.createServerCache( new Boolean(true) ))).intValue();
+    server1.invoke(() -> makeDispatcherSlow());
+    server1.invoke(() -> setQRMslow());
+
+    PORT2 = ((Integer) server2.invoke(() -> createServerCache(new Boolean(true)))).intValue();
 
     client1.invoke(() -> CacheServerTestUtil.disableShufflingOfEndpoints());
     client2.invoke(() -> CacheServerTestUtil.disableShufflingOfEndpoints());
-    client1.invoke(() -> HADispatcherDUnitTest.createClientCache(
-            NetworkUtils.getServerHostName(host),
-            new Integer(PORT1), new Integer(PORT2),
-            new Boolean(false) ));
-    client2.invoke(() -> HADispatcherDUnitTest.createClientCache(
-            NetworkUtils.getServerHostName(host),
-            new Integer(PORT1), new Integer(PORT2),
-            new Boolean(true) ));
+    client1.invoke(() -> createClientCache(serverHostName, new Integer(PORT1), new Integer(PORT2), new Boolean(false)));
+    client2.invoke(() -> createClientCache(serverHostName, new Integer(PORT1), new Integer(PORT2), new Boolean(true)));
   }
 
   @Override
   public final void preTearDown() throws Exception {
-    client1.invoke(() -> HADispatcherDUnitTest.closeCache());
-    client2.invoke(() -> HADispatcherDUnitTest.closeCache());
+    client1.invoke(() -> closeCache());
+    client2.invoke(() -> closeCache());
     // close server
-    server1.invoke(() -> HADispatcherDUnitTest.resetQRMslow());
-    server1.invoke(() -> HADispatcherDUnitTest.closeCache());
-    server2.invoke(() -> HADispatcherDUnitTest.closeCache());
+    server1.invoke(() -> resetQRMslow());
+    server1.invoke(() -> closeCache());
+    server2.invoke(() -> closeCache());
+  }
+
+  @Test
+  public void testDispatcher() throws Exception {
+    clientPut(client1, KEY1, VALUE1);
+    // Waiting in the client2 till it receives the event for the key.
+    checkFromClient(client2);
+
+    // performing check in the regionqueue of the server2
+    checkFromServer(server2, KEY1);
+
+    // For CQ Only.
+    // performing put from the client1
+    clientPut(client1, KEY2, VALUE2);
+    checkFromClient(client2);
+    checkFromServer(server2, KEY2);
+
+    getLogWriter().info("testDispatcher() completed successfully");
   }
 
-  public static void closeCache()
-  {
+  /**
+   * This is to test the serialization mechanism of ClientUpdateMessage.
+   * Added after CQ support.
+   * This could be done in a different way, by overflowing the HARegion queue.
+   */
+  @Test
+  public void testClientUpdateMessageSerialization() throws Exception {
+    // Update Value.
+    clientPut(client1, KEY1, VALUE1);
+    getLogWriter().fine(">>>>>>>> after clientPut(c1, k1, v1)");
+    // Waiting in the client2 till it receives the event for the key.
+    checkFromClient(client2);
+    getLogWriter().fine("after checkFromClient(c2)");
+
+    // performing check in the regionqueue of the server2
+    checkFromServer(server2, KEY1);
+    getLogWriter().fine("after checkFromServer(s2, k1)");
+
+    // UPDATE.
+    clientPut(client1, KEY1, VALUE1);
+    getLogWriter().fine("after clientPut 2 (c1, k1, v1)");
+    // Waiting in the client2 till it receives the event for the key.
+    checkFromClient(client2);
+    getLogWriter().fine("after checkFromClient 2 (c2)");
+
+    // performing check in the regionqueue of the server2
+    checkFromServer(server2, KEY1);
+    getLogWriter().fine("after checkFromServer 2 (s2, k1)");
+
+    getLogWriter().info("testClientUpdateMessageSerialization() completed successfully");
+  }
+
+  private void closeCache() {
     if (cache != null && !cache.isClosed()) {
       cache.close();
       cache.getDistributedSystem().disconnect();
     }
   }
 
-  public static void setQRMslow()
-  {
+  private void setQRMslow() throws InterruptedException {
     int oldMessageSyncInterval = cache.getMessageSyncInterval();
     cache.setMessageSyncInterval(6);
-    try {
-      Thread.sleep((oldMessageSyncInterval + 1)*1000);
-    }
-    catch (InterruptedException e) {
-      fail("Unexcepted InterruptedException Occurred");
-    }
+    Thread.sleep((oldMessageSyncInterval + 1) * 1000);
   }
 
-  public static void resetQRMslow()
-  {
+  private void resetQRMslow() {
     cache.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
   }
 
-
-  public static void makeDispatcherSlow()
-  {
+  private void makeDispatcherSlow() {
     System.setProperty("slowStartTimeForTesting", "5000");
   }
 
@@ -195,8 +221,7 @@ public class HADispatcherDUnitTest extends DistributedTestCase
     // performing put from the client1
     vm.invoke(new CacheSerializableRunnable("putFromClient") {
       @Override
-      public void run2() throws CacheException
-      {
+      public void run2() throws CacheException {
         Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
         assertNotNull(region);
         region.put(key, value);
@@ -208,18 +233,16 @@ public class HADispatcherDUnitTest extends DistributedTestCase
     // Waiting in the client till it receives the event for the key.
     vm.invoke(new CacheSerializableRunnable("checkFromClient") {
       @Override
-      public void run2() throws CacheException
-      {
+      public void run2() throws CacheException {
         Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
         assertNotNull(region);
-        cache.getLogger().fine("starting the wait");        
+        cache.getLogger().fine("starting the wait");
         synchronized (dummyObj) {
           while (waitFlag) {
             try {
               dummyObj.wait(30000);
-            }
-            catch (InterruptedException e) {
-              fail("interrupted");
+            } catch (InterruptedException e) {
+              fail("interrupted", e);
             }
           }
         }
@@ -231,117 +254,41 @@ public class HADispatcherDUnitTest extends DistributedTestCase
   }
 
   private void checkFromServer(VM vm, final Object key) {
-    // Thread.sleep(10000); // why sleep if the invoke will retry?
-    //  performing check in the regionqueue of the server2
     vm.invoke(new CacheSerializableRunnable("checkFromServer") {
       @Override
-      public void run2() throws CacheException
-      {
+      public void run2() throws CacheException {
         Iterator iter = cache.getCacheServers().iterator();
-        CacheServerImpl server = (CacheServerImpl)iter.next();
-        Iterator iter_prox = server.getAcceptor().getCacheClientNotifier()
-          .getClientProxies().iterator();
+        CacheServerImpl server = (CacheServerImpl) iter.next();
+        Iterator iter_prox = server.getAcceptor().getCacheClientNotifier().getClientProxies().iterator();
         isObjectPresent = false;
+
         while (iter_prox.hasNext()) {
-          final CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
-//          ClientProxyMembershipID proxyID = proxy.getProxyID();
-/* Conflict from CQ branch ------------------------------------------------------
-          Region regionqueue = cache.getRegion(Region.SEPARATOR
-              + HARegionQueue.createRegionName(proxyID.toString()));
-          assertNotNull(regionqueue);
-          Iterator itr = regionqueue.values().iterator();
-          while (itr.hasNext()) {
-            Object obj = itr.next();
-            if (obj
-                .getClass()
-                .getName()
-                .equals(
-                "com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage")) {
-              Conflatable confObj = (Conflatable)obj;
-              Log.getLogWriter().info("value of the object ");
-              if (key.equals(confObj.getKeyToConflate()))
-                isObjectPresent = true;
--------------------------------------------------------------------------------*/
-            //HARegion region = (HARegion)cache.getRegion(Region.SEPARATOR
-            //    + HAHelper.getRegionQueueName(proxyID.toString()));
-            //assertNotNull(region);
-            HARegion region = (HARegion) proxy.getHARegion();
-            assertNotNull(region);
-            final HARegionQueue regionQueue = region.getOwner();
-
-            WaitCriterion wc = new WaitCriterion() {
-
-              public boolean done() {
-                int sz = regionQueue.size();
-                cache.getLogger().fine("regionQueue.size()::"+ sz);
-                return sz == 0 || !proxy.isConnected();
-              }
+          final CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
+          HARegion region = (HARegion) proxy.getHARegion();
+          assertNotNull(region);
+          final HARegionQueue regionQueue = region.getOwner();
+
+          WaitCriterion wc = new WaitCriterion() {
+            @Override
+            public boolean done() {
+              int sz = regionQueue.size();
+              cache.getLogger().fine("regionQueue.size()::" + sz);
+              return sz == 0 || !proxy.isConnected();
+            }
+            @Override
+            public String description() {
+              return "regionQueue not empty with size " + regionQueue.size() + " for proxy " + proxy;
+            }
+          };
+          waitForCriterion(wc, 60 * 1000, 1000, true);
 
-              public String description() {
-                return "regionQueue not empty with size " + regionQueue.size()
-                    + " for proxy " + proxy;
-              }
-            };
-            Wait.waitForCriterion(wc, 60 * 1000, 1000, true);
-            cache.getLogger().fine("processed a proxy");
+          cache.getLogger().fine("processed a proxy");
         }
       }
     });
-  }    
-  
-  public void testDispatcher() throws Exception
-  {
-    clientPut(client1, KEY1, VALUE1);
-    // Waiting in the client2 till it receives the event for the key.
-    checkFromClient(client2);
-
-    // performing check in the regionqueue of the server2
-    checkFromServer(server2, KEY1);
-    
-    // For CQ Only.
-    // performing put from the client1
-    clientPut(client1, KEY2, VALUE2);
-    checkFromClient(client2);
-    checkFromServer(server2, KEY2);
-    
-    Log.getLogWriter().info("testDispatcher() completed successfully");
   }
 
-  /*
-   * This is to test the serialization mechanism of ClientUpdateMessage.
-   * Added after CQ support. 
-   * This could be done in different way, by overflowing the HARegion queue.
-   * 
-   */
-  public void /*test*/ClientUpdateMessageSerialization() throws Exception
-  {
-    // Update Value.
-    clientPut(client1, KEY1, VALUE1);
-    Log.getLogWriter().fine(">>>>>>>> after clientPut(c1, k1, v1)");
-    // Waiting in the client2 till it receives the event for the key.
-    checkFromClient(client2);
-    Log.getLogWriter().fine("after checkFromClient(c2)");
-
-    // performing check in the regionqueue of the server2
-    checkFromServer(server2, KEY1);
-    Log.getLogWriter().fine("after checkFromServer(s2, k1)");
-
-    // UPDATE.
-    clientPut(client1, KEY1, VALUE1);
-    Log.getLogWriter().fine("after clientPut 2 (c1, k1, v1)");
-    // Waiting in the client2 till it receives the event for the key.
-    checkFromClient(client2);
-    Log.getLogWriter().fine("after checkFromClient 2 (c2)");
-
-    // performing check in the regionqueue of the server2
-    checkFromServer(server2, KEY1);
-    Log.getLogWriter().fine("after checkFromServer 2 (s2, k1)");
-
-    Log.getLogWriter().info("testClientUpdateMessageSerialization() completed successfully");
-  }
-
-  private void createCache(Properties props) throws Exception
-  {
+  private void createCache(Properties props) {
     DistributedSystem ds = getSystem(props);
     assertNotNull(ds);
     ds.disconnect();
@@ -350,10 +297,8 @@ public class HADispatcherDUnitTest extends DistributedTestCase
     assertNotNull(cache);
   }
 
-  public static Integer createServerCache(Boolean isListenerPresent)
-      throws Exception
-  {
-    new HADispatcherDUnitTest("temp").createCache(new Properties());
+  private Integer createServerCache(Boolean isListenerPresent) throws IOException {
+    createCache(new Properties());
     AttributesFactory factory = new AttributesFactory();
     factory.setScope(Scope.DISTRIBUTED_ACK);
     factory.setDataPolicy(DataPolicy.REPLICATE);
@@ -363,27 +308,25 @@ public class HADispatcherDUnitTest extends DistributedTestCase
     }
     RegionAttributes attrs = factory.create();
     cache.createRegion(REGION_NAME, attrs);
-    CacheServerImpl server = (CacheServerImpl)cache.addCacheServer();
+    CacheServerImpl server = (CacheServerImpl) cache.addCacheServer();
     assertNotNull(server);
-    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int port = getRandomAvailablePort(SOCKET);
     server.setPort(port);
     server.setNotifyBySubscription(true);
     server.start();
     return new Integer(server.getPort());
   }
 
-  public static void createClientCache(String hostName, Integer port1, Integer port2,
-      Boolean isListenerPresent) throws Exception
-  {
+  private void createClientCache(String hostName, Integer port1, Integer port2, Boolean isListenerPresent) throws CqException, CqExistsException, RegionNotFoundException {
     int PORT1 = port1.intValue();
     int PORT2 = port2.intValue();
     Properties props = new Properties();
     props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     props.setProperty(DistributionConfig.LOCATORS_NAME, "");
-    new HADispatcherDUnitTest("temp").createCache(props);
+    createCache(props);
     AttributesFactory factory = new AttributesFactory();
     factory.setScope(Scope.DISTRIBUTED_ACK);
-    ClientServerTestCase.configureConnectionPool(factory, hostName, new int[] {PORT1,PORT2}, true, -1, 2, null);
+    ClientServerTestCase.configureConnectionPool(factory, hostName, new int[]{PORT1, PORT2}, true, -1, 2, null);
     if (isListenerPresent.booleanValue() == true) {
       CacheListener clientListener = new HAClientListener();
       factory.setCacheListener(clientListener);
@@ -394,40 +337,42 @@ public class HADispatcherDUnitTest extends DistributedTestCase
     assertNotNull(region);
 
     {
-      LocalRegion lr = (LocalRegion)region;
-      final PoolImpl pool = (PoolImpl)(lr.getServerProxy().getPool());
+      LocalRegion lr = (LocalRegion) region;
+      final PoolImpl pool = (PoolImpl) (lr.getServerProxy().getPool());
       WaitCriterion ev = new WaitCriterion() {
         public boolean done() {
           return pool.getPrimary() != null;
         }
+
         public String description() {
           return null;
         }
       };
-      Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+      waitForCriterion(ev, 30 * 1000, 200, true);
       ev = new WaitCriterion() {
         public boolean done() {
           return pool.getRedundants().size() >= 1;
         }
+
         public String description() {
           return null;
         }
       };
-      Wait.waitForCriterion(ev, 30 * 1000, 200, true);
-      
+      waitForCriterion(ev, 30 * 1000, 200, true);
+
       assertNotNull(pool.getPrimary());
-      assertTrue("backups="+pool.getRedundants() + " expected=" + 1,
-                 pool.getRedundants().size() >= 1);
+      assertTrue("backups=" + pool.getRedundants() + " expected=" + 1,
+              pool.getRedundants().size() >= 1);
       assertEquals(PORT1, pool.getPrimaryPort());
     }
 
     region.registerInterest(KEY1);
 
     // Register CQ.
-    createCQ(); 
+    createCQ();
   }
 
-  private static void createCQ(){
+  private void createCQ() throws CqException, CqExistsException, RegionNotFoundException {
     QueryService cqService = null;
     try {
       cqService = cache.getQueryService();
@@ -435,253 +380,86 @@ public class HADispatcherDUnitTest extends DistributedTestCase
       cqe.printStackTrace();
       fail("Failed to getCQService.");
     }
-    
+
     // Create CQ Attributes.
     CqAttributesFactory cqf = new CqAttributesFactory();
-    CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};    
+    CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())};
     cqf.initCqListeners(cqListeners);
     CqAttributes cqa = cqf.create();
-    
+
     String cqName = "CQForHARegionQueueTest";
     String queryStr = "Select * from " + Region.SEPARATOR + REGION_NAME;
-    
+
     // Create CQ.
-    try {
-      CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
-      cq1.execute();
-    } catch (Exception ex){
-      LogWriterUtils.getLogWriter().info("CQService is :" + cqService);
-      ex.printStackTrace();
-      AssertionError err = new AssertionError("Failed to create/execute CQ " + cqName + " . ");
-      err.initCause(ex);
-      throw err;
-    }
+    CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
+    cq1.execute();
   }
 
-
   /**
    * This is the client listener which notifies the waiting thread when it
    * receives the event.
    */
-  protected static class HAClientListener extends CacheListenerAdapter implements Declarable  {
-    public void afterCreate(EntryEvent event)
-    {
-      synchronized (HADispatcherDUnitTest.dummyObj) {
+  private static class HAClientListener extends CacheListenerAdapter implements Declarable {
+
+    @Override
+    public void afterCreate(EntryEvent event) {
+      synchronized (dummyObj) {
         try {
           Object value = event.getNewValue();
-          if (value.equals(HADispatcherDUnitTest.VALUE1)) {
-            HADispatcherDUnitTest.waitFlag = false;
-            HADispatcherDUnitTest.dummyObj.notifyAll();
+          if (value.equals(VALUE1)) {
+            waitFlag = false;
+            dummyObj.notifyAll();
           }
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
           e.printStackTrace();
         }
       }
     }
 
-    public void afterUpdate(EntryEvent event)
-    {
-
-    }
-
-    public void afterInvalidate(EntryEvent event)
-    {
-
-    }
-
-    public void afterDestroy(EntryEvent event)
-    {
-
-    }
-
-    public void afterRegionInvalidate(RegionEvent event)
-    {
-
-    }
-
-    public void afterRegionDestroy(RegionEvent event)
-    {
-
-    }
-
-    public void close()
-    {
-
-    }
-
-    public void init(Properties props)
-    {
-
-    }
-    public void afterRegionCreate(RegionEvent event)
-    {
-      // TODO Auto-generated method stub
-
-    }
-    public void afterRegionClear(RegionEvent event)
-    {
-      // TODO Auto-generated method stub
-
-    }
-    public void afterRegionLive(RegionEvent event)
-    {
-      // TODO NOT Auto-generated method stub, added by vrao
-
+    @Override
+    public void init(Properties props) {
     }
   }
 
   /**
    * This is the server listener which ensures that regionqueue is properly populated
    */
-  protected static class HAServerListener extends CacheListenerAdapter {
+  private static class HAServerListener extends CacheListenerAdapter {
     @Override
-    public void afterCreate(EntryEvent event)
-    {
+    public void afterCreate(EntryEvent event) {
       Cache cache = event.getRegion().getCache();
       Iterator iter = cache.getCacheServers().iterator();
-      CacheServerImpl server = (CacheServerImpl)iter.next();
-      HADispatcherDUnitTest.isObjectPresent = false;
+      CacheServerImpl server = (CacheServerImpl) iter.next();
+      isObjectPresent = false;
 
       // The event not be there in the region first time; try couple of time.
       // This should have been replaced by listener on the HARegion and doing wait for event arrival in that.
-      while (true) { 
-        Iterator iter_prox = server.getAcceptor().getCacheClientNotifier()
-        .getClientProxies().iterator();
-        while (iter_prox.hasNext()) {
-          CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
-//          ClientProxyMembershipID proxyID = proxy.getProxyID();
-          HARegion regionForQueue = (HARegion)proxy.getHARegion();
+      while (true) {
+        for (Iterator iter_prox = server.getAcceptor().getCacheClientNotifier().getClientProxies().iterator(); iter_prox.hasNext();) {
+          CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
+          HARegion regionForQueue = (HARegion) proxy.getHARegion();
 
-          Iterator itr = regionForQueue.values().iterator();
-          while (itr.hasNext()) {
+          for (Iterator itr = regionForQueue.values().iterator(); itr.hasNext();) {
             Object obj = itr.next();
             if (obj instanceof HAEventWrapper) {
-              Conflatable confObj = (Conflatable)obj;
-              if ((HADispatcherDUnitTest.KEY1).equals(confObj.getKeyToConflate()) ||
-                  (HADispatcherDUnitTest.KEY2).equals(confObj.getKeyToConflate())) {
-                HADispatcherDUnitTest.isObjectPresent = true;
+              Conflatable confObj = (Conflatable) obj;
+              if (KEY1.equals(confObj.getKeyToConflate()) || KEY2.equals(confObj.getKeyToConflate())) {
+                isObjectPresent = true;
               }
             }
           }
         }
-        if (HADispatcherDUnitTest.isObjectPresent == true) {
+
+        if (isObjectPresent) {
           break; // From while.
         }
+
         try {
           Thread.sleep(10);
-        } catch (InterruptedException ex) {fail("interrupted");}
+        } catch (InterruptedException e) {
+          fail("interrupted", e);
+        }
       }
     }
-
-    // this test is no longer needed since these
-    // messages are not longer Externalizable
-//  /*
-//  * This is for testing ClientUpdateMessage's serialization code.
-//  */
-//  public void afterUpdate(EntryEvent event)
-//  {
-//  Log.getLogWriter().info("In HAServerListener::AfterUpdate::Event=" + event);
-//  Cache cache = event.getRegion().getCache();
-//  Iterator iter = cache.getCacheServers().iterator();
-//  BridgeServerImpl server = (BridgeServerImpl)iter.next();
-//  HADispatcherDUnitTest.isObjectPresent = false;
-
-//  // The event not be there in the region first time; try couple of time.
-//  // This should have been replaced by listener on the HARegion and doing wait for event arrival in that.
-//  while (true) { 
-
-//  Iterator iter_prox = server.getAcceptor().getCacheClientNotifier()
-//  .getClientProxies().iterator();
-//  while (iter_prox.hasNext()) {
-//  CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
-//  ClientProxyMembershipID proxyID = proxy.getProxyID();
-//  HARegion regionForQueue = (HARegion)cache.getRegion(Region.SEPARATOR
-//  + HARegionQueue.createRegionName(proxyID.toString()));
-//  if (regionForQueue == null) {
-//  // I observed dunit throwing an NPE here.
-//  // I changed it to just keep retrying which caused dunit to hang.
-//  // The queue is gone we are shutting down so just return
-//  return;
-//  }
-//  Iterator itr = regionForQueue.values().iterator();
-//  while (itr.hasNext()) {
-//  Object obj = itr.next();
-//  if (obj.getClass().getName().equals(
-//  "com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage")) {
-//  com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage clientUpdateMessage = 
-//  (com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage)obj;
-//  try{
-//  // Test for readExternal(), writeExternal().
-//  ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-//  ObjectOutputStream out = new ObjectOutputStream(outStream);
-//  clientUpdateMessage.writeExternal(out);
-//  out.flush();
-//  ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(outStream.toByteArray()));
-
-//  com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage newClientUpdateMessage = 
-//  new com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl();
-
-//  newClientUpdateMessage.readExternal(in);
-
-//  Log.getLogWriter().info("Newly constructed ClientUpdateMessage is :" + newClientUpdateMessage.toString());
-//  if (!newClientUpdateMessage.hasCqs() || 
-//  ((ClientUpdateMessageImpl)newClientUpdateMessage).getClientCqs() == null ||
-//  ((ClientUpdateMessageImpl)newClientUpdateMessage).getClientCqs().size() != 2){
-//  throw new Exception("CQ Info not present");
-//  }
-//  HashMap clientCQs = ((ClientUpdateMessageImpl)newClientUpdateMessage).getClientCqs();
-
-//  // Try to print CQ details - debug.
-//  for (Iterator cciter = clientCQs.keySet().iterator(); cciter.hasNext();) {
-//  ClientProxyMembershipID proxyId = (ClientProxyMembershipID)cciter.next();
-
-//  HashMap cqs = (HashMap)clientCQs.get(proxyId);
-//  Log.getLogWriter().info("Client ID is :" + proxyId); 
-
-//  for (Iterator cqIter = cqs.keySet().iterator();cqIter.hasNext();){
-//  // Add CQ Name.
-//  String cq = (String)cqIter.next();
-//  // Add CQ Op.
-//  Log.getLogWriter().info("CQ Name :" + cq + " CQ OP :" + ((Integer)cqs.get(cq)).intValue());
-//  }
-
-//  }
-
-//  // Test for toData(), fromData().
-//  ByteArrayOutputStream dataOutStream = new ByteArrayOutputStream();
-//  DataOutputStream dataout = new DataOutputStream(dataOutStream);
-//  clientUpdateMessage.toData(dataout);
-//  dataOutStream.flush();
-//  DataInputStream datain = new DataInputStream(new ByteArrayInputStream(dataOutStream.toByteArray()));
-
-//  com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage newClientUpdateMessage2 = 
-//  new com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl();
-
-//  newClientUpdateMessage2.fromData(datain);
-
-//  Log.getLogWriter().info("Newly constructed ClientUpdateMessage is :" + newClientUpdateMessage2.toString());
-//  if (!newClientUpdateMessage2.hasCqs() || 
-//  ((ClientUpdateMessageImpl)newClientUpdateMessage2).getClientCqs() == null ||
-//  ((ClientUpdateMessageImpl)newClientUpdateMessage2).getClientCqs().size() != 2){
-//  throw new Exception("CQ Info not present");
-//  }
-
-//  } catch (Exception ex) {
-//  Log.getLogWriter().info("Exception while serializing ClientUpdateMessage.", ex);
-//  return;
-//  }
-//  HADispatcherDUnitTest.isObjectPresent = true;
-//  }
-//  }
-//  }
-//  if (HADispatcherDUnitTest.isObjectPresent == true) {
-//  break; // From while.
-//  }
-//  try {
-//  Thread.sleep(10);
-//  } catch (Exception ex) {}
-//  }
-//  }
   }
 }