You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2016/01/20 03:22:53 UTC

[47/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/CQJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/CQJUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/CQJUnitTest.java
new file mode 100644
index 0000000..66b23c2
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/CQJUnitTest.java
@@ -0,0 +1,147 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+//
+//  CQJUnitTest.java
+//
+//  Created by Eric Zoerner on 6/28/07.
+
+package com.gemstone.gemfire.cache.query.cq;
+
+import java.util.Properties;
+
+import org.junit.experimental.categories.Category;
+
+import junit.framework.TestCase;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.query.CacheUtils;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.QueryInvalidException;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class CQJUnitTest extends TestCase {
+  private DistributedSystem ds;
+  private Cache cache;
+  private QueryService qs;
+  
+  /////////////////////////////////////
+  // Methods for setUp and tearDown
+  /////////////////////////////////////
+  
+  public CQJUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    Properties props = new Properties();
+    props.setProperty("mcast-port", "0");
+    props.setProperty("log-level", "config");
+    this.ds = DistributedSystem.connect(props);
+    this.cache = CacheFactory.create(ds);
+    this.qs = cache.getQueryService();
+  }
+  
+  public void tearDown() throws Exception {
+    this.cache.close();
+    this.ds.disconnect();
+  }
+
+  /////////////////////////////////////
+  // Test Methods
+  /////////////////////////////////////
+  
+  
+  /**
+   * Test to make sure CQs that have invalid syntax
+   * throw QueryInvalidException, and CQs that have unsupported
+   * CQ features throw UnsupportedOperationException
+   */
+  public void testValidateCQ() throws Exception {
+   
+    AttributesFactory attributesFactory = new AttributesFactory();
+    RegionAttributes regionAttributes = attributesFactory.create();
+   //The order by query computes dependency after compilation so the region has to be present
+    // for the query to progress further to throw UnsupportedOperationException
+    cache.createRegion("region",regionAttributes);
+    
+    // default attributes
+    CqAttributes attrs = new CqAttributesFactory().create();
+    
+    // valid CQ
+    this.qs.newCq("SELECT * FROM /region WHERE status = 'active'",
+                              attrs);
+    
+    // invalid syntax
+    try {
+      this.qs.newCq("this query is garbage", attrs);
+      fail("should have thrown a QueryInvalidException");
+    }
+    catch (QueryInvalidException e) {
+      // pass
+    }
+    
+    String[] unsupportedCQs = new String[] {
+      // not "just" a select statement
+      "(select * from /region where status = 'active').isEmpty",
+      
+      // cannot be DISTINCT
+      "select DISTINCT * from /region WHERE status = 'active'",
+      
+      // references more than one region
+      "select * from /region1 r1, /region2 r2 where r1 = r2",
+      
+      // where clause refers to a region
+      "select * from /region r where r.val = /region.size",
+      
+      // more than one iterator in FROM clause
+      "select * from /portfolios p1, p1.positions p2 where p2.id = 'IBM'",
+      
+      // first iterator in FROM clause is not just a region path
+      "select * from /region.entries e where e.value.id = 23",
+      
+      // has projections
+      "select id from /region where status = 'active'",
+      
+      // has ORDER BY
+      "select * from /region where status = 'active' ORDER BY id",
+    };
+    
+    for (int i = 0; i < unsupportedCQs.length; i++) {      
+      try {
+        this.qs.newCq(unsupportedCQs[i], attrs);
+        fail("should have thrown UnsupportedOperationException for query #" + i);
+      }
+      catch (UnsupportedOperationException e) {
+        // pass
+      }
+    }
+  }
+  
+  /* would need to make the constructServerSideQuery method package
+   * accessible and move this to the internal package in order
+   * to test that method
+   * 
+  public void testConstructServerSideQuery() throws Exception {
+    // default attributes
+    CqAttributes attrs = new CqAttributesFactory().create();
+    
+    // valid CQ
+    CqQuery cq = this.qs.newCq("SELECT * FROM /region WHERE status = 'active'",
+                              attrs);
+    Query serverSideQuery = ((CqQueryImpl)cq).constructServerSideQuery();
+  }
+  */
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataDUnitTest.java
new file mode 100644
index 0000000..1a1fc77
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataDUnitTest.java
@@ -0,0 +1,1154 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.MirrorType;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+
+import dunit.AsyncInvocation;
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * This class tests the ContiunousQuery mechanism in GemFire.
+ * This includes the test with different data activities.
+ *
+ * @author anil
+ */
+public class CqDataDUnitTest extends CacheTestCase {
+
+  protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest("CqDataDUnitTest");
+  
+  public CqDataDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+    
+    // avoid IllegalStateException from HandShake by connecting all vms tor
+    // system before creating ConnectionPools
+    getSystem();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+    
+  }
+    
+  /**
+   * Tests with client acting as feeder/publisher and registering cq.
+   * Added wrt bug 37161.
+   * In case of InterestList the events are not sent back to the client
+   * if its the originator, this is not true for cq.
+   * 
+   * @throws Exception
+   */
+  public void testClientWithFeederAndCQ() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server);
+
+    final int port = server.invokeInt(CqQueryDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+
+    cqDUnitTest.createCQ(client, "testClientWithFeederAndCQ_0", cqDUnitTest.cqs[0]);
+    cqDUnitTest.executeCQ(client, "testClientWithFeederAndCQ_0", false, null);
+
+    final int size = 10;
+    cqDUnitTest.createValues(client, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForCreated(client, "testClientWithFeederAndCQ_0", CqQueryDUnitTest.KEY+size);
+
+    cqDUnitTest.validateCQ(client, "testClientWithFeederAndCQ_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+    
+  }
+
+  /**
+   * Test for CQ Fail over/HA with redundancy level set.
+   * @throws Exception
+   */
+  public void testCQHAWithState() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM server3 = host.getVM(2);
+    
+    VM client = host.getVM(3);
+    
+    //Killing servers can cause this message on the client side.
+    addExpectedException("Could not find any server");
+    cqDUnitTest.createServer(server1);
+    
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    
+    cqDUnitTest.createServer(server2, ports[0]);
+    final int port2 = server2.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+            
+    // Create client - With 3 server endpoints and redundancy level set to 2.
+    
+    // Create client with redundancyLevel 1
+    cqDUnitTest.createClient(client, new int[] {port1, port2, ports[1]}, host0, "1");
+    
+    // Create CQs.
+    int numCQs = 1;
+    for (int i=0; i < numCQs; i++) {
+      // Create CQs.
+      cqDUnitTest.createCQ(client, "testCQHAWithState_" + i, cqDUnitTest.cqs[i]);
+      cqDUnitTest.executeCQ(client, "testCQHAWithState_" + i, false, null);
+    }
+    
+    pause(1 * 1000);
+    
+    int size = 10;
+    
+    // CREATE.
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size);
+    
+    for (int i=1; i <= size; i++) {
+      cqDUnitTest.waitForCreated(client, "testCQHAWithState_0", CqQueryDUnitTest.KEY + i);
+    }
+    
+    // Clients expected initial result.
+    int[] resultsCnt = new int[] {10, 1, 2};
+
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryDUnitTest.noTest, resultsCnt[i], 0, 0);
+    }    
+
+    // Close server1.
+    // To maintain the redundancy; it will make connection to endpoint-3.
+    cqDUnitTest.closeServer(server1);
+    pause(3 * 1000);
+    
+    
+    // UPDATE-1.
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10);
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10);
+    
+    for (int i=1; i <= size; i++) {
+      cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", CqQueryDUnitTest.KEY + size);
+    }
+    
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
+    }    
+
+    //Stop cq.
+    cqDUnitTest.stopCQ(client, "testCQHAWithState_0");
+    
+    pause(2 * 1000);
+    
+    // UPDATE with stop.
+    cqDUnitTest.createServer(server3, ports[1]);
+    server3.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    pause(2 * 1000);
+    
+    cqDUnitTest.clearCQListenerEvents(client, "testCQHAWithState_0");
+    
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10);    
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10);
+    
+    // Wait for events at client.
+    try {
+      cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", CqQueryDUnitTest.KEY + 1);
+      fail("Events not expected since CQ is in stop state.");
+    } catch (Exception ex) {
+      // Success.
+    }
+    
+    cqDUnitTest.executeCQ(client, "testCQHAWithState_0", false, null);
+    pause(2 * 1000);
+    
+    // Update - 2 
+    cqDUnitTest.createValues(server3, cqDUnitTest.regions[0], 10);    
+    cqDUnitTest.createValues(server3, cqDUnitTest.regions[1], 10);
+    
+    for (int i=1; i <= size; i++) {
+      cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", CqQueryDUnitTest.KEY + size);
+    }
+
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i] * 2, CqQueryDUnitTest.noTest);
+    }    
+    
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server2);
+    cqDUnitTest.closeServer(server3);
+  }
+
+  
+  
+  /**
+   * Tests propogation of invalidates and destorys to the clients. Bug 37242.
+   * 
+   * @throws Exception
+   */
+  public void testCQWithDestroysAndInvalidates() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    VM producer = host.getVM(2);
+    cqDUnitTest.createServer(server, 0, true);
+    final int port = server.invokeInt(CqQueryDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+    // producer is not doing any thing.
+    cqDUnitTest.createClient(producer, port, host0);
+
+    final int size = 10;
+    final String name = "testQuery_4";
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    cqDUnitTest.createCQ(client, name, cqDUnitTest.cqs[4]);
+    cqDUnitTest.executeCQ(client, name, true, null);
+    
+    // do destroys and invalidates.
+    server.invoke(new CacheSerializableRunnable("Create values") {
+      public void run2() throws CacheException
+      {
+        Region region1 = getRootRegion().getSubregion(cqDUnitTest.regions[0]);
+        for (int i = 1; i <= 5; i++) {
+          region1.destroy( CqQueryDUnitTest.KEY + i);
+        }
+      }
+    });
+    for (int i = 1; i <= 5; i++) {
+      cqDUnitTest.waitForDestroyed(client, name , CqQueryDUnitTest.KEY+i);
+    }
+    // recreate the key values from 1 - 5
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 5);
+    // wait for all creates to arrive.
+    for (int i = 1; i <= 5; i++) {
+      cqDUnitTest.waitForCreated(client, name , CqQueryDUnitTest.KEY+i);
+    }
+    
+    // do more puts to push first five key-value to disk.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10);
+    // do invalidates on fisrt five keys.
+    server.invoke(new CacheSerializableRunnable("Create values") {
+      public void run2() throws CacheException
+      {
+        Region region1 = getRootRegion().getSubregion(cqDUnitTest.regions[0]);
+        for (int i = 1; i <= 5; i++) {
+          region1.invalidate( CqQueryDUnitTest.KEY + i);
+        }
+      }
+    });
+    // wait for invalidates now.
+    for (int i = 1; i <= 5; i++) {
+      cqDUnitTest.waitForInvalidated(client, name , CqQueryDUnitTest.KEY+i);
+    }
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  
+  }
+  
+  /**
+   * Tests make sure that the second client doesnt get more
+   * events then there should be. This will test the fix for 
+   * bug 37295.
+   * 
+   * @author rdubey
+   */
+  public void testCQWithMultipleClients() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(1);
+    VM client2 = host.getVM(2);
+    VM client3 = host.getVM(3);
+    
+    /* Create Server and Client */
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    cqDUnitTest.createClient(client1, port, host0);
+    cqDUnitTest.createClient(client2, port, host0);
+    
+    /* Create CQs. and initialize the region */
+    // this should statisfy every thing since id is always greater than 
+    // zero.
+    cqDUnitTest.createCQ(client1, "testCQWithMultipleClients_0", cqDUnitTest.cqs[0]);
+    cqDUnitTest.executeCQ(client1, "testCQWithMultipleClients_0", false, null);
+    // should only satisfy one key-value pair in the region.
+    cqDUnitTest.createCQ(client2, "testCQWithMultipleClients_0", cqDUnitTest.cqs[1]);
+    cqDUnitTest.executeCQ(client2, "testCQWithMultipleClients_0", false, null);
+    
+    int size = 10;
+    
+    // Create Values on Server.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    
+    
+    cqDUnitTest.waitForCreated(client1, "testCQWithMultipleClients_0", CqQueryDUnitTest.KEY + 10);
+    
+    
+    /* Validate the CQs */
+    cqDUnitTest.validateCQ(client1, "testCQWithMultipleClients_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    
+    cqDUnitTest.waitForCreated(client2, "testCQWithMultipleClients_0", CqQueryDUnitTest.KEY + 2 );
+    
+    
+    cqDUnitTest.validateCQ(client2, "testCQWithMultipleClients_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ 1,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ 1,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ 1);
+         
+        
+    /* Close Server and Client */
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeClient(client3);
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Test for CQ when region is populated with net load.
+   * @throws Exception
+   */
+  public void testCQWithLoad() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    
+    VM client = host.getVM(2);
+    
+    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+    cqDUnitTest.createServer(server2, 0, false, MirrorType.KEYS);
+        
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+      
+    cqDUnitTest.createClient(client, port1, host0);
+    
+    // Create CQs.
+    cqDUnitTest.createCQ(client, "testCQWithLoad_0", cqDUnitTest.cqs[0]);  
+    cqDUnitTest.executeCQ(client, "testCQWithLoad_0", false, null); 
+    
+    pause(2 * 1000);
+    
+    final int size = 10;
+    
+    // CREATE VALUES.
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size);
+    
+    server1.invoke(new CacheSerializableRunnable("Load from second server") {
+      public void run2() throws CacheException {
+        Region region1 = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
+        for (int i=1; i <= size; i++){
+          region1.get(CqQueryDUnitTest.KEY + i);
+        }
+      }
+    });
+    
+    for (int i=1; i <= size; i++) {
+      cqDUnitTest.waitForCreated(client, "testCQWithLoad_0", CqQueryDUnitTest.KEY + i);
+    }
+        
+    cqDUnitTest.validateCQ(client, "testCQWithLoad_0", CqQueryDUnitTest.noTest, size, 0, 0);
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+    cqDUnitTest.closeServer(server2);
+  }
+
+  /**
+   * Test for CQ when entries are evicted from region.
+   * @throws Exception
+   */
+  public void testCQWithEviction() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM client = host.getVM(2);
+    
+    final int evictionThreshold = 1;
+    server1.invoke(new CacheSerializableRunnable("Create Cache Server") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create Cache Server. ###");
+        AttributesFactory factory = new AttributesFactory();
+        factory.setScope(Scope.DISTRIBUTED_ACK);
+        factory.setDataPolicy(DataPolicy.REPLICATE);
+        //factory.setMirrorType(MirrorType.NONE);
+        // setting the eviction attributes.
+        EvictionAttributes evictAttrs = EvictionAttributes.createLRUEntryAttributes(evictionThreshold, 
+            EvictionAction.OVERFLOW_TO_DISK);
+        factory.setEvictionAttributes(evictAttrs);
+      
+        for (int i = 0; i < cqDUnitTest.regions.length; i++) { 
+          Region region = createRegion(cqDUnitTest.regions[i], factory.createRegionAttributes());
+          // Set CacheListener.
+          region.getAttributesMutator().setCacheListener(new CertifiableTestCacheListener(getLogWriter()));  
+        } 
+        pause(2000);
+        
+        try {
+          cqDUnitTest.startBridgeServer(0, true);
+        } catch (Exception ex) {
+          fail("While starting CacheServer", ex);
+        }
+        pause(2000);
+
+      }
+    });
+        
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+      
+    cqDUnitTest.createClient(client, port1, host0);
+    
+    // Create CQs.
+    cqDUnitTest.createCQ(client, "testCQWithEviction_0", cqDUnitTest.cqs[0]);  
+    
+    final int size = 10;
+    
+    // CREATE VALUES.
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
+    
+    cqDUnitTest.executeCQ(client, "testCQWithEviction_0", false, "CqException"); 
+    
+    pause(1 * 1000);
+
+    // Update VALUES.
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
+
+    for (int i=1; i <= size; i++) {
+      cqDUnitTest.waitForUpdated(client, "testCQWithEviction_0", cqDUnitTest.KEY + i);
+    }
+
+    cqDUnitTest.validateCQ(client, "testCQWithEviction_0", cqDUnitTest.noTest, 0, 10, 0);
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+  }
+
+  /**
+   * Test for CQ with ConnectionPool.
+   * @throws Exception
+   */
+  public void testCQWithConnectionPool() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String serverHost = getServerHostName(server1.getHost());
+
+    final String[] regions = cqDUnitTest.regions;
+    final int[] serverPorts = new int[] {port1};
+
+    // createClientWithConnectionPool
+    SerializableRunnable createClientWithPool =
+      new CacheSerializableRunnable("createClientWithPool") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create Client. ###");
+        // Initialize CQ Service.
+        try {
+          getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+
+        AttributesFactory regionFactory = new AttributesFactory();
+        regionFactory.setScope(Scope.LOCAL);
+        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts[0], -1, false, -1, -1, null);
+        for (int i=0; i < regions.length; i++) {        
+          createRegion(regions[i], regionFactory.create() );
+          getLogWriter().info("### Successfully Created Region on Client :" + regions[i]);
+        }
+      }
+    };
+
+    client.invoke(createClientWithPool);
+
+    // Create CQs.
+    cqDUnitTest.createCQ(client, "testCQWithPool_0", cqDUnitTest.cqs[0]);  
+
+    // This should fail as Region doesn't have ConnectionPool
+    try {
+      cqDUnitTest.executeCQ(client, "testCQWithPool_0", false,"CqException");
+      fail("CQ Execution should have failed with BridgeClient/Writer not found.");
+    } catch (Exception ex) {
+      // Expected.
+    }
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+  }
+  
+  /**
+   * Test for CQ with BridgeClient.
+   * @throws Exception
+   */
+  public void testCQWithBridgeClient() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String serverHost = getServerHostName(server1.getHost());
+
+    final String[] regions = cqDUnitTest.regions;
+    final int[] serverPorts = new int[] {port1};
+
+    // createClientWithBridgeClient
+    SerializableRunnable createClientWithPool =
+      new CacheSerializableRunnable("createClientWithPool") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create Client. ###");
+        //Region region1 = null;
+        // Initialize CQ Service.
+        try {
+          getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+
+        AttributesFactory regionFactory = new AttributesFactory();
+        regionFactory.setScope(Scope.LOCAL);
+        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts[0], -1, true, -1, -1, null);
+
+        for (int i=0; i < regions.length; i++) {        
+          createRegion(regions[i], regionFactory.createRegionAttributes());
+          getLogWriter().info("### Successfully Created Region on Client :" + regions[i]);
+        }
+      }
+    };
+
+    client.invoke(createClientWithPool);
+
+    // Create CQs.
+    cqDUnitTest.createCQ(client, "testCQWithPool_1", cqDUnitTest.cqs[0]);  
+
+    // This should pass.
+    cqDUnitTest.executeCQ(client, "testCQWithPool_1", false,null);
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+  }
+  
+  /**
+   * Test for CQ with ConnectionPool.
+   * @throws Exception
+   */
+  public void testCQWithPool() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String serverHost = getServerHostName(server1.getHost());
+
+    final String[] regions = cqDUnitTest.regions;
+    final int[] serverPorts = new int[] {port1};
+
+    // createClientWithConnectionPool
+    SerializableRunnable createClientWithConnectionPool =
+      new CacheSerializableRunnable("createClientWithConnectionPool") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create Client. ###");
+        //Region region1 = null;
+        // Initialize CQ Service.
+        try {
+          getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+
+        AttributesFactory regionFactory = new AttributesFactory();
+        regionFactory.setScope(Scope.LOCAL);
+        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts[0], -1, true, -1, -1, null);
+        for (int i=0; i < regions.length; i++) {        
+          createRegion(regions[i], regionFactory.createRegionAttributes());
+          getLogWriter().info("### Successfully Created Region on Client :" + regions[i]);
+        }
+      }
+    };
+
+    client.invoke(createClientWithConnectionPool);
+
+    // Create CQs.
+    cqDUnitTest.createCQ(client, "testCQWithPool_2", cqDUnitTest.cqs[0]);  
+
+    // This should pass.
+    cqDUnitTest.executeCQ(client, "testCQWithPool_2", false, null);
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+  }
+
+  /**
+   * Test for CQ with establishCallBackConnection.
+   * @throws Exception
+   */
+  public void testCQWithEstablishCallBackConnection() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String serverHost = getServerHostName(server1.getHost());
+
+    final String[] regions = cqDUnitTest.regions;
+    final int[] serverPorts = new int[] {port1};
+
+    // createClientWithPool
+    SerializableRunnable createClientWithPool =
+      new CacheSerializableRunnable("createClientWithPool") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create Client. ###");
+        //Region region1 = null;
+        // Initialize CQ Service.
+        try {
+          getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+
+        AttributesFactory regionFactory = new AttributesFactory();
+        regionFactory.setScope(Scope.LOCAL);
+        
+        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts[0], -1, false, -1, -1, null);
+
+        for (int i=0; i < regions.length; i++) {        
+          createRegion(regions[i], regionFactory.createRegionAttributes());
+          getLogWriter().info("### Successfully Created Region on Client :" + regions[i]);
+        }
+      }
+    };
+
+    
+    client.invoke(createClientWithPool);
+
+    // Create CQs.
+    cqDUnitTest.createCQ(client, "testCQWithEstablishCallBackConnection_0", cqDUnitTest.cqs[0]);  
+
+    // This should fail.
+    try {
+      cqDUnitTest.executeCQ(client, "testCQWithEstablishCallBackConnection_0", false, "CqException");
+      fail("Test should have failed with connection with establishCallBackConnection not found.");
+    } catch (Exception ex) {
+      // Expected.
+    }
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+  }
+  
+  /**
+   * Test for:
+   * Region destroy, calls close on the server.
+   * Region clear triggers cqEvent with query op region clear.
+   * Region invalidate triggers cqEvent with query op region invalidate.
+   * @throws Exception
+   */
+  public void testRegionEvents() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    
+    cqDUnitTest.createClient(client, port, host0);
+    
+    // Create CQ on regionA
+    cqDUnitTest.createCQ(client, "testRegionEvents_0", cqDUnitTest.cqs[0]);
+    cqDUnitTest.executeCQ(client, "testRegionEvents_0", false, null);
+    
+    // Create CQ on regionB
+    cqDUnitTest.createCQ(client, "testRegionEvents_1", cqDUnitTest.cqs[2]);
+    cqDUnitTest.executeCQ(client, "testRegionEvents_1", false, null);
+
+    // Test for Event on Region Clear.
+    server.invoke(new CacheSerializableRunnable("testRegionEvents"){
+      public void run2()throws CacheException {
+        getLogWriter().info("### Clearing the region on the server ###");
+        Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
+        for (int i = 1; i <=5; i++) {
+          region.put(CqQueryDUnitTest.KEY+i, new Portfolio(i));
+        }
+        region.clear();
+      }
+    });
+    
+    cqDUnitTest.waitForRegionClear(client,"testRegionEvents_0");
+
+    // Test for Event on Region invalidate.
+    server.invoke(new CacheSerializableRunnable("testRegionEvents"){
+      public void run2()throws CacheException {
+        getLogWriter().info("### Invalidate the region on the server ###");
+        Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
+        for (int i = 1; i <=5; i++) {
+          region.put(CqQueryDUnitTest.KEY+i, new Portfolio(i));
+        }
+        region.invalidateRegion();
+      }
+    });
+
+    cqDUnitTest.waitForRegionInvalidate(client,"testRegionEvents_0");
+
+    // Test for Event on Region destroy.
+    server.invoke(new CacheSerializableRunnable("testRegionEvents"){
+      public void run2()throws CacheException {
+        getLogWriter().info("### Destroying the region on the server ###");
+        Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[1]);
+        for (int i = 1; i <=5; i++) {
+          region.put(CqQueryDUnitTest.KEY+i, new Portfolio(i));
+        }
+        // this should close one cq on client.
+        region.destroyRegion();
+      }
+    });
+
+    pause(1000); // wait for cq to close becuse of region destroy on server.
+    //cqDUnitTest.waitForClose(client,"testRegionEvents_1");
+    cqDUnitTest.validateCQCount(client,1);
+
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+
+  }
+
+
+  /**
+   * Test for events created during the CQ query execution.
+   * When CQs are executed using executeWithInitialResults 
+   * there may be possibility that the region changes during
+   * that time may not be reflected in the query result set
+   * thus making the query data and region data inconsistent.
+   * @throws Exception
+   */
+  public void testEventsDuringQueryExecution() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    final String cqName = "testEventsDuringQueryExecution_0";
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    
+    // Initialize Client.
+    cqDUnitTest.createClient(client, port, host0);
+    
+    // create CQ.
+    cqDUnitTest.createCQ(client, cqName, cqDUnitTest.cqs[0]);
+    
+    final int numObjects = 200;
+    final int totalObjects = 500;
+    
+    // initialize Region.
+    server.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+    
+    // Execute CQ while update is in progress.
+    AsyncInvocation processCqs = client.invokeAsync(new CacheSerializableRunnable("Execute CQ") {
+      public void run2()throws CacheException {
+        QueryService cqService = getCache().getQueryService();
+        // Get CqQuery object.
+        CqQuery cq1 = cqService.getCq(cqName);
+        if (cq1 == null) {
+          fail("Failed to get CQ " + cqName);
+        }
+        
+        SelectResults cqResults = null;
+
+        try {
+          cqResults = cq1.executeWithInitialResults();
+        } catch (Exception ex){
+          AssertionError err = new AssertionError("Failed to execute  CQ " + cqName);
+          err.initCause(ex);
+          throw err;
+        }
+        
+        //getLogWriter().info("initial result size = " + cqResults.size());
+        
+        CqQueryTestListener cqListener = (CqQueryTestListener)cq1.getCqAttributes().getCqListener();
+        // Wait for the last key to arrive.
+        cqListener.waitForCreated("" + totalObjects);
+        
+        // Check if the events from CqListener are in order.
+        int oldId = 0;
+        for (Object cqEvent : cqListener.events.toArray()) { 
+          int newId = new Integer(cqEvent.toString()).intValue();
+          if (oldId > newId){
+            fail("Queued events for CQ Listener during execution with " + 
+                "Initial results is not in the order in which they are created.");
+          }
+          oldId = newId;
+        }
+        
+        // Check if all the IDs are present as part of Select Results and CQ Events.
+        HashSet ids = new HashSet(cqListener.events);
+        for (Object o : cqResults.asList()) {
+          Struct s = (Struct)o;
+          ids.add(s.get("key"));
+        }
+
+        //Iterator iter = cqResults.asSet().iterator();
+        //while (iter.hasNext()) {
+        //  Portfolio p = (Portfolio)iter.next();
+        //  ids.add(p.getPk());
+        //  //getLogWriter().info("Result set value : " + p.getPk());
+        //}
+        
+        HashSet missingIds = new HashSet();
+        String key = "";
+        for (int i = 1; i <= totalObjects; i++) {
+          key = "" + i;
+          if (!(ids.contains(key))){
+            missingIds.add(key);
+          }
+        }
+        
+        if (!missingIds.isEmpty()) {
+          fail("Missing Keys in either ResultSet or the Cq Event list. " +
+              " Missing keys : [size : " + missingIds.size() + "]" + missingIds +
+              " Ids in ResultSet and CQ Events :" + ids);
+        }
+        
+      } 
+    });
+    
+    // Keep updating region (async invocation).
+    server.invokeAsync(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        //Wait to allow client a chance to register the cq
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
+        for (int i = numObjects + 1; i <= totalObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+
+    //wait for 60 seconds for test to complete
+    DistributedTestCase.join(processCqs, 60 * 1000, getLogWriter());
+    
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+  
+  /**
+   * This test was created to test executeWithInitialResults being called
+   * multiple times. Previously, the queueEvents would be overwritten and we
+   * would lose data. This test will execute the method twice. The first time,
+   * the first execution will block it's own child thread (TC1). The second
+   * execution will block until TC1 is completed (based on how
+   * executeWithInitialResults is implemented) A third thread will be awaken and
+   * release the latch in the testhook for TC1 to complete.
+   * 
+   * @throws Exception
+   */
+  public void testMultipleExecuteWithInitialResults() throws Exception {
+    final int numObjects = 200;
+    final int totalObjects = 500;
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    client.invoke(setTestHook());
+    final String cqName = "testMultiExecuteWithInitialResults";
+
+    // initialize server and retreive host and port values
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    // Initialize Client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // create CQ.
+    cqDUnitTest.createCQ(client, cqName, cqDUnitTest.cqs[0]);
+
+    // initialize Region.
+    server.invoke(new CacheSerializableRunnable("Update Region") {
+      public void run2() throws CacheException {
+        Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put("" + i, p);
+        }
+      }
+    });
+
+    // Keep updating region (async invocation).
+    server.invokeAsync(new CacheSerializableRunnable("Update Region") {
+      public void run2() throws CacheException {
+        //Wait to give client a chance to register the cq
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
+        for (int i = numObjects + 1; i <= totalObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put("" + i, p);
+        }
+      }
+    });
+    
+    // the thread that validates all results and executes first
+    AsyncInvocation processCqs =client.invokeAsync(new CacheSerializableRunnable("Execute CQ first") {
+      public void run2() throws CacheException {
+        SelectResults cqResults = null;
+        QueryService cqService = getCache().getQueryService();
+        // Get CqQuery object.
+        CqQuery cq1 = cqService.getCq(cqName);
+        if (cq1 == null) {
+          fail("Failed to get CQ " + cqName);
+        }
+        try {
+          cqResults = cq1.executeWithInitialResults();
+
+        } catch (Exception e) {
+          AssertionError err = new AssertionError("Failed to execute  CQ "
+              + cqName);
+          err.initCause(e);
+          throw err;
+        }
+
+        CqQueryTestListener cqListener = (CqQueryTestListener) cq1
+            .getCqAttributes().getCqListener();
+        // Wait for the last key to arrive.
+        cqListener.waitForCreated("" + totalObjects);
+        // Check if the events from CqListener are in order.
+        int oldId = 0;
+        for (Object cqEvent : cqListener.events.toArray()) {
+          int newId = new Integer(cqEvent.toString()).intValue();
+          if (oldId > newId) {
+            fail("Queued events for CQ Listener during execution with "
+                + "Initial results is not in the order in which they are created.");
+          }
+          oldId = newId;
+        }
+
+        // Check if all the IDs are present as part of Select Results and CQ
+        // Events.
+        HashSet ids = new HashSet(cqListener.events);
+        for (Object o : cqResults.asList()) {
+          Struct s = (Struct) o;
+          ids.add(s.get("key"));
+        }
+
+        HashSet missingIds = new HashSet();
+        String key = "";
+        for (int i = 1; i <= totalObjects; i++) {
+          key = "" + i;
+          if (!(ids.contains(key))) {
+            missingIds.add(key);
+          }
+        }
+
+        if (!missingIds.isEmpty()) {
+          fail("Missing Keys in either ResultSet or the Cq Event list. "
+              + " Missing keys : [size : " + missingIds.size() + "]"
+              + missingIds + " Ids in ResultSet and CQ Events :" + ids);
+        }
+      }
+    });
+
+    // the second call to executeWithInitialResults. Goes to sleep hopefully
+    // long enough
+    // for the first call to executeWithInitialResults first
+    client.invokeAsync(new CacheSerializableRunnable("Execute CQ second") {
+      public void run2() throws CacheException {
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        QueryService cqService = getCache().getQueryService();
+        // Get CqQuery object.
+        CqQuery cq1 = cqService.getCq(cqName);
+        if (cq1 == null) {
+          fail("Failed to get CQ " + cqName);
+        }
+        try {
+          cq1.executeWithInitialResults();
+        } catch (IllegalStateException e) {
+          // we expect an error due to the cq having already being in run state
+        } catch (Exception e) {
+          AssertionError err = new AssertionError("test hook lock interrupted"
+              + cqName);
+          err.initCause(e);
+          throw err;
+        }
+      }
+    });
+
+    // thread that unlatches the test hook, sleeping long enough for both
+    // the other two threads to execute first
+    client.invokeAsync(new CacheSerializableRunnable("Release latch") {
+      public void run2() throws CacheException {
+        // we wait to release the testHook and hope the other two threads have
+        // had a chance to invoke executeWithInitialResults
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          AssertionError err = new AssertionError("test hook lock interrupted"
+              + cqName);
+          err.initCause(e);
+          throw err;
+        }
+        CqQueryImpl.testHook.ready();
+      }
+    });
+
+    //wait for 60 seconds for test to complete
+    DistributedTestCase.join(processCqs, 60 * 1000, getLogWriter());
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+
+  public CacheSerializableRunnable setTestHook() {
+    SerializableRunnable sr = new CacheSerializableRunnable("TestHook") {
+      public void run2() {
+        class CqQueryTestHook implements CqQueryImpl.TestHook {
+
+          CountDownLatch latch = new CountDownLatch(1);
+
+          public void pauseUntilReady() {
+            try {
+              latch.await();
+            } catch (Exception e) {
+              e.printStackTrace();
+              Thread.currentThread().interrupt();
+            }
+          }
+
+          public void ready() {
+            latch.countDown();
+          }
+
+          @Override
+          public int numQueuedEvents() {
+            // TODO Auto-generated method stub
+            return 0;
+          }
+
+          @Override
+          public void setEventCount(int count) {
+            // TODO Auto-generated method stub
+            
+          }
+
+        }
+        ;
+        CqQueryImpl.testHook = new CqQueryTestHook();
+      }
+    };
+    return (CacheSerializableRunnable) sr;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataOptimizedExecuteDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataOptimizedExecuteDUnitTest.java
new file mode 100644
index 0000000..9ea420d
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataOptimizedExecuteDUnitTest.java
@@ -0,0 +1,46 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+
+import dunit.SerializableRunnable;
+
+/**
+ * Test class for testing {@link CqServiceImpl#EXECUTE_QUERY_DURING_INIT} flag
+ *
+ */
+public class CqDataOptimizedExecuteDUnitTest extends CqDataDUnitTest{
+
+  public CqDataOptimizedExecuteDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    //We're seeing this on the server when the client
+    //disconnects.
+    addExpectedException("Connection reset");
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
+      }
+    });
+  }
+  
+  @Override
+  public void tearDown2() throws Exception {
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
+      }
+    });
+    super.tearDown2();
+  }
+}