You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/06/07 20:54:58 UTC

[43/62] [abbrv] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-837

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
index 0634ce8,a4842de..d8fff8b
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
@@@ -16,53 -16,18 +16,53 @@@
   */
  package com.gemstone.gemfire.cache.query.cq.dunit;
  
- import static org.junit.Assert.*;
 -import com.gemstone.gemfire.cache.*;
++import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
++import static com.gemstone.gemfire.test.dunit.Assert.*;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Enumeration;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Properties;
 +import java.util.Set;
 +
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.EvictionAction;
 +import com.gemstone.gemfire.cache.EvictionAttributes;
 +import com.gemstone.gemfire.cache.MirrorType;
 +import com.gemstone.gemfire.cache.PartitionAttributes;
 +import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionAttributes;
 +import com.gemstone.gemfire.cache.Scope;
  import com.gemstone.gemfire.cache.client.PoolFactory;
  import com.gemstone.gemfire.cache.client.PoolManager;
 -import com.gemstone.gemfire.cache.query.*;
 +import com.gemstone.gemfire.cache.query.CqAttributes;
 +import com.gemstone.gemfire.cache.query.CqAttributesFactory;
 +import com.gemstone.gemfire.cache.query.CqAttributesMutator;
 +import com.gemstone.gemfire.cache.query.CqExistsException;
 +import com.gemstone.gemfire.cache.query.CqListener;
 +import com.gemstone.gemfire.cache.query.CqQuery;
 +import com.gemstone.gemfire.cache.query.IndexType;
 +import com.gemstone.gemfire.cache.query.Query;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.SelectResults;
 +import com.gemstone.gemfire.cache.query.Struct;
  import com.gemstone.gemfire.cache.query.data.Portfolio;
  import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
  import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
  import com.gemstone.gemfire.cache.server.CacheServer;
  import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
 -import com.gemstone.gemfire.cache30.CacheTestCase;
  import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
  import com.gemstone.gemfire.cache30.ClientServerTestCase;
- import com.gemstone.gemfire.distributed.internal.DistributionConfig;
  import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
  import com.gemstone.gemfire.internal.AvailablePortHelper;
  import com.gemstone.gemfire.internal.cache.InitialImageOperation;
@@@ -70,26 -35,22 +70,27 @@@ import com.gemstone.gemfire.internal.ca
  import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
  import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
  import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 -import com.gemstone.gemfire.test.dunit.*;
 -
 -import java.io.IOException;
 -import java.util.*;
 -
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.LOCATORS;
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.MCAST_PORT;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.IgnoredException;
 +import com.gemstone.gemfire.test.dunit.Invoke;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
++import com.gemstone.gemfire.test.dunit.RMIException;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +import com.gemstone.gemfire.test.dunit.WaitCriterion;
 +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
 +import com.gemstone.gemfire.test.junit.categories.DistributedTest;
  
  /**
-- * This class tests the ContiunousQuery mechanism in GemFire.
++ * This class tests the ContinuousQuery mechanism in GemFire.
   * It does so by creating a cache server with a cache and a pre-defined region and
   * a data loader. The client creates the same region and attaches the connection pool.
 - * 
 - *
   */
 -public class CqQueryUsingPoolDUnitTest extends CacheTestCase {
 +@Category(DistributedTest.class)
 +public class CqQueryUsingPoolDUnitTest extends JUnit4CacheTestCase {
    
    /** The port on which the bridge server was started in this VM */
    private static int bridgeServerPort;
@@@ -152,7 -113,7 +153,6 @@@
        "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0 and p.status='active'",
        //11 - Test for "No Alias" 
        "SELECT ALL * FROM /root/" + regions[0] + " where ID > 0",
--      
    };
    
    private String[] invalidCQs = new String [] {
@@@ -173,6 -138,6 +173,7 @@@
      // system before creating connection pools
      getSystem();
      Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
++      @Override
        public void run() {
          getSystem();
        }
@@@ -208,6 -173,6 +209,7 @@@
    {
      SerializableRunnable createServer = new CacheSerializableRunnable(
          "Create Cache Server") {
++      @Override
        public void run2() throws CacheException
        {
          LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
@@@ -233,7 -198,7 +235,6 @@@
          catch (Exception ex) {
            Assert.fail("While starting CacheServer", ex);
          }
--        
        }
      };
  
@@@ -251,46 -216,46 +252,40 @@@
    {
      SerializableRunnable createServer = new CacheSerializableRunnable(
          "Create Cache Server") {
++      @Override
        public void run2() throws CacheException
        {
--          LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
--          //AttributesFactory factory = new AttributesFactory();
--          //factory.setScope(Scope.DISTRIBUTED_ACK);
--          //factory.setMirrorType(MirrorType.KEYS_VALUES);
--          
--          //int maxMem = 0;
--          AttributesFactory attr = new AttributesFactory();
--          //attr.setValueConstraint(valueConstraint);
--          PartitionAttributesFactory paf = new PartitionAttributesFactory();
--          if (isAccessor){
--            paf.setLocalMaxMemory(0);
--          }
--          PartitionAttributes prAttr = paf.setTotalNumBuckets(197).setRedundantCopies(redundantCopies).create();
--          attr.setPartitionAttributes(prAttr);
--          
--          assertFalse(getSystem().isLoner());
--          //assertTrue(getSystem().getDistributionManager().getOtherDistributionManagerIds().size() > 0);
--          for (int i = 0; i < regions.length; i++) {
--            Region r = createRegion(regions[i], attr.create());
--            LogWriterUtils.getLogWriter().info("Server created the region: "+r);
--          }
--          try {
--            startBridgeServer(port, true);
--          }
--          catch (Exception ex) {
--            Assert.fail("While starting CacheServer", ex);
--          }
--       
++        LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
++
++        AttributesFactory attr = new AttributesFactory();
++        PartitionAttributesFactory paf = new PartitionAttributesFactory();
++        if (isAccessor){
++          paf.setLocalMaxMemory(0);
++        }
++        PartitionAttributes prAttr = paf.setTotalNumBuckets(197).setRedundantCopies(redundantCopies).create();
++        attr.setPartitionAttributes(prAttr);
++
++        assertFalse(getSystem().isLoner());
++        for (int i = 0; i < regions.length; i++) {
++          Region r = createRegion(regions[i], attr.create());
++          LogWriterUtils.getLogWriter().info("Server created the region: "+r);
++        }
++        try {
++          startBridgeServer(port, true);
++        }
++        catch (Exception ex) {
++          Assert.fail("While starting CacheServer", ex);
++        }
        }
      };
  
      server.invoke(createServer);
    }
--  
--  
++
    /* Close Cache Server */
    public void closeServer(VM server) {
      server.invoke(new SerializableRunnable("Close CacheServer") {
++      @Override
        public void run() {
          LogWriterUtils.getLogWriter().info("### Close CacheServer. ###");
          stopBridgeServer(getCache());
@@@ -309,6 -274,6 +304,7 @@@
        final String poolName) {
      SerializableRunnable createQService =
        new CacheSerializableRunnable("Create Client") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Create Client. ###");
          //Region region1 = null;
@@@ -343,19 -308,19 +339,19 @@@
      client.invoke(createQService);
    }
  
--
    /* Close Client */
    public void closeClient(VM client) {
      SerializableRunnable closeCQService =
        new CacheSerializableRunnable("Close Client") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Close Client. ###");
          try {
            ((DefaultQueryService)getCache().getQueryService()).closeCqService();
          } catch (Exception ex) {
            LogWriterUtils.getLogWriter().info("### Failed to get CqService during ClientClose() ###");
++          //TODO: fix eaten exception
          }
--        
        }
      };
      
@@@ -365,17 -330,17 +361,20 @@@
  
    public void createFunctionalIndex(VM vm, final String indexName, final String indexedExpression, final String fromClause) {
      vm.invoke(new CacheSerializableRunnable("Create Functional Index") {
++      @Override
        public void run2() throws CacheException {
          QueryService qs = null;
          try {
            qs = getCache().getQueryService();
          }catch (Exception ex) {
            LogWriterUtils.getLogWriter().info("### Failed to get CqService during ClientClose() ###");
++          //TODO: fix eaten exception
          }
          try {
            qs.createIndex(indexName, IndexType.FUNCTIONAL, indexedExpression, fromClause);
          } catch (Exception ex) {
            LogWriterUtils.getLogWriter().info("### Failed to create Index :" + indexName);
++          //TODO: fix eaten exception
          }
        }
      });
@@@ -384,6 -349,6 +383,7 @@@
    /* Create/Init values */
    public void createValues(VM vm, final String regionName, final int size) {
      vm.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regionName);
          for (int i = 1; i <= size; i++) {
@@@ -397,6 -362,6 +397,7 @@@
    /* Create/Init values */
    public void createValuesWithTime(VM vm, final String regionName, final int size) {
      vm.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regionName);
          for (int i = 1; i <= size; i++) {
@@@ -412,6 -377,6 +413,7 @@@
    /* delete values */
    public void deleteValues(VM vm, final String regionName, final int size) {
      vm.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regionName);
          for (int i = 1; i <= size; i++) {
@@@ -428,6 -393,6 +430,7 @@@
     */  
    public void invalidateValues(VM vm, final String regionName, final int size) {
      vm.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regionName);
          for (int i = 1; i <= size; i++) {
@@@ -449,6 -414,6 +452,7 @@@
    
    public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports, final String redundancyLevel) {
      vm.invoke(new CacheSerializableRunnable("createPool :" + poolName) {
++      @Override
        public void run2() throws CacheException {
          // Create Cache.
          getCache();
@@@ -475,11 -440,11 +479,8 @@@
    /* Register CQs */
    public void createCQ(VM vm, final String poolName, final String cqName, final String queryStr) {
      vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
++      @Override
        public void run2() throws CacheException {
--        //pause(60 * 1000);
--        //getLogWriter().info("### DEBUG CREATE CQ START ####");
--        //pause(20 * 1000);
--        
          LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
          // Get CQ Service.
          QueryService qService = null;
@@@ -511,6 -476,6 +512,7 @@@
    // REMOVE..........
    public void createCQ(VM vm, final String cqName, final String queryStr) {
      vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
++      @Override
        public void run2() throws CacheException {
          //pause(60 * 1000);
          //getLogWriter().info("### DEBUG CREATE CQ START ####");
@@@ -547,6 -512,6 +549,7 @@@
    /* Register CQs  with no name, execute, and close*/
    public void createAndExecCQNoName(VM vm, final String poolName,  final String queryStr) {
      vm.invoke(new CacheSerializableRunnable("Create CQ with no name:" ) {
++      @Override
        public void run2() throws CacheException {
          //pause(60 * 1000);
          LogWriterUtils.getLogWriter().info("### DEBUG CREATE CQ START ####");
@@@ -642,91 -607,91 +645,79 @@@
        final int expectedResultsSize,
        final String[] expectedKeys,
        final String expectedErr) {
++
      vm.invoke(new CacheSerializableRunnable("Execute CQ :" + cqName) {
  
        private void work() throws CacheException {
--      //pause(60 * 1000);
--      LogWriterUtils.getLogWriter().info("### DEBUG EXECUTE CQ START ####");
--      //pause(20 * 1000);
--      
--      // Get CQ Service.
--      QueryService cqService = null;
--      CqQuery cq1 = null;
--//    try {
--      cqService = getCache().getQueryService();
--//    } catch (Exception cqe) {
--//    getLogWriter().error(cqe);
--//    AssertionError err = new AssertionError("Failed to get QueryService" + cqName);
--//    err.initCause(ex);
--//    throw err;
--//    fail("Failed to getCQService.");
--//    }
--      
--      // Get CqQuery object.
--      try {
--        cq1 = cqService.getCq(cqName);
--        if (cq1 == null) {
--          LogWriterUtils.getLogWriter().info("Failed to get CqQuery object for CQ name: " + cqName);
--          fail("Failed to get CQ " + cqName);
--        }
--        else {
--          LogWriterUtils.getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
--          assertTrue("newCq() state mismatch", cq1.getState().isStopped());
--        }
--      } catch (Exception ex){
--        LogWriterUtils.getLogWriter().info("CqService is :" + cqService);
--        LogWriterUtils.getLogWriter().error(ex);
--        Assert.fail("Failed to execute  CQ " + cqName, ex);
--      }
--      
--      if (initialResults) {
--        SelectResults cqResults = null;
--        
++        LogWriterUtils.getLogWriter().info("### DEBUG EXECUTE CQ START ####");
++
++        // Get CQ Service.
++        QueryService cqService = null;
++        CqQuery cq1 = null;
++        cqService = getCache().getQueryService();
++
++        // Get CqQuery object.
          try {
--          cqResults = cq1.executeWithInitialResults();
++          cq1 = cqService.getCq(cqName);
++          if (cq1 == null) {
++            LogWriterUtils.getLogWriter().info("Failed to get CqQuery object for CQ name: " + cqName);
++            fail("Failed to get CQ " + cqName);
++          }
++          else {
++            LogWriterUtils.getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
++            assertTrue("newCq() state mismatch", cq1.getState().isStopped());
++          }
          } catch (Exception ex){
            LogWriterUtils.getLogWriter().info("CqService is :" + cqService);
--          ex.printStackTrace();
--          AssertionError err = new AssertionError("Failed to execute  CQ " + cqName);
--          err.initCause(ex);
--          throw err;
--        }
--        LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
--        assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
--        if (expectedResultsSize >= 0) {
--          assertEquals("Unexpected results size for CQ: " + cqName + 
--              " CQ Query :" + cq1.getQueryString(), 
--              expectedResultsSize, cqResults.size());
--        }
--        
--        if (expectedKeys != null) {
--          HashSet resultKeys = new HashSet();
--          for (Object o : cqResults.asList()) {
--            Struct s = (Struct)o;
--            resultKeys.add(s.get("key"));
++          LogWriterUtils.getLogWriter().error(ex);
++          Assert.fail("Failed to execute  CQ " + cqName, ex);
++        }
++
++        if (initialResults) {
++          SelectResults cqResults = null;
++
++          try {
++            cqResults = cq1.executeWithInitialResults();
++          } catch (Exception ex){
++            fail("Failed to execute  CQ " + cqName, ex);
            }
--          for (int i =0; i < expectedKeys.length; i++){
--            assertTrue("Expected key :" +  expectedKeys[i] + 
--                " Not found in CqResults for CQ: " + cqName + 
--              " CQ Query :" + cq1.getQueryString() + 
--              " Keys in CqResults :" + resultKeys,
--              resultKeys.contains(expectedKeys[i]));
++          LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
++          assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
++          if (expectedResultsSize >= 0) {
++            assertEquals("Unexpected results size for CQ: " + cqName +
++                " CQ Query :" + cq1.getQueryString(),
++                expectedResultsSize, cqResults.size());
++          }
++
++          if (expectedKeys != null) {
++            HashSet resultKeys = new HashSet();
++            for (Object o : cqResults.asList()) {
++              Struct s = (Struct)o;
++              resultKeys.add(s.get("key"));
++            }
++            for (int i =0; i < expectedKeys.length; i++){
++              assertTrue("Expected key :" +  expectedKeys[i] +
++                  " Not found in CqResults for CQ: " + cqName +
++                " CQ Query :" + cq1.getQueryString() +
++                " Keys in CqResults :" + resultKeys,
++                resultKeys.contains(expectedKeys[i]));
++            }
            }
          }
--      } 
--      else {
--        try {
--          cq1.execute();
--        } catch (Exception ex){
--          if (expectedErr == null) {
--            LogWriterUtils.getLogWriter().info("CqService is :" + cqService, ex);
++        else {
++          try {
++            cq1.execute();
++          } catch (Exception ex){
++            if (expectedErr == null) {
++              LogWriterUtils.getLogWriter().info("CqService is :" + cqService, ex);
++            }
++            Assert.fail("Failed to execute  CQ " + cqName, ex);
            }
--          Assert.fail("Failed to execute  CQ " + cqName, ex);
++          assertTrue("execute() state mismatch", cq1.getState().isRunning());
          }
--        assertTrue("execute() state mismatch", cq1.getState().isRunning());
        }
--    }
--      
++
++      @Override
        public void run2() throws CacheException {
          if (expectedErr != null) {
            getCache().getLogger().info("<ExpectedException action=add>"
@@@ -734,7 -699,7 +725,7 @@@
          }
          try {
            work();
--        } 
++        }
          finally {
            if (expectedErr != null) {
              getCache().getLogger().info("<ExpectedException action=remove>"
@@@ -742,12 -707,12 +733,13 @@@
            }
          }
        }
--      });   
++    });
    }
    
    /* Stop/pause CQ */
    public void stopCQ(VM vm, final String cqName) throws Exception {
      vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Stop CQ. ###" + cqName);
          // Get CQ Service.
@@@ -775,6 -740,6 +767,7 @@@
    /* Stop/pause CQ */
    private void stopExecCQ(VM vm, final String cqName, final int count) throws Exception {
      vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
++      @Override
        public void run2() throws CacheException {
          CqQuery cq1 = null;
          LogWriterUtils.getLogWriter().info("### Stop and Exec CQ. ###" + cqName);
@@@ -817,11 -782,11 +810,11 @@@
        }
      });
    }
--  
--  
++
    /* UnRegister CQs */
    public void closeCQ(VM vm, final String cqName) throws Exception {
      vm.invoke(new CacheSerializableRunnable("Close CQ :" + cqName) {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Close CQ. ###" + cqName);
          // Get CQ Service.
@@@ -848,6 -813,6 +841,7 @@@
    /* Register CQs */
    public void registerInterestListCQ(VM vm, final String regionName, final int keySize, final boolean all) {
      vm.invoke(new CacheSerializableRunnable("Register InterestList and CQ") {
++      @Override
        public void run2() throws CacheException {
          
          // Get CQ Service.
@@@ -856,10 -821,10 +850,7 @@@
            region = getRootRegion().getSubregion(regionName);
            region.getAttributesMutator().setCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter()));
          } catch (Exception cqe) {
--          AssertionError err = new AssertionError("Failed to get Region.");
--          err.initCause(cqe);
--          throw err;
--
++          fail("Failed to get Region.", cqe);
          }
          
          try {
@@@ -873,9 -838,9 +864,7 @@@
              region.registerInterest(list);
            }
          } catch (Exception ex) {
--          AssertionError err = new AssertionError("Failed to Register InterestList");
--          err.initCause(ex);
--          throw err;
++          fail("Failed to Register InterestList", ex);
          }
        }
      });   
@@@ -884,6 -849,6 +873,7 @@@
    /* Validate CQ Count */
    public void validateCQCount(VM vm, final int cqCnt) throws Exception {
      vm.invoke(new CacheSerializableRunnable("validate cq count") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          
@@@ -898,20 -863,20 +888,20 @@@
          try {
            numCqs = cqService.getCqs().length;
          } catch (Exception ex) {
--          Assert.fail ("Failed to get the CQ Count.", ex);
++          Assert.fail("Failed to get the CQ Count.", ex);
          }
          assertEquals("Number of cqs mismatch.", cqCnt, numCqs);
        }
      });
    }
    
--  
--  /** 
++  /**
     * Throws AssertionError if the CQ can be found or if any other
     * error occurs
     */
    private void failIfCQExists(VM vm, final String cqName) {
      vm.invoke(new CacheSerializableRunnable("Fail if CQ exists") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Fail if CQ Exists. ### " + cqName);
          // Get CQ Service.
@@@ -933,6 -898,6 +923,7 @@@
    private void validateCQError(VM vm, final String cqName,
        final int numError) {
      vm.invoke(new CacheSerializableRunnable("Validate CQs") {
++      @Override
        public void run2() throws CacheException {
          
          LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
@@@ -983,6 -948,6 +974,7 @@@
        final int queryDeletes,
        final int totalEvents) {
      vm.invoke(new CacheSerializableRunnable("Validate CQs") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
          // Get CQ Service.
@@@ -1020,7 -985,7 +1012,6 @@@
            // Since ResultSet is not maintained for this release.
            // Instead of resultSize its been validated with total number of events.
            fail("test for event counts instead of results size");
--//        assertIndexDetailsEquals("Result Size mismatch", resultSize, listener.getTotalEventCount());
          }
          
          // Check for create count.
@@@ -1098,6 -1063,6 +1089,7 @@@
  
    private void waitForEvent(VM vm, final int event, final String cqName, final String key) {
      vm.invoke(new CacheSerializableRunnable("validate cq count") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          QueryService cqService = null;
@@@ -1144,7 -1109,7 +1136,6 @@@
            case REGION_INVALIDATE :
              listener.waitForRegionInvalidate();
              break;            
--
          }
        }
      });
@@@ -1157,6 -1122,6 +1148,7 @@@
     */
    public void waitForCqState(VM vm, final String cqName, final int state) {
      vm.invoke(new CacheSerializableRunnable("Wait For cq State") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          QueryService cqService = null;
@@@ -1188,6 -1153,6 +1180,7 @@@
  
    public void clearCQListenerEvents(VM vm, final String cqName) {
      vm.invoke(new CacheSerializableRunnable("validate cq count") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          
@@@ -1213,6 -1178,6 +1206,7 @@@
    
    public void validateQuery(VM vm, final String query, final int resultSize) {
      vm.invoke(new CacheSerializableRunnable("Validate Query") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Validating Query. ###");
          QueryService qs = getCache().getQueryService();
@@@ -1252,10 -1217,10 +1246,7 @@@
      
      props.setProperty("endpoints", endPoints);
      props.setProperty("retryAttempts", "1");
--    //props.setProperty("establishCallbackConnection", "true");
--    //props.setProperty("LBPolicy", "Sticky");
--    //props.setProperty("readTimeout", "120000");
--    
++
      // Add other property elements.
      if (newProps != null) {
        Enumeration e = newProps.keys();
@@@ -1267,10 -1232,10 +1258,10 @@@
      return props;
    }
    
--  
    // Exercise CQ attributes mutator functions
    private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function) throws Exception {
      vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
++      @Override
        public void run2() throws CacheException {
          CqQuery cq1 = null;
          LogWriterUtils.getLogWriter().info("### CQ attributes mutator for ###" + cqName);
@@@ -1327,15 -1292,14 +1318,11 @@@
        }
      });
    }
--  
--  
--  
--  
++
    /**
     * Test for InterestList and CQ registered from same clients.
--   * @throws Exception
     */
 +  @Test
    public void testInterestListAndCQs() throws Exception {
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
@@@ -1380,20 -1344,20 +1367,18 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
      
--    
      // Validate InterestList.
      // CREATE
      client.invoke(new CacheSerializableRunnable("validate updates") {
++      @Override
        public void run2() throws CacheException {
          final Region region = getRootRegion().getSubregion(regions[0]);
          assertNotNull(region);
          
--//        Set keys = region.entrySet();
--//        assertIndexDetailsEquals("Mismatch, number of keys in local region is not equal to the interest list size",
--//            size, keys.size());
          // TODO does this WaitCriterion actually help?
          WaitCriterion wc = new WaitCriterion() {
            String excuse;
++          @Override
            public boolean done() {
              int sz = region.entrySet().size();
              if (sz == size) {
@@@ -1404,6 -1368,6 +1389,7 @@@
                  size + ")";
              return false;
            }
++          @Override
            public String description() {
              return excuse;
            }
@@@ -1424,9 -1388,9 +1410,9 @@@
      for (int i=1; i <=10; i++){
        waitForUpdated(client, "testInterestListAndCQs_0", KEY + i);
      }
--    
--    
++
      client.invoke(new CacheSerializableRunnable("validate updates") {
++      @Override
        public void run2() throws CacheException {
          Region region = getRootRegion().getSubregion(regions[0]);
          assertNotNull(region);
@@@ -1445,6 -1409,6 +1431,7 @@@
      
      // INVALIDATE
      server.invoke(new CacheSerializableRunnable("Invalidate values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regions[0]);
          for (int i = 1; i <= size; i++) {
@@@ -1452,12 -1416,12 +1439,11 @@@
          }
        }
      });
--    
--    
++
      waitForInvalidated(client, "testInterestListAndCQs_0", KEY + 10);
--    
--    
++
      client.invoke(new CacheSerializableRunnable("validate invalidates") {
++      @Override
        public void run2() throws CacheException {
          Region region = getRootRegion().getSubregion(regions[0]);
          assertNotNull(region);
@@@ -1487,6 -1451,6 +1473,7 @@@
      // DESTROY - this should not have any effect on CQ, as the events are
      // already destroyed from invalidate events.
      server.invoke(new CacheSerializableRunnable("Invalidate values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regions[0]);
          for (int i = 1; i <= size; i++) {
@@@ -1497,6 -1461,6 +1484,7 @@@
  
      // Wait for destroyed.
      client.invoke(new CacheSerializableRunnable("validate destroys") {
++      @Override
        public void run2() throws CacheException {
          Region region = getRootRegion().getSubregion(regions[0]);
          assertNotNull(region);
@@@ -1521,15 -1485,14 +1509,12 @@@
      closeClient(client);
      closeServer(server);
    }
--  
--  
++
    /**
     * Test for CQ register and UnRegister.
--   * @throws Exception
     */
 +  @Test
    public void testCQStopExecute() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client = host.getVM(1);
@@@ -1542,8 -1505,8 +1527,6 @@@
      String poolName = "testCQStopExecute";
      createPool(client, poolName, host0, thePort);
      
--    //createClient(client, thePort, host0);
--    
      /* Create CQs. */
      createCQ(client, poolName, "testCQStopExecute_0", cqs[0]); 
      validateCQCount(client, 1);
@@@ -1556,13 -1519,13 +1539,10 @@@
      // Wait for client to Synch.
      
      waitForCreated(client, "testCQStopExecute_0", KEY+size);
--    
--    
++
      // Check if Client and Server in sync.
--    //validateServerClientRegionEntries(server, client, regions[0]);
--    validateQuery(server, cqs[0], 10);    
++    validateQuery(server, cqs[0], 10);
      // validate CQs.
--    //validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest);
      validateCQ(client, "testCQStopExecute_0",
          /* resultSize: */ noTest,
          /* creates: */ size,
@@@ -1586,10 -1549,10 +1566,8 @@@
      size = 30;
      
      // Check if Client and Server in sync.
--    //validateServerClientRegionEntries(server, client, regions[0]);
--    validateQuery(server, cqs[0], 20);    
++    validateQuery(server, cqs[0], 20);
      // validate CQs.
--    //validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest);
      validateCQ(client, "testCQStopExecute_0",
          /* resultSize: */ noTest,
          /* creates: */ 20,
@@@ -1599,8 -1562,8 +1577,7 @@@
          /* queryUpdates: */ 10,
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
--    
--    
++
      // Stop and execute CQ 20 times
      stopExecCQ(client, "testCQStopExecute_0", 20);
      
@@@ -1614,9 -1577,8 +1591,8 @@@
    
    /**
     * Test for CQ Attributes Mutator functions
--   * @throws Exception
     */
 +  @Test
    public void testCQAttributesMutator() throws Exception {
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
@@@ -1629,8 -1591,8 +1605,7 @@@
      
      String poolName = "testCQAttributesMutator";
      createPool(client, poolName, host0, thePort);
--    //createClient(client, thePort, host0);
--    
++
      /* Create CQs. */
      String cqName = new String("testCQAttributesMutator_0");
      createCQ(client, poolName, cqName, cqs[0]); 
@@@ -1712,11 -1674,10 +1687,9 @@@
    
    /**
     * Test for CQ register and UnRegister.
--   * @throws Exception
     */
 +  @Test
    public void testCQCreateClose() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client = host.getVM(1);
@@@ -1730,13 -1691,13 +1703,6 @@@
      System.out.println("##### Pool Name :" + poolName + " host :" + host0 + " port :" + thePort);
      createPool(client, poolName, host0, thePort);
  
--    // createClient(client, thePort, host0);
--    
--    /* debug */
--    //getLogWriter().info("### DEBUG STOP ####");
--    //pause(60 * 1000);
--    //getLogWriter().info("### DEBUG START ####");
--    
      /* Create CQs. */
      createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); 
      validateCQCount(client, 1);
@@@ -1789,7 -1750,7 +1755,7 @@@
      try {
        createCQ(client, poolName, "testCQCreateClose_0", cqs[0]);
        fail("Trying to create CQ with same name. Should have thrown CQExistsException");
--    } catch (com.gemstone.gemfire.test.dunit.RMIException rmiExc) {
++    } catch (RMIException rmiExc) {
        Throwable cause = rmiExc.getCause();
        assertTrue("unexpected cause: " + cause.getClass().getName(), cause instanceof AssertionError);
        Throwable causeCause = cause.getCause(); // should be a CQExistsException
@@@ -1804,7 -1765,7 +1770,7 @@@
      try {
        createCQ(server, "testCQCreateClose_1", cqs[0]);
        fail("Trying to create CQ on Cache Server. Should have thrown Exception.");
--    } catch (com.gemstone.gemfire.test.dunit.RMIException rmiExc) {
++    } catch (RMIException rmiExc) {
        Throwable cause = rmiExc.getCause();
        assertTrue("unexpected cause: " + cause.getClass().getName(), 
            cause instanceof AssertionError);
@@@ -1822,6 -1783,6 +1788,7 @@@
      /* Test for closeAllCQs() */
      
      client.invoke(new CacheSerializableRunnable("CloseAll CQ :") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Close All CQ. ###");
          // Get CQ Service.
@@@ -1856,6 -1817,6 +1823,7 @@@
      
      // Call close all CQ.
      client.invoke(new CacheSerializableRunnable("CloseAll CQ 2 :") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Close All CQ 2. ###");
          // Get CQ Service.
@@@ -1883,9 -1844,8 +1851,8 @@@
    /**
     * This will test the events after region destory.
     * The CQs on the destroy region needs to be closed.
--   *
     */
 +  @Test
    public void testRegionDestroy() throws Exception {
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
@@@ -1918,8 -1878,8 +1885,7 @@@
      // Wait for client to Synch.
      
      waitForCreated(client, "testRegionDestroy_0", KEY + 10);
--    
--    
++
      // validate CQs.
      validateCQ(client, "testRegionDestroy_0",
          /* resultSize: */ noTest,
@@@ -1934,10 -1894,10 +1900,12 @@@
      // Validate InterestList.
      // CREATE
      client.invoke(new CacheSerializableRunnable("validate updates") {
++      @Override
        public void run2() throws CacheException {
          // Wait for the region to become the correct size
          WaitCriterion wc = new WaitCriterion() {
            String excuse;
++          @Override
            public boolean done() {
              Region region = getRootRegion().getSubregion(regions[0]);
              if (region == null) {
@@@ -1951,6 -1911,6 +1919,7 @@@
              }
              return true;
            }
++          @Override
            public String description() {
              return excuse;
            }
@@@ -1971,6 -1931,6 +1940,7 @@@
      
      // Destroy Region.
      server.invoke(new CacheSerializableRunnable("Destroy Region") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regions[0]);
          region1.destroyRegion();
@@@ -1982,15 -1942,14 +1952,13 @@@
      
      closeClient(client);
      closeServer(server);
--    
    }
    
    /**
     * Test for CQ with multiple clients.
     */
 +  @Test
    public void testCQWithMultipleClients() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client1 = host.getVM(1);
@@@ -2045,8 -2004,8 +2013,7 @@@
          /* queryUpdates: */ 0,
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
--    
--    
++
      /* Close test */
      closeCQ(client1, "testCQWithMultipleClients_0");
      
@@@ -2071,10 -2030,10 +2038,8 @@@
      // Update values on Server. This will be updated on new Client CQs.
      createValues(server, regions[0], size);
      
--    
      waitForUpdated(client3, "testCQWithMultipleClients_0", KEY + 10);
      
--    
      validateCQ(client3, "testCQWithMultipleClients_0",
          /* resultSize: */ noTest,
          /* creates: */ 0,
@@@ -2109,10 -2068,10 +2074,8 @@@
      // Update values on server, update again.
      createValues(server, regions[0], size);
      
--    
      waitForUpdated(client2, "testCQWithMultipleClients_0", KEY + 10);
      
--    
      validateCQ(client2, "testCQWithMultipleClients_0",
          /* resultSize: */ noTest,
          /* creates: */ size,
@@@ -2144,9 -2103,8 +2107,8 @@@
    /**
     * Test for CQ ResultSet.
     */
 +  @Test
    public void testCQResultSet() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client = host.getVM(1);
@@@ -2159,9 -2117,9 +2121,6 @@@
      String poolName = "testCQResultSet";
      createPool(client, poolName, host0, thePort);
      
--    // Create client.
--    // createClient(client, thePort, host0);
--    
      /* CQ Test with initial Values. */
      int size = 10;
      createValues(server, regions[0], size);
@@@ -2185,31 -2143,31 +2144,6 @@@
      
      executeCQ(client, "testCQResultSet_1", true, 2, null, null);
      
--    /* compare values...
--     Disabled since we don't currently maintain results on the client
--     
--     validateCQ(client, "testCQResultSet_1", 2, noTest, noTest, noTest);
--     Portfolio[] values = new Portfolio[] {new Portfolio(2), new Portfolio(4)}; 
--     Hashtable t = new Hashtable();
--     String[] keys = new String[] {"key-2", "key-4"};
--     t.put(keys[0], values[0]);
--     t.put(keys[1], values[1]);
--     
--     compareValues(client, "testCQResultSet_1", t);
--     
--     deleteValues(server, regions[1], 3);
--     t.remove("key-4");
--     pause(2 * 1000);
--     
--     try {
--     compareValues(client, "testCQResultSet_1", t);
--     fail("Should have thrown Exception. The value should not be present in cq results region");
--     }
--     catch (Exception ex) { // @todo check for specific exception type
--     }
--     
--     */
--    
      // Close.
      closeClient(client);    
      closeServer(server);
@@@ -2217,11 -2175,10 +2151,9 @@@
    
    /**
     * Test for CQ Listener events.
--   *
     */
 +  @Test
    public void testCQEvents() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client = host.getVM(1);
@@@ -2293,6 -2250,6 +2225,7 @@@
      
      // Insert invalid Events.
      server.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regions[0]);
          for (int i = -1; i >= -5; i--) {
@@@ -2322,12 -2279,11 +2255,9 @@@
    
    /**
     * Test query execution multiple times on server without ALIAS.
--   * @throws Exception
     */
 +  @Test
    public void testCqEventsWithoutAlias() throws Exception {
--
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client = host.getVM(1);
@@@ -2340,9 -2296,9 +2270,6 @@@
      String poolName = "testCQEvents";
      createPool(client, poolName, host0, thePort);
  
--    // Create client.
--    //createClient(client, thePort, host0);
--    
      // Create CQs.
      createCQ(client, poolName, "testCQEvents_0", cqs[11]);
      
@@@ -2399,6 -2355,6 +2326,7 @@@
      
      // Insert invalid Events.
      server.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regions[0]);
          for (int i = -1; i >= -5; i--) {
@@@ -2425,11 -2381,10 +2353,11 @@@
      closeClient(client);
      closeServer(server);
    }
++
    /**
     * Test for stopping and restarting CQs.
--   * @throws Exception
     */
 +  @Test
    public void testEnableDisableCQ() throws Exception {
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
@@@ -2452,6 -2407,6 +2380,7 @@@
      
      /* Test for disableCQ */
      client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          QueryService cqService = null;
@@@ -2482,6 -2437,6 +2411,7 @@@
      
      /* Test for enable CQ */
      client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          QueryService cqService = null;
@@@ -2509,6 -2464,6 +2439,7 @@@
      
      /* Test for disableCQ on Region*/
      client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          QueryService cqService = null;
@@@ -2537,6 -2492,6 +2468,7 @@@
      
      /* Test for enable CQ on region */
      client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          QueryService cqService = null;
@@@ -2569,9 -2524,8 +2501,8 @@@
    
    /**
     * Test for Complex queries.
--   * @throws Exception
     */
 +  @Test
    public void testQuery() throws Exception {
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
@@@ -2614,9 -2568,8 +2545,8 @@@
    
    /**
     * Test for CQ Fail over.
--   * @throws Exception
     */
 +  @Test
    public void testCQFailOver() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
@@@ -2627,10 -2580,10 +2557,7 @@@
      
      final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
--    // Create client.
--//    Properties props = new Properties();
--    // Create client with redundancyLevel -1
--    
++
      final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
      
      //createClient(client, new int[] {port1, ports[0]}, host0, "-1");
@@@ -2704,9 -2657,8 +2631,8 @@@
    
    /**
     * Test for CQ Fail over/HA with redundancy level set.
--   * @throws Exception
     */
 +  @Test
    public void testCQHA() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
@@@ -2733,8 -2685,8 +2659,7 @@@
      System.out.println("### Port on which server1 running : " + port1 + 
          " server2 running : " + thePort2 + 
          " Server3 running : " + port3);
--    
--    
++
      // Create client - With 3 server endpoints and redundancy level set to 2.
      
      String poolName = "testCQStopExecute";
@@@ -2757,10 -2709,10 +2682,8 @@@
      createValues(server1, regions[0], 10);
      createValues(server1, regions[1], 10);
      
--    
      waitForCreated(client, "testCQHA_0", KEY + 10);
      
--    
      // Clients expected initial result.
      int[] resultsCnt = new int[] {10, 1, 2};
      
@@@ -2773,10 -2725,10 +2696,8 @@@
      createValues(server2, regions[0], 10);
      createValues(server2, regions[1], 10);
      
--    
      waitForUpdated(client, "testCQHA_0", KEY + 10);
      
--    
      // Validate CQ.
      for (int i=0; i < numCQs; i++) {
        validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
@@@ -2796,7 -2748,7 +2717,6 @@@
      
      waitForUpdated(client, "testCQHA_0", KEY + 10);
      
--    
      for (int i=0; i < numCQs; i++) {
        validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
      }    
@@@ -2809,9 -2761,8 +2729,8 @@@
    /**
     * Test Filter registration during GII. 
     * Bug fix 39014
--   * @throws Exception
     */
 +  @Test
    public void testFilterRegistrationDuringGII() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
@@@ -2859,6 -2810,6 +2778,7 @@@
  
      // Create server2.
      server2.invoke(new CacheSerializableRunnable("Create Cache Server") {
++      @Override
        public void run2() throws CacheException
        {
          LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
@@@ -2897,10 -2848,10 +2817,10 @@@
      );
  
      Wait.pause(3 * 1000);
--    
--    
++
      // Check if CQs are registered as part of GII.
      server2.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          DefaultQueryService qs = (DefaultQueryService)getCache().getQueryService();
          Collection<CacheClientProxy> proxies = CacheClientNotifier.getInstance().getClientProxies();
@@@ -2928,7 -2879,7 +2848,7 @@@
              }
            }
          } catch (Exception ex) {
--          fail("Exception while validating filter count. " + ex.getMessage());
++          fail("Exception while validating filter count. ", ex);
          }
        }
      });
@@@ -2943,10 -2894,9 +2863,9 @@@
    /**
     * Test without CQs.
     * This was added after an exception encountered with CQService, when there was
--   * no CQService intiated.
--   * @throws Exception
++   * no CQService initiated.
     */
 +  @Test
    public void testWithoutCQs() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
@@@ -2963,6 -2913,6 +2882,7 @@@
      
      SerializableRunnable createConnectionPool =
        new CacheSerializableRunnable("Create region") {
++      @Override
        public void run2() throws CacheException {
          getCache();
          IgnoredException.addIgnoredException("java.net.ConnectException||java.net.SocketException");
@@@ -2974,12 -2924,12 +2894,12 @@@
          createRegion(regions[0], regionFactory.createRegionAttributes());
        }
      };
--    
--    
++
      // Create client.
      client.invoke(createConnectionPool);
      
      server1.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regions[0]);
          for (int i = 0; i < 20; i++) {
@@@ -2990,6 -2940,6 +2910,7 @@@
      
      // Put some values on the client.
      client.invoke(new CacheSerializableRunnable("Put values client") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getRootRegion().getSubregion(regions[0]);
          
@@@ -2999,13 -2949,13 +2920,11 @@@
        }
      });
      
--    
      Wait.pause(2 * 1000);
      closeServer(server1);
      closeServer(server2);
    }
--  
--  
++
    /**
     * Test getCQs for a regions
     */
@@@ -3020,9 -2969,9 +2939,6 @@@
      final int thePort = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server.getHost());
      
--    // Create client.
--    // createClient(client, thePort, host0);
--    
      String poolName = "testGetCQsForARegionName";
      createPool(client, poolName, host0, thePort);
      
@@@ -3046,6 -2995,6 +2962,7 @@@
      executeCQ(client, "testQuery_8", true, null);
      
      client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
++      @Override
        public void run2() throws CacheException {
          // Get CQ Service.
          QueryService cqService = null;
@@@ -3073,16 -3022,15 +2990,13 @@@
      // Close.
      closeClient(client);
      closeServer(server);
--    
    }
    
    /**
     * Tests execution of queries with NULL in where clause like where ID = NULL
     * etc.
--   * 
--   * @throws Exception
     */
 +  @Test
    public void testQueryWithNULLInWhereClause() throws Exception
    {
      final Host host = Host.getHost(0);
@@@ -3127,16 -3075,15 +3041,13 @@@
      // Close.
      closeClient(client);
      closeServer(server);
--    
    }
    
    /**
     * Tests execution of queries with NULL in where clause like where ID = NULL
     * etc.
--   * 
--   * @throws Exception
     */
 +  @Test
    public void testForSupportedRegionAttributes() throws Exception
    {
      final Host host = Host.getHost(0);
@@@ -3147,6 -3094,6 +3058,7 @@@
      // Create server with Global scope.
      SerializableRunnable createServer = new CacheSerializableRunnable(
      "Create Cache Server") {
++      @Override
        public void run2() throws CacheException
        {
          LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
@@@ -3189,9 -3136,9 +3101,6 @@@
      String poolName = "testForSupportedRegionAttributes";
      createPool(client, poolName, new String[] {host0, host0}, new int[] {port1, thePort2});
      
--    // Create client.
--    //createClient(client, new int[] {port1, thePort2}, host0, "-1");
--
      // Create CQ on region with GLOBAL SCOPE.
      createCQ(client, poolName, "testForSupportedRegionAttributes_0", cqs[0]);
      executeCQ(client, "testForSupportedRegionAttributes_0", false, null);
@@@ -3212,6 -3159,6 +3121,7 @@@
         " The CQ supported scopes are DISTRIBUTED_ACK and GLOBAL.";
      final String expectedErr = "Cq not registered on primary";
      client.invoke(new CacheSerializableRunnable("Set expect") {
++      @Override
        public void run2() {
          getCache().getLogger().info("<ExpectedException action=add>"
              + expectedErr + "</ExpectedException>");
@@@ -3221,10 -3168,10 +3131,11 @@@
      try {
        executeCQ(client, "testForSupportedRegionAttributes_1", false, "CqException");
        fail("The test should have failed with exception, " + errMsg);
--    } catch (Exception ex){
++    } catch (Exception expected){
        // Expected.
      } finally {
        client.invoke(new CacheSerializableRunnable("Remove expect") {
++        @Override
          public void run2() {
            getCache().getLogger().info("<ExpectedException action=remove>"
                + expectedErr + "</ExpectedException>");
@@@ -3236,16 -3183,16 +3147,12 @@@
      closeClient(client);
      closeServer(server1);
      closeServer(server2);
--
    }
--    
--  
--  // HELPER METHODS....
--  
--  /* For debug purpose - Compares entries in the region */
++
++  /** For debug purpose - Compares entries in the region */
    private void validateServerClientRegionEntries(VM server, VM client, final String regionName) {
--    
      server.invoke(new CacheSerializableRunnable("Server Region Entries") {
++      @Override
        public void run2() throws CacheException {
          Region region = getRootRegion().getSubregion(regionName);
          LogWriterUtils.getLogWriter().info("### Entries in Server :" + region.keys().size());
@@@ -3253,6 -3200,6 +3160,7 @@@
      });
      
      client.invoke(new CacheSerializableRunnable("Client Region Entries") {
++      @Override
        public void run2() throws CacheException {
          Region region = getRootRegion().getSubregion(regionName);
          LogWriterUtils.getLogWriter().info("### Entries in Client :" + region.keys().size()); 
@@@ -3337,6 -3284,6 +3245,4 @@@
      factory.setScope(Scope.LOCAL);
      return factory.createRegionAttributes();
    }
--  
--  
  }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
index 2d50bf0,b69d9ae..6ba1e14
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
@@@ -16,14 -16,6 +16,15 @@@
   */
  package com.gemstone.gemfire.cache.query.cq.dunit;
  
++import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
 +import static org.junit.Assert.*;
 +
 +import java.util.Properties;
 +
 +import org.junit.Ignore;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
  import com.gemstone.gemfire.cache.query.CqQuery;
  import com.gemstone.gemfire.cache.query.dunit.CloseCacheAuthorization;
  import com.gemstone.gemfire.cache.query.dunit.HelperTestCase;
@@@ -104,7 -97,7 +105,6 @@@ public class CqStateDUnitTest extends H
          CqQuery cq = getCache().getQueryService().getCqs()[0];
          return cq.getState().isRunning();
        }
--      
      });
      
      assertTrue("Cq was not running on server" , isRunning);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
index 89211a6,acede33..378986a
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
@@@ -16,65 -16,37 +16,58 @@@
   */
  package com.gemstone.gemfire.cache.query.cq.dunit;
  
- import org.junit.experimental.categories.Category;
- import org.junit.Test;
- 
- import static org.junit.Assert.*;
- 
- import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
- import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
- import com.gemstone.gemfire.test.junit.categories.DistributedTest;
- 
- import hydra.Log;
 -import com.gemstone.gemfire.cache.*;
++import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
++import static com.gemstone.gemfire.test.dunit.Assert.*;
++import static com.gemstone.gemfire.test.dunit.LogWriterUtils.*;
 +
 +import java.io.IOException;
 +import java.util.HashSet;
 +
++import org.junit.Test;
++import org.junit.experimental.categories.Category;
++
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.CacheFactory;
 +import com.gemstone.gemfire.cache.PartitionAttributes;
 +import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionAttributes;
 +import com.gemstone.gemfire.cache.Scope;
  import com.gemstone.gemfire.cache.client.ClientCache;
  import com.gemstone.gemfire.cache.client.ClientCacheFactory;
  import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
 -import com.gemstone.gemfire.cache.query.*;
 +import com.gemstone.gemfire.cache.query.CqAttributes;
 +import com.gemstone.gemfire.cache.query.CqAttributesFactory;
 +import com.gemstone.gemfire.cache.query.CqListener;
 +import com.gemstone.gemfire.cache.query.CqQuery;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.SelectResults;
 +import com.gemstone.gemfire.cache.query.Struct;
  import com.gemstone.gemfire.cache.query.data.Portfolio;
  import com.gemstone.gemfire.cache.server.CacheServer;
- import com.gemstone.gemfire.cache30.ClientServerTestCase;
  import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
--import com.gemstone.gemfire.cache30.CacheTestCase;
+ import com.gemstone.gemfire.cache30.ClientServerTestCase;
  import com.gemstone.gemfire.internal.cache.LocalRegion;
 -import com.gemstone.gemfire.test.dunit.*;
 -import hydra.Log;
 -
 -import java.io.IOException;
 -import java.util.HashSet;
 -
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
++import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
++import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+ 
  /**
   * Test class for Partitioned Region and CQs
   * 
-  * @since 5.5
+  * @since GemFire 5.5
   */
 -public class PartitionedRegionCqQueryDUnitTest extends CacheTestCase {
 +@Category(DistributedTest.class)
 +public class PartitionedRegionCqQueryDUnitTest extends JUnit4CacheTestCase {
  
--  
-   public PartitionedRegionCqQueryDUnitTest() {
-     super();
 -  public PartitionedRegionCqQueryDUnitTest(String name) {
 -    super(name);
--  }
--  
    static public final String[] regions = new String[] {
        "regionA",
        "regionB"
@@@ -119,7 -91,7 +112,6 @@@
    public final String[] cqsWithoutRoot = new String [] {
        //0 - Test for ">" 
        "SELECT ALL * FROM /" + regions[0] + " p where p.ID > 0"
--      
    };
    
    private static int bridgeServerPort;
@@@ -135,8 -106,8 +127,7 @@@
      createServer(server1);       
      createServer(server2);    
         
--    
--    // create client 
++    // create client
      
      final int port = server1.invoke(() -> PartitionedRegionCqQueryDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
@@@ -166,8 -137,8 +157,7 @@@
          /* queryUpdates: */ 0,
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
--    
--    
++
      int cc1 = server1.invoke(() -> PartitionedRegionCqQueryDUnitTest.getCqCountFromRegionProfile());
      int cc2 = server2.invoke(() -> PartitionedRegionCqQueryDUnitTest.getCqCountFromRegionProfile());
      assertEquals("Should have one", 1, cc1);
@@@ -248,7 -218,7 +238,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ (size+size));
      
--    
      // destroy all the values.
      int numDestroys = size;
      cqHelper.deleteValues(server2,regions[0], numDestroys);
@@@ -257,7 -227,7 +246,6 @@@
      
      // validate cqs after destroyes on server2.
   
--        
      cqHelper.validateCQ(client, "testCQEvents_0",
          /* resultSize: */ CqQueryDUnitTest.noTest,
          /* creates: */ size,
@@@ -300,9 -270,8 +288,9 @@@
    /**
     * test for registering cqs on a bridge server with local max memory zero.
     */
 +  @Test
    public void testPartitionedCqOnAccessorBridgeServer() throws Exception {
-- // creating servers.
++    // creating servers.
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
@@@ -402,7 -370,7 +390,6 @@@
    public void testPartitionedCqOnSingleBridgeServer() throws Exception { 
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
--//    VM server2 = host.getVM(1);
      VM client = host.getVM(2);
      
      // creating an accessor vm with Bridge Server installed.
@@@ -467,9 -435,9 +454,8 @@@
        cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
      }
      
--    // validate cqs after destroyes on server2.
-- 
--        
++    // validate cqs after destroys on server2.
++
      cqHelper.validateCQ(client, "testCQEvents_0",
          /* resultSize: */ CqQueryDUnitTest.noTest,
          /* creates: */ size,
@@@ -482,7 -450,7 +468,6 @@@
      
      cqHelper.closeClient(client);
      cqHelper.closeServer(server1);
--    
    }
    
    /**
@@@ -537,8 -504,8 +522,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
      
--    //size = 2;
--    
      // do updates
      createValues(server1, regions[0], size);
      
@@@ -556,8 -523,8 +539,7 @@@
          /* queryUpdates: */ size,
          /* queryDeletes: */ 0,
          /* totalEvents: */ (size+size));
--    
--    
++
      // destroy all the values.
      int numDestroys = size;
      cqHelper.deleteValues(server1,regions[0], numDestroys);
@@@ -566,9 -533,9 +548,8 @@@
        cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
      }
      
--    // validate cqs after destroyes on server2.
-- 
--        
++    // validate cqs after destroys on server2.
++
      cqHelper.validateCQ(client, "testCQEvents_0",
          /* resultSize: */ CqQueryDUnitTest.noTest,
          /* creates: */ size,
@@@ -601,7 -567,7 +582,6 @@@
      // create another server with data store.
      createServer(server2);
      
--    
      final int port = server1.invoke(() -> PartitionedRegionCqQueryDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
      
@@@ -633,8 -599,8 +613,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
      
--    //size = 2;
--    
      // do updates
      createValues(server1, regions[0], size);
      
@@@ -681,11 -647,10 +659,9 @@@
    
    /**
     * test cqs with invalidates on bridge server not hosting datastores.
--   * 
     */
 +  @Test
    public void testPRCqWithInvalidatesOnAccessorBridgeServer() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
@@@ -697,8 -662,8 +673,7 @@@
       
      // create another server with data store.
      createServer(server2);
--    
--    
++
      final int port = server1.invoke(() -> PartitionedRegionCqQueryDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
      
@@@ -730,8 -695,8 +705,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
      
--    //size = 2;
--    
      // do updates
      createValues(server1, regions[0], size);
      
@@@ -749,8 -714,8 +722,7 @@@
          /* queryUpdates: */ size,
          /* queryDeletes: */ 0,
          /* totalEvents: */ (size+size));
--    
--    
++
      // invalidate all the values.
      int numInvalidates = size;
      cqHelper.invalidateValues(server1,regions[0], numInvalidates);
@@@ -779,9 -744,8 +751,8 @@@
     * test cqs with create updates and destroys from client on bridge server
     * hosting datastores.
     */
 +  @Test
    public void testPRCqWithUpdatesFromClients() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
@@@ -795,7 -759,7 +766,6 @@@
      // create another server with data store.
      createServer(server2, false , 1);
      
--    
      final int port = server1.invoke(() -> PartitionedRegionCqQueryDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
      
@@@ -828,8 -792,8 +798,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
      
--    //size = 2;
--    
      // do updates
      createValues(client2, regions[0], size);
      
@@@ -848,7 -812,7 +816,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ (size+size));
      
--    
      // invalidate all the values.
      int numDelets = size;
      
@@@ -878,11 -842,10 +845,9 @@@
    
    /**
     * test cqs on multiple partitioned region hosted by bridge servers.
--   * 
     */
 +  @Test
    public void testPRCqWithMultipleRegionsOnServer() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
@@@ -943,9 -906,9 +908,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
      
--    
--    //size = 2;
--    
      // do updates
      createValues(client2, regions[0], size);
      createValues(client2, regions[1], size);
@@@ -983,13 -946,13 +945,11 @@@
      cqHelper.deleteValues(client2,regions[0], numInvalidates);
      cqHelper.deleteValues(client2,regions[1], numInvalidates);
      
--
      for (int i=1; i <= numInvalidates; i++){
        cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
        cqHelper.waitForDestroyed(client, "testCQEvents_1", KEY+i);
      }
--    
--    
++
      // validate cqs after invalidates on server2.
           
      cqHelper.validateCQ(client, "testCQEvents_0",
@@@ -1001,8 -964,8 +961,7 @@@
          /* queryUpdates: */ size,
          /* queryDeletes: */ numInvalidates,
          /* totalEvents: */ (size+size+numInvalidates));
--    
--    
++
      cqHelper.validateCQ(client, "testCQEvents_1",
          /* resultSize: */ CqQueryDUnitTest.noTest,
          /* creates: */ size,
@@@ -1022,11 -985,10 +981,9 @@@
    /**
     * tests multiple cqs on partitioned region on bridge servers with profile update 
     * for not requiring old values.
--   * 
     */
 +  @Test
    public void testPRWithCQsAndProfileUpdates() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
@@@ -1085,9 -1047,9 +1042,6 @@@
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
      
--    
--    //size = 2;
--    
      // do updates
      createValues(client2, regions[0], size);
      createValues(client2, regions[1], size);
@@@ -1128,8 -1090,8 +1082,7 @@@
        cqHelper.waitForDestroyed(client,"testPRWithCQsAndProfileUpdates_0",KEY+i);
        cqHelper.waitForDestroyed(client,"testPRWithCQsAndProfileUpdates_1",KEY+i);
      }
--    
--    
++
      // validate cqs after invalidates on server2.
           
      cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_0",
@@@ -1141,8 -1103,8 +1094,7 @@@
          /* queryUpdates: */ size,
          /* queryDeletes: */ numInvalidates,
          /* totalEvents: */ (size+size+numInvalidates));
--    
--    
++
      cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_1",
          /* resultSize: */ CqQueryDUnitTest.noTest,
          /* creates: */ size,
@@@ -1153,86 -1115,86 +1105,9 @@@
          /* queryDeletes: */ numInvalidates,
          /* totalEvents: */ (size+size+numInvalidates));
      
--    // check for requries old value set.
--    /*
--    server1.invoke(new CacheSerializableRunnable("Check requires old value") {
--      public void run2()
--      {
--        Cache cc = getCache();
--        PartitionedRegion region1 = (PartitionedRegion)cc
--            .getRegion("/root/regionA");
--        Set requiresOldValue = region1.getRegionAdvisor()
--            .adviseRequiresOldValue();
--        getLogWriter().info(
--            "ize of requires old value at server1 before closing cqs : "
--                + requiresOldValue.size());
--        assertTrue("The size of requiresOldValue shoule be zero on server1",
--            (0 == requiresOldValue.size()));
--      }
--    });
--
--    server2.invoke(new CacheSerializableRunnable("Check requires old value") {
--      public void run2()
--      {
--        Cache cc = getCache();
--        PartitionedRegion region1 = (PartitionedRegion)cc
--            .getRegion("/root/regionA");
--        Set requiresOldValue = region1.getRegionAdvisor()
--            .adviseRequiresOldValue();
--        getLogWriter().info(
--            "size of requires old value at server2 before closing cqs :"
--                + requiresOldValue.size());
--        assertTrue("The size of requiresOldValue should be one on server2 ",
--            (1 == requiresOldValue.size()));
--      }
--    });
--    */
--    
      cqHelper.closeCQ(client, "testPRWithCQsAndProfileUpdates_0");
      cqHelper.closeCQ(client, "testPRWithCQsAndProfileUpdates_1");
      
--    
--    // check for requires old value set after closing all the cqs.
--    /*
--    REQUIRES OLD VALUES requirement is removed in the eventFilterOpt_dev_Jun09 
--    branch. The old values are no more sent to the peer, instead CQs are processed
--    at the source (where change happens). Replace requiresOldValue test with 
--    appropriate test.
--    
--    server1.invoke(new CacheSerializableRunnable("Check requires old value") {
--      public void run2()
--      {
--        Cache cc = getCache();
--        PartitionedRegion region1 = (PartitionedRegion)cc
--            .getRegion("/root/regionA");
--        Set requiresOldValue = region1.getRegionAdvisor()
--            .adviseRequiresOldValue();
--        getLogWriter().info(
--            "size of requires old value set at the end server1 : "
--                + requiresOldValue.size());
--        assertTrue("The size of requiresOldValue shoule be zero on server1",
--            (0 == requiresOldValue.size()));
--      }
--    });
--    
--    
--    server2.invoke(new CacheSerializableRunnable("Check requires old value") {
--      public void run2()
--      {
--        Cache cc = getCache();
--        PartitionedRegion region1 = (PartitionedRegion)cc
--            .getRegion("/root/regionA");
--        Set requiresOldValue = region1.getRegionAdvisor()
--            .adviseRequiresOldValue();
--        getLogWriter().info(
--            " size of requires old value set at the end server2 : "
--                + requiresOldValue.size());
--        assertTrue(
--            "The size of requiresOldValue shoule be zero on server2 as well after closing all the cqs",
--            (0 == requiresOldValue.size()));
--      }
--    });
--    */
      cqHelper.closeClient(client);
      cqHelper.closeClient(client2);
      cqHelper.closeServer(server2);
@@@ -1245,9 -1207,8 +1120,8 @@@
     * there may be possibility that the region changes during
     * that time may not be reflected in the query result set
     * thus making the query data and region data inconsistent.
--   * @throws Exception
     */
 +  @Test
    public void testEventsDuringQueryExecution() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
@@@ -1310,14 -1271,14 +1184,9 @@@
          try {
            cqResults = cq1.executeWithInitialResults();
          } catch (Exception ex){
--          AssertionError err = new AssertionError("Failed to execute  CQ " + cqName);
--          err.initCause(ex);
--          throw err;
++          fail("Failed to execute  CQ " + cqName, ex);
          }
          
--        //getLogWriter().info("initial result size = " + cqResults.size());
--        
--        
          CqQueryTestListener cqListener = (CqQueryTestListener)cq1.getCqAttributes().getCqListener();
          // Wait for the last key to arrive.
          for (int i=0; i < 4; i++) {
@@@ -1351,13 -1312,13 +1220,6 @@@
            ids.add(s.get("key"));
          }
  
--        //Iterator iter = cqResults.asSet().iterator();
--        //while (iter.hasNext()) {
--        //  Portfolio p = (Portfolio)iter.next();
--        //  ids.add(p.getPk());
--        //  //getLogWriter().info("Result set value : " + p.getPk());
--        //}
--        
          HashSet missingIds = new HashSet();
          String key = "";
          for (int i = 1; i <= totalObjects; i++) {
@@@ -1372,8 -1333,8 +1234,7 @@@
                " Missing keys : [size : " + missingIds.size() + "]" + missingIds +
                " Ids in ResultSet and CQ Events :" + ids);
          }
--        
--      } 
++      }
      });
      
      cqHelper.closeClient(client);
@@@ -1381,9 -1342,8 +1242,7 @@@
      cqHelper.closeServer(server1);    
    }
  
--  
--  
 +  @Test
    public void testDestroyRegionEventOnClientsWithCQRegistered() throws Exception{
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
@@@ -1431,7 -1391,7 +1290,6 @@@
          assertNotNull(cqListener);
          
          cqListener.waitForTotalEvents(numObjects + 1 /*Destroy region event*/);
--                
        }
      });
      
@@@ -1458,7 -1418,7 +1316,7 @@@
          if (localRegion != null) {
  
            // REGION NULL
--          Log.getLogWriter().info("Local region is NOT null in client 1");
++          getLogWriter().info("Local region is NOT null in client 1");
            
            Wait.pause(5*1000);
            CqQuery[] cqs = getCache().getQueryService().getCqs();
@@@ -1471,7 -1431,7 +1329,6 @@@
                "Region is still available on client1 even after performing destroyRegion from client2 on server."
                    + "Client1 must have received destroyRegion message from server with CQ parts in it.",
                getCache().getRegion("/" + regions[0]));
--
          }
        }
      });
@@@ -1479,8 -1439,8 +1336,6 @@@
      cqHelper.closeServer(server);
    }
    
--  // helper methods.
--  
    /**
     * create bridge server with default attributes for partitioned region.
     */
@@@ -1521,13 -1481,13 +1376,7 @@@
        public void run2() throws CacheException
        {
            LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
--          //AttributesFactory factory = new AttributesFactory();
--          //factory.setScope(Scope.DISTRIBUTED_ACK);
--          //factory.setMirrorType(MirrorType.KEYS_VALUES);
--          
--          //int maxMem = 0;
            AttributesFactory attr = new AttributesFactory();
--          //attr.setValueConstraint(valueConstraint);
            PartitionAttributesFactory paf = new PartitionAttributesFactory();
            if (isAccessor){
              paf.setLocalMaxMemory(0);
@@@ -1541,15 -1501,15 +1390,12 @@@
              Region r = createRegion(regions[i], attr.create());
              LogWriterUtils.getLogWriter().info("Server created the region: "+r);
            }
--//          pause(2000);
            try {
              startBridgeServer(port, true);
            }
            catch (Exception ex) {
              Assert.fail("While starting CacheServer", ex);
            }
--//          pause(2000);
--       
        }
      };
  
@@@ -1570,13 -1530,13 +1416,7 @@@
        public void run2() throws CacheException
        {
            LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
--          //AttributesFactory factory = new AttributesFactory();
--          //factory.setScope(Scope.DISTRIBUTED_ACK);
--          //factory.setMirrorType(MirrorType.KEYS_VALUES);
--          
--          //int maxMem = 0;
            AttributesFactory attr = new AttributesFactory();
--          //attr.setValueConstraint(valueConstraint);
            PartitionAttributesFactory paf = new PartitionAttributesFactory();
            if (isAccessor){
              paf.setLocalMaxMemory(0);
@@@ -1590,14 -1550,14 +1430,12 @@@
              Region r = createRegionWithoutRoot(regions[i], attr.create());
              LogWriterUtils.getLogWriter().info("Server created the region: "+r);
            }
--//          pause(2000);
            try {
              startBridgeServer(port, true);
            }
            catch (Exception ex) {
              Assert.fail("While starting CacheServer", ex);
            }
--//          pause(2000);
        }
  
        private Region createRegionWithoutRoot(String regionName,
@@@ -1609,6 -1569,6 +1447,7 @@@
  
      server.invoke(createServer);
    }
++
    /**
     * Starts a bridge server on the given port, using the given
     * deserializeValues and notifyBySubscription to serve up the
@@@ -1618,7 -1578,7 +1457,6 @@@
     */
    protected void startBridgeServer(int port, boolean notifyBySubscription)
    throws IOException {
--    
      Cache cache = getCache();
      CacheServer bridge = cache.addCacheServer();
      bridge.setPort(port);
@@@ -1632,8 -1592,8 +1470,7 @@@
      int[] serverPorts = new int[] {serverPort};
      createClient(client, serverPorts, serverHost, null); 
    }
--  
--  
++
    /* Create Client */
    public void createClient(VM client, final int[] serverPorts, final String serverHost, final String redundancyLevel) {
      SerializableRunnable createQService =
@@@ -1648,8 -1608,8 +1485,7 @@@
          try {
            getCache().getQueryService();
          } catch (Exception cqe) {
--          cqe.printStackTrace();
--          fail("Failed to getCQService.");
++          fail("Failed to getCQService.", cqe);
          }
          
          AttributesFactory regionFactory = new AttributesFactory();
@@@ -1664,7 -1624,7 +1500,6 @@@
          for (int i=0; i < regions.length; i++) {        
            Region clientRegion = createRegion(regions[i], regionFactory.createRegionAttributes());
            LogWriterUtils.getLogWriter().info("### Successfully Created Region on Client :" + clientRegion);
--          //region1.getAttributesMutator().setCacheListener(new CqListener());
          }
        }
      };
@@@ -1675,18 -1635,18 +1510,14 @@@
    public void createCQ(VM vm, final String cqName, final String queryStr) {
      vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
        public void run2() throws CacheException {
--        //pause(60 * 1000);
--        //getLogWriter().info("### DEBUG CREATE CQ START ####");
--        //pause(20 * 1000);
--        
++
          LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
          // Get CQ Service.
          QueryService cqService = null;
          try {
            cqService = getCache().getQueryService();
          } catch (Exception cqe) {
--          cqe.printStackTrace();
--          fail("Failed to getCQService.");
++          fail("Failed to getCQService.", cqe);
          }
          // Create CQ Attributes.
          CqAttributesFactory cqf = new CqAttributesFactory();
@@@ -1702,16 -1662,16 +1533,13 @@@
            assertTrue("newCq() state mismatch", cq1.getState().isStopped());
            LogWriterUtils.getLogWriter().info("Created a new CqQuery : "+cq1);
          } catch (Exception ex){
--          AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . ");
--          err.initCause(ex);
--          LogWriterUtils.getLogWriter().info("CqService is :" + cqService, err);
--          throw err;
++          fail("Failed to create CQ " + cqName, ex);
          }
        }
      });   
    } 
    
--  /* Returs Cache Server Port */
++  /** Return Cache Server Port */
    protected static int getCacheServerPort() {
      return bridgeServerPort;
    }
@@@ -1729,9 -1689,9 +1557,7 @@@
    }
    
    private static int getCqCountFromRegionProfile() {
--    
      LocalRegion region1 = (LocalRegion)CacheFactory.getAnyInstance().getRegion("/root/regionA");
--    
      return region1.getFilterProfile().getCqCount();
    }
    
@@@ -1761,8 -1721,8 +1587,7 @@@
        }
      });
    }
--  
--  
++
    public void createCacheClient(VM client, final int serverPort, final String serverHost){
      createCacheClient(client, new String[]{serverHost}, new int[]{serverPort}, null);
    }
@@@ -1786,8 -1746,8 +1611,7 @@@
          try {
            getCache().getQueryService();
          } catch (Exception cqe) {
--          cqe.printStackTrace();
--          fail("Failed to getCQService.");
++          fail("Failed to getCQService.", cqe);
          }
          
          AttributesFactory regionFactory = new AttributesFactory();
@@@ -1803,7 -1763,7 +1627,6 @@@
            Region clientRegion = ((ClientCache)getCache()).createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
            .create(regions[i]);
            LogWriterUtils.getLogWriter().info("### Successfully Created Region on Client :" + clientRegion);
--          //region1.getAttributesMutator().setCacheListener(new CqListener());
          }
        }
      });   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
index 9d43ef6,7b25430..730aec9
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
@@@ -31,19 -22,14 +31,15 @@@ import com.gemstone.gemfire.cache.query
  import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
  import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
  import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+ import com.gemstone.gemfire.distributed.internal.DistributionConfig;
  import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
- import com.gemstone.gemfire.test.dunit.Host;
- import com.gemstone.gemfire.test.dunit.Invoke;
- import com.gemstone.gemfire.test.dunit.LogWriterUtils;
- import com.gemstone.gemfire.test.dunit.NetworkUtils;
- import com.gemstone.gemfire.test.dunit.SerializableRunnable;
- import com.gemstone.gemfire.test.dunit.VM;
+ import com.gemstone.gemfire.test.dunit.*;
  
 +@Category(DistributedTest.class)
  public class PartitionedRegionCqQueryOptimizedExecuteDUnitTest extends PartitionedRegionCqQueryDUnitTest{
  
 -  public PartitionedRegionCqQueryOptimizedExecuteDUnitTest(String name) {
 -    super(name);
 +  public PartitionedRegionCqQueryOptimizedExecuteDUnitTest() {
 +    super();
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
index b615156,ffdd734..49fbfac
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PrCqUsingPoolDUnitTest.java
@@@ -65,13 -56,12 +65,13 @@@ import com.gemstone.gemfire.test.junit.
  /**
   * Test class for Partitioned Region and CQs
   * 
-  * @since 5.5
+  * @since GemFire 5.5
   */
 -public class PrCqUsingPoolDUnitTest extends CacheTestCase {
 +@Category(DistributedTest.class)
 +public class PrCqUsingPoolDUnitTest extends JUnit4CacheTestCase {
  
 -  public PrCqUsingPoolDUnitTest(String name) {
 -    super(name);
 +  public PrCqUsingPoolDUnitTest() {
 +    super();
    }
    
    static public final String[] regions = new String[] {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/PdxQueryCQTestBase.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/PdxQueryCQTestBase.java
index b80dde5,ab4c203..06f1383
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/PdxQueryCQTestBase.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/PdxQueryCQTestBase.java
@@@ -16,43 -16,16 +16,36 @@@
   */
  package com.gemstone.gemfire.cache.query.dunit;
  
- import org.junit.experimental.categories.Category;
- import org.junit.Test;
- 
++import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
 +import static org.junit.Assert.*;
 +
- import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
- import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
- import com.gemstone.gemfire.test.junit.categories.DistributedTest;
- 
 +import java.io.IOException;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Properties;
 +
  import com.gemstone.gemfire.LogWriter;
 -import com.gemstone.gemfire.cache.*;
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.CacheFactory;
 +import com.gemstone.gemfire.cache.DataPolicy;
 +import com.gemstone.gemfire.cache.PartitionAttributes;
 +import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 +import com.gemstone.gemfire.cache.Scope;
  import com.gemstone.gemfire.cache.client.PoolFactory;
  import com.gemstone.gemfire.cache.client.PoolManager;
 -import com.gemstone.gemfire.cache.query.*;
 +import com.gemstone.gemfire.cache.query.CacheUtils;
 +import com.gemstone.gemfire.cache.query.Query;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.SelectResults;
 +import com.gemstone.gemfire.cache.query.Struct;
  import com.gemstone.gemfire.cache.query.data.PortfolioPdx;
  import com.gemstone.gemfire.cache.query.data.PositionPdx;
  import com.gemstone.gemfire.cache.server.CacheServer;
  import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
--import com.gemstone.gemfire.cache30.CacheTestCase;
  import com.gemstone.gemfire.compression.Compressor;
  import com.gemstone.gemfire.compression.SnappyCompressor;
  import com.gemstone.gemfire.i18n.LogWriterI18n;
@@@ -64,8 -37,14 +57,9 @@@ import com.gemstone.gemfire.test.dunit.
  import com.gemstone.gemfire.test.dunit.Host;
  import com.gemstone.gemfire.test.dunit.SerializableRunnable;
  import com.gemstone.gemfire.test.dunit.VM;
++import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
  
 -import java.io.IOException;
 -import java.util.*;
 -
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.LOCATORS;
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.MCAST_PORT;
 -
 -public abstract class PdxQueryCQTestBase extends CacheTestCase {
 +public abstract class PdxQueryCQTestBase extends JUnit4CacheTestCase {
  
    /** The port on which the bridge server was started in this VM */
    private static int bridgeServerPort;
@@@ -114,8 -93,8 +108,8 @@@
  
    public void createPool(VM vm, String poolName, String server, int port,
        boolean subscriptionEnabled) {
--        createPool(vm, poolName, new String[]{server}, new int[]{port}, subscriptionEnabled);  
--      }
++      createPool(vm, poolName, new String[]{server}, new int[]{port}, subscriptionEnabled);
++    }
  
    public void createPool(VM vm, String poolName, String server, int port) {
      createPool(vm, poolName, new String[]{server}, new int[]{port}, false);  

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryIndexUpdateRIDUnitTest.java
----------------------------------------------------------------------