You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/08/06 20:28:48 UTC

[geode] branch develop updated: GEODE-577: rewrite QueryMonitorDUnitTest (#2179)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 38e1714  GEODE-577: rewrite QueryMonitorDUnitTest (#2179)
38e1714 is described below

commit 38e1714b54894caafa508dab9634d62a9c4c42fc
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Mon Aug 6 13:28:44 2018 -0700

    GEODE-577: rewrite QueryMonitorDUnitTest (#2179)
    
    * code cleanup.
    * add QueryMonitor unit test
    * do not add cq query to the monitor queue
---
 .../ResourceManagerWithQueryMonitorDUnitTest.java  |   23 +-
 .../geode/cache/query/internal/DefaultQuery.java   |   10 +-
 .../geode/cache/query/internal/QueryMonitor.java   |  102 +-
 .../cache/PartitionedRegionQueryEvaluator.java     |    6 +-
 .../cache/query/internal/QueryMonitorTest.java     |   91 ++
 .../cache/query/dunit/QueryMonitorDUnitTest.java   | 1299 +++++---------------
 6 files changed, 457 insertions(+), 1074 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
index 598bde1..80f2192 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
@@ -870,7 +870,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
             ServerOperationException soe = (ServerOperationException) e;
             if (soe.getRootCause() instanceof QueryException) {
               QueryException qe = (QueryException) soe.getRootCause();
-              if (isExceptionDueToTimeout(qe, queryTimeout)) {
+              if (isExceptionDueToTimeout(qe)) {
                 logger.info("Query Execution must be terminated by a timeout. Expected behavior");
                 return 0;
               }
@@ -952,7 +952,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
           // meaning the query should not be canceled due to low memory
           throw new CacheException("Query should not have been canceled due to memory") {};
         }
-      } else if (isExceptionDueToTimeout((QueryException) e, queryTimeout)) {
+      } else if (isExceptionDueToTimeout((QueryException) e)) {
 
         if (queryTimeout == -1) {
           // no time out set, this should not be thrown
@@ -976,7 +976,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
             // meaning the query should not be canceled due to low memory
             throw new CacheException("Query should not have been canceled due to memory") {};
           }
-        } else if (isExceptionDueToTimeout(qe, queryTimeout)) {
+        } else if (isExceptionDueToTimeout(qe)) {
           if (queryTimeout == -1) {
             e.printStackTrace();
             // no time out set, this should not be thrown
@@ -1219,13 +1219,12 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
                 .toLocalizedString()));
   }
 
-  private boolean isExceptionDueToTimeout(QueryException e, long queryTimeout) {
+  private boolean isExceptionDueToTimeout(QueryException e) {
     String message = e.getMessage();
     // -1 needs to be matched due to client/server set up, BaseCommand uses the
     // MAX_QUERY_EXECUTION_TIME and not the testMaxQueryExecutionTime
     return (message.contains("The QueryMonitor thread may be sleeping longer than")
-        || message.contains(LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED
-            .toLocalizedString(queryTimeout))
+        || message.contains("Query execution cancelled after exceeding max execution time")
         || message.contains(
             LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED.toLocalizedString(-1)));
   }
@@ -1261,10 +1260,6 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
       }
     }
 
-    public void doTestHook(String description) {
-
-    }
-
     public void countDown() {
       latch.countDown();
     }
@@ -1288,10 +1283,6 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
         }
       }
     }
-
-    public void doTestHook(String description) {
-
-    }
   }
 
   private class CancelDuringAddResultsHook implements DefaultQuery.TestHook {
@@ -1315,9 +1306,5 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
         rejectedObjects = true;
       }
     }
-
-    public void doTestHook(String description) {
-
-    }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index fad0e0f..1053d8d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -716,11 +716,9 @@ public class DefaultQuery implements Query {
 
   /**
    * The query gets canceled by the QueryMonitor with the reason being specified
-   * <p>
-   * TODO: parameter isCanceled is always true
    */
-  public void setCanceled(boolean isCanceled, CacheRuntimeException canceledException) {
-    this.isCanceled = isCanceled;
+  public void setCanceled(CacheRuntimeException canceledException) {
+    this.isCanceled = true;
     this.canceledException = canceledException;
   }
 
@@ -985,8 +983,8 @@ public class DefaultQuery implements Query {
   }
 
   public interface TestHook {
-    void doTestHook(int spot);
+    default void doTestHook(int spot) {};
 
-    void doTestHook(String spot);
+    default void doTestHook(String spot) {};
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
index ccba334..f7750bc 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
@@ -14,21 +14,16 @@
  */
 package org.apache.geode.cache.query.internal;
 
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.cache.query.Query;
 import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
 import org.apache.geode.cache.query.QueryExecutionTimeoutException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 
 /**
  * QueryMonitor class, monitors the query execution time. Instantiated based on the system property
@@ -46,13 +41,12 @@ public class QueryMonitor implements Runnable {
   private static final Logger logger = LogService.getLogger();
 
   private final InternalCache cache;
-  private boolean testingQueryMonitor = false;
   /**
    * Holds the query execution status for the thread executing the query. FALSE if the query is not
    * canceled due to max query execution timeout. TRUE it the query is canceled due to max query
    * execution timeout timeout.
    */
-  private static final ThreadLocal<AtomicBoolean> queryExecutionStatus =
+  private static final ThreadLocal<AtomicBoolean> queryCancelled =
       ThreadLocal.withInitial(() -> new AtomicBoolean(Boolean.FALSE));
 
   private final long maxQueryExecutionTime;
@@ -63,9 +57,6 @@ public class QueryMonitor implements Runnable {
 
   private final AtomicBoolean stopped = new AtomicBoolean(Boolean.FALSE);
 
-  /** For DUnit test purpose TODO: delete this ConcurrentMap */
-  private ConcurrentMap queryMonitorTasks = null;
-
   // Variables for cancelling queries due to low memory
   private static volatile Boolean LOW_MEMORY = Boolean.FALSE;
 
@@ -82,14 +73,19 @@ public class QueryMonitor implements Runnable {
    * @param queryThread Thread executing the query.
    * @param query Query.
    */
-  public void monitorQueryThread(Thread queryThread, Query query) {
+  public void monitorQueryThread(Thread queryThread, DefaultQuery query) {
+    // cq query is not monitored
+    if (query.isCqQuery()) {
+      return;
+    }
+
     if (LOW_MEMORY) {
       String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
           .toLocalizedString(LOW_MEMORY_USED_BYTES);
-      ((DefaultQuery) query).setCanceled(true, new QueryExecutionLowMemoryException(reason));
+      query.setCanceled(new QueryExecutionLowMemoryException(reason));
       throw new QueryExecutionLowMemoryException(reason);
     }
-    QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryExecutionStatus.get());
+    QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryCancelled.get());
     synchronized (queryThreads) {
       queryThreads.add(queryTask);
       queryThreads.notifyAll();
@@ -101,28 +97,19 @@ public class QueryMonitor implements Runnable {
           queryThreads.size(), queryThread.getId(), query.getQueryString(), queryThread);
     }
 
-    // For dunit test purpose
-    if (cache != null && testingQueryMonitor) {
-      if (this.queryMonitorTasks == null) {
-        this.queryMonitorTasks = new ConcurrentHashMap();
-      }
-      this.queryMonitorTasks.put(queryThread, queryTask);
-    }
   }
 
   /**
    * Stops monitoring the query. Removes the passed thread from QueryMonitor queue.
    */
-  public void stopMonitoringQueryThread(Thread queryThread, Query query) {
+  public void stopMonitoringQueryThread(Thread queryThread, DefaultQuery query) {
     // Re-Set the queryExecution status on the LocalThread.
     QueryExecutionTimeoutException testException = null;
-    DefaultQuery defaultQuery = (DefaultQuery) query;
-    boolean[] queryCompleted = defaultQuery.getQueryCompletedForMonitoring();
+    boolean[] queryCompleted = query.getQueryCompletedForMonitoring();
 
     synchronized (queryCompleted) {
-      queryExecutionStatus.get().getAndSet(Boolean.FALSE);
-
-      defaultQuery.setQueryCompletedForMonitoring(true);
+      queryCancelled.get().getAndSet(Boolean.FALSE);
+      query.setQueryCompletedForMonitoring(true);
       // Remove the query task from the queue.
       queryThreads.remove(new QueryThreadTask(queryThread, null, null));
     }
@@ -147,7 +134,7 @@ public class QueryMonitor implements Runnable {
    * gemfire.Cache.MAX_QUERY_EXECUTION_TIME
    */
   public static void isQueryExecutionCanceled() {
-    if (queryExecutionStatus.get() != null && queryExecutionStatus.get().get()) {
+    if (queryCancelled.get() != null && queryCancelled.get().get()) {
       throw new QueryExecutionCanceledException();
     }
   }
@@ -182,24 +169,21 @@ public class QueryMonitor implements Runnable {
     try {
       QueryThreadTask queryTask;
       long sleepTime;
-      // TODO: while-block cannot complete without throwing
       while (true) {
         // Get the first query task from the queue. This query will have the shortest
         // remaining time that needs to canceled first.
         queryTask = (QueryThreadTask) queryThreads.peek();
         if (queryTask == null) {
-          // Empty queue.
           synchronized (queryThreads) {
             queryThreads.wait();
           }
           continue;
         }
 
-        long currentTime = System.currentTimeMillis();
-
+        long executionTime = System.currentTimeMillis() - queryTask.StartTime;
         // Check if the sleepTime is greater than the remaining query execution time.
-        if (currentTime - queryTask.StartTime < this.maxQueryExecutionTime) {
-          sleepTime = this.maxQueryExecutionTime - (currentTime - queryTask.StartTime);
+        if (executionTime < this.maxQueryExecutionTime) {
+          sleepTime = this.maxQueryExecutionTime - executionTime;
           // Its been noted that the sleep is not guaranteed to wait for the specified
           // time (as stated in Suns doc too), it depends on the OSs thread scheduling
           // behavior, hence thread may sleep for longer than the specified time.
@@ -207,33 +191,23 @@ public class QueryMonitor implements Runnable {
           Thread.sleep(sleepTime);
           continue;
         }
-
-        // Query execution has taken more than the max time, Set queryExecutionStatus flag
-        // to canceled (TRUE).
-        boolean[] queryCompleted =
-            ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring();
+        // Query execution has taken more than the max time, Set queryCancelled flag
+        // to true.
+        boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring();
         synchronized (queryCompleted) {
-          if (!queryCompleted[0] && !((DefaultQuery) queryTask.query).isCqQuery()) { // Check if the
-                                                                                     // query is
-                                                                                     // already
-                                                                                     // completed.
-            ((DefaultQuery) queryTask.query).setCanceled(true,
-                new QueryExecutionTimeoutException(
-                    LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED
-                        .toLocalizedString(GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME)));
-            queryTask.queryExecutionStatus.set(Boolean.TRUE);
-            // Remove the task from queue.
+          // check if query is already completed
+          if (!queryCompleted[0]) {
+            queryTask.query.setCanceled(new QueryExecutionTimeoutException(String
+                .format("Query execution cancelled after exceeding max execution time %sms.",
+                    this.maxQueryExecutionTime)));
+            queryTask.queryCancelled.set(Boolean.TRUE);
+            // remove the query threads from monitoring queue
             queryThreads.poll();
+            logger.info(String.format(
+                "%s is set as canceled after %s milliseconds", queryTask.toString(),
+                executionTime));
           }
         }
-
-        logger.info(LocalizedMessage.create(
-            LocalizedStrings.GemFireCache_LONG_RUNNING_QUERY_EXECUTION_CANCELED,
-            new Object[] {queryTask.query.getQueryString(), queryTask.queryThread.getId()}));
-
-        if (logger.isDebugEnabled()) {
-          logger.debug("Query Execution for the thread {} got canceled.", queryTask.queryThread);
-        }
       }
     } catch (InterruptedException ignore) {
       if (logger.isDebugEnabled()) {
@@ -275,15 +249,15 @@ public class QueryMonitor implements Runnable {
   }
 
   private void cancelQueryDueToLowMemory(QueryThreadTask queryTask, long memoryThreshold) {
-    boolean[] queryCompleted = ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring();
+    boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring();
     synchronized (queryCompleted) {
       if (!queryCompleted[0]) {
         // cancel if query is not completed
         String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
             .toLocalizedString(memoryThreshold);
-        ((DefaultQuery) queryTask.query).setCanceled(true,
+        queryTask.query.setCanceled(
             new QueryExecutionLowMemoryException(reason));
-        queryTask.queryExecutionStatus.set(Boolean.TRUE);
+        queryTask.queryCancelled.set(Boolean.TRUE);
       }
     }
   }
@@ -305,16 +279,16 @@ public class QueryMonitor implements Runnable {
     final Thread queryThread;
 
     // package-private to avoid synthetic accessor
-    final Query query;
+    final DefaultQuery query;
 
     // package-private to avoid synthetic accessor
-    final AtomicBoolean queryExecutionStatus;
+    final AtomicBoolean queryCancelled;
 
-    QueryThreadTask(Thread queryThread, Query query, AtomicBoolean queryExecutionStatus) {
+    QueryThreadTask(Thread queryThread, DefaultQuery query, AtomicBoolean queryCancelled) {
       this.StartTime = System.currentTimeMillis();
       this.queryThread = queryThread;
       this.query = query;
-      this.queryExecutionStatus = queryExecutionStatus;
+      this.queryCancelled = queryCancelled;
     }
 
     @Override
@@ -341,7 +315,7 @@ public class QueryMonitor implements Runnable {
       return new StringBuilder().append("QueryThreadTask[StartTime:").append(this.StartTime)
           .append(", queryThread:").append(this.queryThread).append(", threadId:")
           .append(this.queryThread.getId()).append(", query:").append(this.query.getQueryString())
-          .append(", queryExecutionStatus:").append(this.queryExecutionStatus).append(']')
+          .append(", queryCancelled:").append(this.queryCancelled).append(']')
           .toString();
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index 8a8c61c..76262fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -241,7 +241,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
           String reason =
               LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
                   .toLocalizedString();
-          query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+          query.setCanceled(new QueryExecutionLowMemoryException(reason));
         } else {
           if (logger.isDebugEnabled()) {
             logger.debug("query cancelled while gathering results, aborting due to exception "
@@ -762,7 +762,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       String reason =
           LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
               .toLocalizedString();
-      query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+      query.setCanceled(new QueryExecutionLowMemoryException(reason));
       if (DefaultQuery.testHook != null) {
         DefaultQuery.testHook.doTestHook(5);
       }
@@ -1103,7 +1103,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
           String reason =
               LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
                   .toLocalizedString();
-          query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+          query.setCanceled(new QueryExecutionLowMemoryException(reason));
           this.abort = true;
         }
 
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
new file mode 100644
index 0000000..ea04c18
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.awaitility.Awaitility;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.InternalCache;
+
+/**
+ * although max_execution_time is set as 10ms, the monitor thread can sleep more than the specified
+ * time, so query will be cancelled at un-deterministic time after 10ms. We cannot assert on
+ * specific time at which the query will be cancelled. We can only assert that the query will be
+ * cancelled at one point after 10ms.
+ */
+public class QueryMonitorTest {
+
+  private static InternalCache cache;
+  private static QueryMonitor monitor;
+  private static long max_execution_time = 5;
+
+  @BeforeClass
+  public static void setUp() {
+    cache = mock(InternalCache.class);
+    monitor = new QueryMonitor(cache, max_execution_time);
+    Thread monitorThread = new Thread(() -> monitor.run(), "query monitor thread");
+    monitorThread.setDaemon(true);
+    monitorThread.start();
+  }
+
+  @Test
+  public void queryIsCancelled() {
+    List<DefaultQuery> queries = new ArrayList<>();
+    List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      DefaultQuery query = new DefaultQuery("query" + i, cache, false);
+      queries.add(query);
+      Thread queryExecutionThread = createQueryExecutionThread(i);
+      threads.add(queryExecutionThread);
+      monitor.monitorQueryThread(queryExecutionThread, query);
+    }
+
+    for (DefaultQuery query : queries) {
+      // make sure the isCancelled flag in Query is set correctly
+      Awaitility.await().until(() -> query.isCanceled());
+    }
+    Awaitility.await().until(() -> monitor.getQueryMonitorThreadCount() == 0);
+    // make sure all thread died
+    for (Thread thread : threads) {
+      Awaitility.await().until(() -> !thread.isAlive());
+    }
+  }
+
+  @Test
+  public void cqQueryIsNotMonitored() {
+    DefaultQuery query = mock(DefaultQuery.class);
+    when(query.isCqQuery()).thenReturn(true);
+    monitor.monitorQueryThread(mock(Thread.class), query);
+    assertThat(monitor.getQueryMonitorThreadCount()).isEqualTo(0);
+  }
+
+  private Thread createQueryExecutionThread(int i) {
+    Thread thread = new Thread(() -> {
+      // make sure the threadlocal variable is updated
+      Awaitility.await().until(() -> QueryMonitor.isQueryExecutionCanceled());
+    });
+    thread.setName("query" + i);
+    return thread;
+  }
+
+}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
index 4bcfd01..871c0b4 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
@@ -12,31 +12,23 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.cache.query.dunit;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.logging.log4j.Logger;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.query.CqAttributes;
@@ -49,804 +41,211 @@ import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.cq.dunit.CqQueryTestListener;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.cache.query.internal.QueryMonitor;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.cache30.CacheTestCase;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.ClientCacheRule;
-import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.VMProvider;
 
 /**
- * Tests for QueryMonitoring service.
+ * These tests add a test hook to make sure query execution sleeps for at least 20 ms, so the
+ * queryMonitor thread will get a chance to cancel the query before it completes.
+ *
+ * The MAX_QUERY_EXECUTION_TIME is set as 1 ms, i.e, theoretically all queries will be cancelled
+ * after 1ms, but due to thread scheduling, this may not be true. we can only decrease the flakyness
+ * of the test by making MAX_QUERY_EXECUTION_TIME the smallest possible (1) and making the query
+ * execution time longer (but not too long to make the test run too slow).
  *
- * @since GemFire 6.0
  */
 @Category({OQLQueryTest.class})
-public class QueryMonitorDUnitTest implements Serializable {
-
-  private static final Logger logger = LogService.getLogger();
-
-  @Rule
-  public DistributedTestRule distributedTestRule = new DistributedTestRule();
-
+public class QueryMonitorDUnitTest {
+  private static int MAX_QUERY_EXECUTE_TIME = 1;
   @Rule
-  public CacheRule cacheRule = new CacheRule();
+  public ClusterStartupRule cluster = new ClusterStartupRule(5);
 
   @Rule
-  public ClientCacheRule clientCacheRule = new ClientCacheRule();
-
-  private final String exampleRegionName = "exampleRegion";
-
-
-  /* Some of the queries are commented out as they were taking less time */
-  String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE  p.ID > 100",
-      "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE  x.pk != '1000'",
-      "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE  x.pkid != '1'",
-      "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE  p.pk > '1'",
-      "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE  p.pkid != '53'",
-      "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE  pos.Id > 100",
-      "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE  pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
-      "SELECT * FROM /exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
-      "SELECT * FROM /exampleRegion WHERE  ID > 100 and status = 'active'",
-      "SELECT DISTINCT * FROM /exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
-      "SELECT DISTINCT ID FROM /exampleRegion WHERE  status = 'active'",
-      "SELECT DISTINCT ID FROM /exampleRegion p WHERE  p.status = 'active'",
-      "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE  pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
-      "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
-          + " WHERE  pos.secId = 'YHOO') as itrX",
-      "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
-          + " WHERE  pos.secId = 'YHOO') as itrX",
-      "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x"
-          + " WHERE  x.ID = p.ID) as itrX",
-      "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
-          + " WHERE  x.ID = p.ID) as itrX",
-      "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE  "
-          + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE  vals.secId = 'YHOO')",
-      "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')",
-      "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
-      "SELECT DISTINCT structset.sos, structset.key "
-          + "FROM /exampleRegion p, p.positions.values outerPos, "
-          + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
-          + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
-          + "where outerPos.secId != 'IBM' AND "
-          + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
-          + "where structset.sos > 2000",
-      "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, "
-          + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
-          + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
-          + "where outerPos.secId != 'IBM' AND "
-          + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
-          + "where structset.sos > 2000",
-      "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position "
-          + "WHERE (true = null OR position.secId = 'SUN') AND true",};
-
-  String[] prQueryStr = {
-      "SELECT ID FROM /exampleRegion p WHERE  p.ID > 100 and p.status = 'active'",
-      "SELECT * FROM /exampleRegion WHERE  ID > 100 and status = 'active'",
-      "SELECT DISTINCT * FROM /exampleRegion p WHERE   p.ID > 100 and p.status = 'active' and p.ID < 100000",
-      "SELECT DISTINCT p.ID FROM /exampleRegion p WHERE p.ID > 100 and p.ID < 100000 and p.status = 'active'",
-      "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (pos.secId != 'IBM')",};
-
-  private int numServers;
-
-  @After
-  public final void preTearDownCacheTestCase() throws Exception {
-    Host host = Host.getHost(0);
-    // shut down clients before servers
-    for (int i = numServers; i < 4; i++) {
-      host.getVM(i).invoke(() -> CacheTestCase.disconnectFromDS());
-    }
-  }
-
-  public void setup(int numServers) throws Exception {
-    Host host = Host.getHost(0);
-    this.numServers = numServers;
-  }
-
-  public void createExampleRegion() {
-    createExampleRegion(false, null);
-  }
-
-  private void createExampleRegion(final boolean eviction, final String dirName) {
-    RegionFactory regionFactory =
-        cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE);
-
-    // setting the eviction attributes.
-    if (eviction) {
-      File[] f = new File[1];
-      f[0] = new File(dirName);
-      f[0].mkdir();
-      DiskStoreFactory dsf = cacheRule.getCache().createDiskStoreFactory();
-      dsf.setDiskDirs(f).create("ds1");
-      EvictionAttributes evictAttrs =
-          EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK);
-      regionFactory.setDiskStoreName("ds1").setEvictionAttributes(evictAttrs);
-    }
-    regionFactory.create(exampleRegionName);
-  }
-
-  private void createExamplePRRegion() {
-    RegionFactory regionFactory =
-        cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
-
-    AttributesFactory factory = new AttributesFactory();
-    // factory.setDataPolicy(DataPolicy.PARTITION);
-    regionFactory
-        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(8).create());
-    regionFactory.create(exampleRegionName);
-  }
-
-  private int configServer(final int queryMonitorTime, final String testName) throws IOException {
-    cacheRule.createCache();
-    CacheServer cacheServer = cacheRule.getCache().addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.start();
-    GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = queryMonitorTime;
-    cacheRule.getCache().getLogger().fine("#### RUNNING TEST : " + testName);
-    DefaultQuery.testHook = new QueryTimeoutHook(queryMonitorTime);
-    return cacheServer.getPort();
-  }
-
-  // Stop server
-  private void stopServer(VM server) {
-    SerializableRunnable stopServer = new SerializableRunnable("Stop CacheServer") {
-      public void run() {
-        // Reset the test flag.
-        Cache cache = cacheRule.getCache();
-        DefaultQuery.testHook = null;
-        GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
-        stopBridgeServer(cacheRule.getCache());
-      }
-    };
-    server.invoke(stopServer);
-  }
-
-  private void configClient(String host, int... ports) {
-    configClient(false, host, ports);
-  }
-
-  private void configClient(boolean enableSubscriptions, String host, int... ports) {
-    ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
-    for (int port : ports) {
-      clientCacheFactory.addPoolServer(host, port);
-    }
-    clientCacheFactory.setPoolSubscriptionEnabled(true);
-    clientCacheFactory.setPoolReadTimeout(10 * 60 * 1000); // 10 mins
-    clientCacheRule.createClientCache(clientCacheFactory);
-  }
+  public GfshCommandRule gfsh = new GfshCommandRule();
 
-  private void verifyException(Exception e) {
-    e.printStackTrace();
-    String error = e.getMessage();
-    if (e.getCause() != null) {
-      error = e.getCause().getMessage();
-    }
+  private MemberVM locator, server1, server2;
+  private ClientVM client3, client4;
 
-    if (error.contains("Query execution cancelled after exceeding max execution time")
-        || error.contains("The Query completed sucessfully before it got canceled")
-        || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time")
-        || error.contains(
-            "The query task could not be found but the query is marked as having been canceled")) {
-      // Expected exception.
-      return;
-    }
-
-    System.out.println("Unexpected exception:");
-    if (e.getCause() != null) {
-      e.getCause().printStackTrace();
-    } else {
-      e.printStackTrace();
-    }
+  @Before
+  public void setUpServers() throws Exception {
+    locator = cluster.startLocatorVM(0, l -> l.withoutClusterConfigurationService());
+    server1 = cluster.startServerVM(1, locator.getPort());
+    server2 = cluster.startServerVM(2, locator.getPort());
 
-    fail("Expected exception Not found. Expected exception with message: \n"
-        + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error);
+    // configure the server to make the query to wait for at least 1 second in every execution spot
+    // and set a MAX_QUERY_EXECUTION_TIME to be 10ms
+    VMProvider.invokeInEveryMember(() -> {
+      DefaultQuery.testHook = new QueryTimeoutHook();
+      GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME;
+    }, server1, server2);
+    gfsh.connectAndVerify(locator);
   }
 
-  /**
-   * Tests query execution from client to server (single server).
-   */
   @Test
-  public void testQueryMonitorClientServer() throws Exception {
-
-    setup(1);
-
-    final Host host = Host.getHost(0);
-
-    VM server = host.getVM(0);
-    VM client1 = host.getVM(1);
-    VM client2 = host.getVM(2);
-    VM client3 = host.getVM(3);
-
-    final int numberOfEntries = 100;
-    String serverHostName = NetworkUtils.getServerHostName(host);
-
-    // Start server
-    int serverPort = server.invoke("Create BridgeServer",
-        () -> configServer(10, "testQueryMonitorClientServer")); // All the queries taking more than
-                                                                 // 20ms should be canceled by Query
-                                                                 // monitor.
-    server.invoke("createExampleRegion", () -> createExampleRegion());
-
-    // Initialize server regions.
-    server.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
-
-    // Initialize Client1
-    client1.invoke("Init client", () -> configClient(serverHostName, serverPort));
-
-    // Initialize Client2
-    client2.invoke("Init client", () -> configClient(serverHostName, serverPort));
+  public void testMultipleClientToOneServer() throws Exception {
+    int server1Port = server1.getPort();
+    client3 = cluster.startClientVM(3, true, server1Port);
+    client4 = cluster.startClientVM(4, true, server1Port);
 
-    // Initialize Client3
-    client3.invoke("Init client", () -> configClient(serverHostName, serverPort));
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+        .statusIsSuccess();
 
-    // Execute client queries
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+    server1.invoke(() -> populateRegion(0, 100));
 
-    client1.invoke("execute Queries", () -> executeQueriesFromClient(10));
-    client2.invoke("execute Queries", () -> executeQueriesFromClient(10));
-    client3.invoke("execute Queries", () -> executeQueriesFromClient(10));
-
-    stopServer(server);
-  }
-
-  private void executeQueriesFromClient(int timeout) {
-    GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = timeout;
-    QueryService queryService = clientCacheRule.getClientCache().getQueryService();
-    executeQueriesAgainstQueryService(queryService);
-  }
-
-  private void executeQueriesOnServer() {
-    QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
-    executeQueriesAgainstQueryService(queryService);
+    // execute the query
+    VMProvider.invokeInEveryMember(() -> exuteQuery(), client3, client4);
   }
 
-  private void executeQueriesAgainstQueryService(QueryService queryService) {
-    for (int k = 0; k < queryStr.length; k++) {
-      String qStr = queryStr[k];
-      executeQuery(queryService, qStr);
-    }
-  }
-
-  private void executeQuery(QueryService queryService, String qStr) {
-    try {
-      logger.info("Executing query :" + qStr);
-      Query query = queryService.newQuery(qStr);
-      query.execute();
-      fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
-    } catch (Exception e) {
-      verifyException(e);
-    }
-  }
-
-  /**
-   * Tests query execution from client to server (multi server).
-   */
   @Test
-  public void testQueryMonitorMultiClientMultiServer() throws Exception {
-
-    setup(2);
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
-
-    final int numberOfEntries = 100;
-
-    String serverHostName = NetworkUtils.getServerHostName(host);
-
-    // Start server
-    int serverPort1 = server1.invoke("Create BridgeServer",
-        () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking
-                                                                          // more than 20ms should
-                                                                          // be canceled by Query
-                                                                          // monitor.
-    server1.invoke("createExampleRegion", () -> createExampleRegion());
+  public void testOneClientToMultipleServerOnReplicateRegion() throws Exception {
+    int server1Port = server1.getPort();
+    int server2Port = server2.getPort();
+    client3 = cluster.startClientVM(3, true, server1Port, server2Port);
 
-    int serverPort2 = server2.invoke("Create BridgeServer",
-        () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking
-                                                                          // more than 20ms should
-                                                                          // be canceled by Query
-                                                                          // monitor.
-    server2.invoke("createExampleRegion", () -> createExampleRegion());
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+        .statusIsSuccess();
 
-    // Initialize server regions.
-    server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+    server1.invoke(() -> populateRegion(0, 100));
 
-    // Initialize server regions.
-    server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
-
-    // Initialize Client1 and create client regions.
-    client1.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2));
-    // client1.invoke("createExampleRegion", () -> createExampleRegion());
-
-    // Initialize Client2 and create client regions.
-    client2.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2));
-    // client2.invoke("createExampleRegion", () -> createExampleRegion());
-
-    // Execute client queries
-
-    client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
-    client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
-
-    stopServer(server1);
-    stopServer(server2);
+    // execute the query from client3
+    client3.invoke(() -> exuteQuery());
   }
 
-  /**
-   * Tests query execution on local vm.
-   */
   @Test
-  public void testQueryExecutionLocally() throws Exception {
-
-    setup(2);
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-
-    final int numberOfEntries = 100;
-
-    // Start server
-    server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
-                                                                                               // the
-                                                                                               // queries
-                                                                                               // taking
-                                                                                               // more
-                                                                                               // than
-                                                                                               // 20ms
-                                                                                               // should
-                                                                                               // be
-                                                                                               // canceled
-                                                                                               // by
-                                                                                               // Query
-                                                                                               // monitor.
-    server1.invoke("createExampleRegion", () -> createExampleRegion());
-
-    server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
-                                                                                               // the
-                                                                                               // queries
-                                                                                               // taking
-                                                                                               // more
-                                                                                               // than
-                                                                                               // 20ms
-                                                                                               // should
-                                                                                               // be
-                                                                                               // canceled
-                                                                                               // by
-                                                                                               // Query
-                                                                                               // monitor.
-    server2.invoke("createExampleRegion", () -> createExampleRegion());
-
-    // Initialize server regions.
-    server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
-
-    // Initialize server regions.
-    server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
-
-    // Execute server queries
-
-    server1.invoke("execute queries on Server", () -> executeQueriesOnServer());
-    server2.invoke("execute queries on Server", () -> executeQueriesOnServer());
-
-    stopServer(server1);
-    stopServer(server2);
-  }
+  public void testOneClientToOneServerOnPartitionedRegion() throws Exception {
+    // client3 connects to server1, client4 connects to server2
+    int server1Port = server1.getPort();
+    int server2Port = server2.getPort();
+    client3 = cluster.startClientVM(3, true, server1Port);
+    client4 = cluster.startClientVM(4, true, server2Port);
 
-  /**
-   * Tests query execution on local vm.
-   */
-  @Test
-  public void testQueryExecutionLocallyAndCacheOp() throws Exception {
-
-    setup(2);
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-
-    final int numberOfEntries = 1000;
-
-    // Start server
-    server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
-                                                                                               // the
-                                                                                               // queries
-                                                                                               // taking
-                                                                                               // more
-                                                                                               // than
-                                                                                               // 20ms
-                                                                                               // should
-                                                                                               // be
-                                                                                               // canceled
-                                                                                               // by
-                                                                                               // Query
-                                                                                               // monitor.
-    server1.invoke("createExampleRegion", () -> createExampleRegion());
-
-    server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
-                                                                                               // the
-                                                                                               // queries
-                                                                                               // taking
-                                                                                               // more
-                                                                                               // than
-                                                                                               // 20ms
-                                                                                               // should
-                                                                                               // be
-                                                                                               // canceled
-                                                                                               // by
-                                                                                               // Query
-                                                                                               // monitor.
-    server2.invoke("createExampleRegion", () -> createExampleRegion());
-
-    // Initialize server regions.
-    server1.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
-
-    // Initialize server regions.
-    server2.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
-
-    // Execute server queries
-    SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") {
-      public void run2() throws CacheException {
-        try {
-          QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
-          String qStr =
-              "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
-                  + " WHERE  x.ID = p.ID) as itrX";
-          executeQuery(queryService, qStr);
-
-          // Create index and Perform cache op. Bug#44307
-          queryService.createIndex("idIndex", "ID", "/exampleRegion");
-          queryService.createIndex("statusIndex", "status", "/exampleRegion");
-          Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-          for (int i = (1 + 100); i <= (numberOfEntries + 200); i++) {
-            exampleRegion.put("" + i, new Portfolio(i));
-          }
-
-        } catch (Exception ex) {
-          Assert.fail("Exception creating the query service", ex);
-        }
-      }
-    };
-
-    server1.invoke(executeQuery);
-    server2.invoke(executeQuery);
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION")
+        .statusIsSuccess();
 
-    stopServer(server1);
-    stopServer(server2);
-  }
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+    server1.invoke(() -> populateRegion(0, 100));
+    server2.invoke(() -> populateRegion(100, 200));
 
-  private void populatePortfolioRegions(int numberOfEntries) {
-    Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);;
-    for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) {
-      exampleRegion.put("" + i, new Portfolio(i));
-    }
+    client3.invoke(() -> exuteQuery());
+    client4.invoke(() -> exuteQuery());
   }
 
-  /**
-   * Tests query execution from client to server (multiple server) on Partition Region .
-   */
   @Test
-  public void testQueryMonitorOnPR() throws Exception {
-
-    setup(2);
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
-
-    final int numberOfEntries = 100;
-
-    String serverHostName = NetworkUtils.getServerHostName(host);
-
-    // Start server
-    int serverPort1 = server1.invoke("configServer",
-        () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
-                                                                              // taking more than
-                                                                              // 100ms should be
-                                                                              // canceled by Query
-                                                                              // monitor.
-    server1.invoke("createExamplePRRegion", () -> createExamplePRRegion());
-
-    int serverPort2 = server2.invoke("configServer",
-        () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
-                                                                              // taking more than
-                                                                              // 100ms should be
-                                                                              // canceled by Query
-                                                                              // monitor.
-    server2.invoke("createExamplePRRegion", () -> createExamplePRRegion());
-
-    // Initialize server regions.
-    server1.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio(101, numberOfEntries));
-
-    // Initialize server regions.
-    server2.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio((numberOfEntries + 100),
-        (numberOfEntries + numberOfEntries + 100)));
-
-    // Initialize Client1
-    client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
-
-    // Initialize Client2
-    client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
-
-    // Execute client queries
-
-    client1.invoke("Execute Queries", () -> executeQueriesFromClient(20));
-    client2.invoke("Execute Queries", () -> executeQueriesFromClient(20));
-
-    stopServer(server1);
-    stopServer(server2);
-  }
+  public void testQueryExecutionFromServerAndPerformCacheOp() throws Exception {
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+        .statusIsSuccess();
+
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+    server1.invoke(() -> populateRegion(0, 100));
+
+    // execute the query from one server
+    server1.invoke(() -> exuteQuery());
+
+    // Create index and Perform cache op. Bug#44307
+    server1.invoke(() -> {
+      QueryService queryService = ClusterStartupRule.getCache().getQueryService();
+      queryService.createIndex("idIndex", "ID", "/exampleRegion");
+      queryService.createIndex("statusIndex", "status", "/exampleRegion");
+      populateRegion(100, 10);
+    });
 
-  private void bulkInsertPorfolio(int startingId, int numberOfEntries) {
-    Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-    for (int i = startingId; i <= (numberOfEntries + 100); i++) {
-      exampleRegion.put("" + i, new Portfolio(i));
-    }
+    // destroy indices created in this test
+    gfsh.executeAndAssertThat("destroy index --region=/exampleRegion").statusIsSuccess();
   }
 
-  /**
-   * Tests query execution on Partition Region, executes query locally.
-   */
   @Test
-  public void testQueryMonitorWithLocalQueryOnPR() throws Exception {
-
-    setup(2);
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-
-    final int numberOfEntries = 100;
-
-    // Start server
-    server1.invoke("configServer",
-        () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
-                                                                              // taking more than
-                                                                              // 100ms should be
-                                                                              // canceled by Query
-                                                                              // monitor.
-    server1.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
-    server2.invoke("configServer",
-        () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
-                                                                              // taking more than
-                                                                              // 100ms should be
-                                                                              // canceled by Query
-                                                                              // monitor.
-    server2.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
-    // Initialize server regions.
-    server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
-      public void run2() throws CacheException {
-        bulkInsertPorfolio(101, numberOfEntries);
-      }
-    });
-
-    // Initialize server regions.
-    server2.invoke(new CacheSerializableRunnable("Create Bridge Server") {
-      public void run2() throws CacheException {
-        bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100));
-      }
-    });
+  public void testQueryExecutionFromServerOnPartitionedRegion() throws Exception {
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION")
+        .statusIsSuccess();
 
-    // Execute client queries
-    server1.invoke("execute queries on server", () -> executeQueriesOnServer());
-    server2.invoke("execute queries on server", () -> executeQueriesOnServer());
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+    server1.invoke(() -> populateRegion(100, 200));
+    server2.invoke(() -> populateRegion(200, 300));
 
-    stopServer(server1);
-    stopServer(server2);
+    // execute the query from one server
+    server1.invoke(() -> exuteQuery());
+    server2.invoke(() -> exuteQuery());
   }
 
-  /**
-   * Tests query execution from client to server (multiple server) with eviction to disk.
-   */
   @Test
-  public void testQueryMonitorRegionWithEviction() throws CacheException {
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
-
-    final int numberOfEntries = 100;
-
-    String serverHostName = NetworkUtils.getServerHostName(host);
-
-    // Start server
-    int serverPort1 = server1.invoke("Create BridgeServer",
-        () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more
-                                                                      // than 20ms should be
-                                                                      // canceled by Query monitor.
-    server1.invoke("createExampleRegion",
-        () -> createExampleRegion(true, "server1_testQueryMonitorRegionWithEviction"));
-
-    int serverPort2 = server2.invoke("Create BridgeServer",
-        () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more
-                                                                      // than 20ms should be
-                                                                      // canceled by Query monitor.
-    server2.invoke("createExampleRegion",
-        () -> createExampleRegion(true, "server2_testQueryMonitorRegionWithEviction"));
-
-    // Initialize server regions.
-    server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
-      public void run2() throws CacheException {
-        bulkInsertPorfolio(101, numberOfEntries);
-      }
+  public void testQueryMonitorRegionWithEviction() throws Exception {
+    File server1WorkingDir = server1.getWorkingDir();
+    File server2WorkingDir = server2.getWorkingDir();
+    server1.invoke(() -> createReplicateRegionWithEviction(server1WorkingDir));
+    server2.invoke(() -> createReplicateRegionWithEviction(server2WorkingDir));
+    server1.invoke(() -> populateRegion(0, 100));
+    server2.invoke(() -> populateRegion(100, 200));
+
+    // client3 connects to server1, client4 connects to server2
+    int server1Port = server1.getPort();
+    int server2Port = server2.getPort();
+    client3 = cluster.startClientVM(3, ccf -> {
+      configureClientCacheFactory(ccf, server1Port);
     });
 
-    // Initialize server regions.
-    server2.invoke(new CacheSerializableRunnable("Create Bridge Server") {
-      public void run2() throws CacheException {
-        bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100));
-      }
+    client4 = cluster.startClientVM(4, ccf -> {
+      configureClientCacheFactory(ccf, server2Port);
     });
-
-    // Initialize Client1 and create client regions.
-    client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
-
-    // Initialize Client2 and create client regions.
-    client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
-
-    // Execute client queries
-    client1.invoke("Execute Queries", () -> executeQueriesFromClient(20));
-    client2.invoke("Execute Queries", () -> executeQueriesFromClient(20));
-
-    stopServer(server1);
-    stopServer(server2);
+    client3.invoke(() -> exuteQuery());
+    client4.invoke(() -> exuteQuery());
   }
 
-  /**
-   * Tests query execution on region with indexes.
-   */
   @Test
   public void testQueryMonitorRegionWithIndex() throws Exception {
-
-    setup(2);
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
-
-    final int numberOfEntries = 100;
-
-    String serverHostName = NetworkUtils.getServerHostName(host);
-
-    // Start server
-    int serverPort1 =
-        server1.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
-                                                                                                  // the
-                                                                                                  // queries
-                                                                                                  // taking
-                                                                                                  // more
-                                                                                                  // than
-                                                                                                  // 20ms
-                                                                                                  // should
-                                                                                                  // be
-                                                                                                  // canceled
-                                                                                                  // by
-                                                                                                  // Query
-                                                                                                  // monitor.
-    server1.invoke("createExampleRegion", () -> createExampleRegion());
-
-    int serverPort2 =
-        server2.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
-                                                                                                  // the
-                                                                                                  // queries
-                                                                                                  // taking
-                                                                                                  // more
-                                                                                                  // than
-                                                                                                  // 20ms
-                                                                                                  // should
-                                                                                                  // be
-                                                                                                  // canceled
-                                                                                                  // by
-                                                                                                  // Query
-                                                                                                  // monitor.
-    server2.invoke("createExampleRegion", () -> createExampleRegion());
-
-    // Initialize server regions.
-    server1.invoke("Create Indexes", () -> createIndexes(numberOfEntries));
-
-    // Initialize server regions.
-    server2.invoke("Create Indexes", () -> createIndexes(numberOfEntries));
-
-    // Initialize Client1
-    client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
-
-    // Initialize Client2
-    client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
-
-    // Execute client queries
-    client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
-    client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
-
-    stopServer(server1);
-    stopServer(server2);
-  }
-
-  private void createIndexes(int numberOfEntries) throws Exception {
-    Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-
-    // create index.
-    QueryService cacheQS = cacheRule.getCache().getQueryService();
-    cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p");
-    cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p");
-    cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos");
-    cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos");
-    cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion");
-    cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion");
-
-    for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) {
-      exampleRegion.put("" + i, new Portfolio(i));
-    }
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+
+    // create the indices using API
+    VMProvider.invokeInEveryMember(() -> {
+      // create index.
+      QueryService cacheQS = ClusterStartupRule.getCache().getQueryService();
+      cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p");
+      cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p");
+      cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos");
+      cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos");
+      cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion");
+      cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion");
+      populateRegion(0, 150);
+    }, server1, server2);
+
+    // client3 connects to server1, client4 connects to server2
+    int server1Port = server1.getPort();
+    int server2Port = server2.getPort();
+    client3 = cluster.startClientVM(3, true, server1Port);
+    client4 = cluster.startClientVM(4, true, server2Port);
+
+    client3.invoke(() -> exuteQuery());
+    client4.invoke(() -> exuteQuery());
   }
 
-  /**
-   * The following CQ test is added to make sure testMaxQueryExecutionTime is reset and is not
-   * affecting other query related tests.
-   *
-   */
   @Test
   public void testCqExecuteDoesNotGetAffectedByTimeout() throws Exception {
-    setup(1);
-
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
-    VM producerClient = host.getVM(2);
-
-    // Start server
-    int serverPort =
-        server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
-    server.invoke("createExampleRegion", () -> createExampleRegion());
-
-
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
-    // Create client.
-    client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort));
-
-    final int size = 10;
-    final String name = "testQuery_4";
-    server.invoke(() -> {
-      Region region = cacheRule.getCache().getRegion(exampleRegionName);
-      for (int i = 1; i <= size; i++) {
-        region.put("key" + i, new Portfolio(i));
-      }
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+    server1.invoke(() -> populateRegion(0, 100));
+
+    int server1Port = server1.getPort();
+    client3 = cluster.startClientVM(3, ccf -> {
+      configureClientCacheFactory(ccf, server1Port);
     });
 
-    // create and execute cq
-    client.invoke(() -> {
+    client3.invoke(() -> {
       String cqName = "testCQForQueryMonitorDUnitTest";
-      String query = "select * from /" + exampleRegionName;
+      String query = "select * from /exampleRegion";
       // Get CQ Service.
-      QueryService cqService = null;
-      cqService = clientCacheRule.getClientCache().getQueryService();
+      QueryService cqService = ClusterStartupRule.getClientCache().getQueryService();
 
       // Create CQ Attributes.
       CqAttributesFactory cqf = new CqAttributesFactory();
@@ -857,278 +256,212 @@ public class QueryMonitorDUnitTest implements Serializable {
       CqQuery cq1 = cqService.newCq(cqName, query, cqa);
       cq1.execute();
     });
+
+    server1.invoke(() -> {
+      populateRegion(0, 150);
+    });
   }
 
   @Test
-  public void testCqProcessingDoesNotGetAffectedByTimeout() throws Exception {
-    setup(1);
-
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
-    VM producerClient = host.getVM(2);
-
-    // Start server
-    int serverPort =
-        server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
-    server.invoke("createExampleRegion", () -> createExampleRegion());
-
-
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
-    // Create client.
-    client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort));
+  public void testCacheOpAfterQueryCancel() throws Exception {
+    int locatorPort = locator.getPort();
+    // start up more servers
+    MemberVM server3 = cluster.startServerVM(3, locatorPort);
 
-    final int size = 10;
-    final String name = "testQuery_4";
-    server.invoke(() -> {
-      Region region = cacheRule.getCache().getRegion(exampleRegionName);
-      for (int i = 1; i <= size; i++) {
-        region.put("key" + i, new Portfolio(i));
-      }
+    server3.invoke(() -> {
+      DefaultQuery.testHook = new QueryTimeoutHook();
+      GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME;
     });
 
-    // create and execute cq
-    client.invoke(() -> {
-      String cqName = "testCQForQueryMonitorDUnitTest";
-      String query = "select * from /" + exampleRegionName;
-      // Get CQ Service.
-      QueryService cqService = null;
-      cqService = clientCacheRule.getClientCache().getQueryService();
-
-      // Create CQ Attributes.
-      CqAttributesFactory cqf = new CqAttributesFactory();
-      CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
-      cqf.initCqListeners(cqListeners);
-      CqAttributes cqa = cqf.create();
+    gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+        .statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 3);
 
-      CqQuery cq1 = cqService.newCq(cqName, query, cqa);
-      cq1.execute();
+    server1.invoke(() -> {
+      QueryService queryService = ClusterStartupRule.getCache().getQueryService();
+      queryService.createIndex("statusIndex", "status", "/exampleRegion");
+      queryService.createIndex("secIdIndex", "pos.secId",
+          "/exampleRegion p, p.positions.values pos");
+      populateRegion(100, 1000);
     });
 
-    server.invoke(() -> {
-      Region region = cacheRule.getCache().getRegion(exampleRegionName);
-      for (int i = 1; i <= size; i++) {
-        region.put("key" + i, new Portfolio(i));
+    AsyncInvocation ai1 = server1.invokeAsync(() -> {
+      for (int j = 0; j < 5; j++) {
+        populateRegion(0, 2000);
       }
     });
-  }
-
-  /**
-   * Tests cache operation right after query cancellation.
-   */
-  @Test
-  public void testCacheOpAfterQueryCancel() throws Exception {
-
-    setup(4);
-
-    final Host host = Host.getHost(0);
-
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM server3 = host.getVM(2);
-    VM server4 = host.getVM(3);
-
-    final int numberOfEntries = 1000;
 
-    // Start server
-    server1.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
-    server1.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
-    server2.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
-    server2.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
-    server3.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
-    server3.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
-    server4.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
-    server4.invoke("Create Partition Regions", () -> createExamplePRRegion());
+    AsyncInvocation ai2 = server2.invokeAsync(() -> {
+      for (int j = 0; j < 5; j++) {
+        populateRegion(1000, 3000);
+      }
+    });
 
-    server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
-      public void run2() throws CacheException {
+    // server3 performs a region put after a query is canceled.
+    AsyncInvocation ai3 = server3.invokeAsync(() -> {
+      Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion");
+      QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
+      String qStr =
+          "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos"
+              + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 "
+              + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', "
+              + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')"
+              + " order by p.status, p.ID desc";
+      for (int i = 0; i < 100; i++) {
         try {
-          QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
-          queryService.createIndex("statusIndex", "status", "/exampleRegion");
-          queryService.createIndex("secIdIndex", "pos.secId",
-              "/exampleRegion p, p.positions.values pos");
-        } catch (Exception ex) {
-          fail("Failed to create index.");
-        }
-        Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-        for (int i = 100; i <= (numberOfEntries); i++) {
+          Query query = queryService.newQuery(qStr);
+          query.execute();
+          fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
+        } catch (QueryExecutionTimeoutException qet) {
           exampleRegion.put("" + i, new Portfolio(i));
         }
       }
     });
 
-    // Initialize server regions.
-    AsyncInvocation ai1 =
-        server1.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") {
-          public void run2() throws CacheException {
-            Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-            for (int j = 0; j < 5; j++) {
-              for (int i = 1; i <= (numberOfEntries + 1000); i++) {
-                exampleRegion.put("" + i, new Portfolio(i));
-              }
-            }
-            LogWriterUtils.getLogWriter()
-                .info("### Completed updates in server1 in testCacheOpAfterQueryCancel");
-          }
-        });
-
-    AsyncInvocation ai2 =
-        server2.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") {
-          public void run2() throws CacheException {
-            Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-            for (int j = 0; j < 5; j++) {
-              for (int i = (1 + 1000); i <= (numberOfEntries + 2000); i++) {
-                exampleRegion.put("" + i, new Portfolio(i));
-              }
-            }
-            LogWriterUtils.getLogWriter()
-                .info("### Completed updates in server2 in testCacheOpAfterQueryCancel");
-          }
-        });
-
-    // Execute server queries
-    SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") {
-      public void run2() throws CacheException {
-        try {
-          Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-          QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
-          String qStr =
-              "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos"
-                  + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 "
-                  + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', "
-                  + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')"
-                  + " order by p.status, p.ID desc";
-          for (int i = 0; i < 500; i++) {
-            try {
-              GemFireCacheImpl.getInstance().getLogger().info("Executing query :" + qStr);
-              Query query = queryService.newQuery(qStr);
-              query.execute();
-            } catch (QueryExecutionTimeoutException qet) {
-              LogWriterUtils.getLogWriter()
-                  .info("### Got Expected QueryExecutionTimeout exception. " + qet.getMessage());
-              if (qet.getMessage().contains("cancelled after exceeding max execution")) {
-                LogWriterUtils.getLogWriter().info("### Doing a put operation");
-                exampleRegion.put("" + i, new Portfolio(i));
-              }
-            } catch (Exception e) {
-              fail("Exception executing query." + e.getMessage());
-            }
-          }
-          LogWriterUtils.getLogWriter()
-              .info("### Completed Executing queries in testCacheOpAfterQueryCancel");
-        } catch (Exception ex) {
-          Assert.fail("Exception creating the query service", ex);
-        }
-      }
-    };
-
-    AsyncInvocation ai3 = server3.invokeAsync(executeQuery);
-    AsyncInvocation ai4 = server4.invokeAsync(executeQuery);
-
-    LogWriterUtils.getLogWriter()
-        .info("### Waiting for async threads to join in testCacheOpAfterQueryCancel");
-    try {
-      ThreadUtils.join(ai1, 5 * 60 * 1000);
-      ThreadUtils.join(ai2, 5 * 60 * 1000);
-      ThreadUtils.join(ai3, 5 * 60 * 1000);
-      ThreadUtils.join(ai4, 5 * 60 * 1000);
-    } catch (Exception ex) {
-      fail("Async thread join failure");
-    }
-    LogWriterUtils.getLogWriter()
-        .info("### DONE Waiting for async threads to join in testCacheOpAfterQueryCancel");
+    ai1.await();
+    ai2.await();
+    ai3.await();
 
-    validateQueryMonitorThreadCnt(server1, 0, 1000);
-    validateQueryMonitorThreadCnt(server2, 0, 1000);
-    validateQueryMonitorThreadCnt(server3, 0, 1000);
-    validateQueryMonitorThreadCnt(server4, 0, 1000);
+    server3.invoke(() -> {
+      DefaultQuery.testHook = null;
+      GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
+    });
+  }
 
-    LogWriterUtils.getLogWriter()
-        .info("### DONE validating query monitor threads testCacheOpAfterQueryCancel");
 
-    stopServer(server1);
-    stopServer(server2);
-    stopServer(server3);
-    stopServer(server4);
+  private static void populateRegion(int startingId, int endingId) {
+    Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion");
+    for (int i = startingId; i < endingId; i++) {
+      exampleRegion.put("" + i, new Portfolio(i));
+    }
   }
 
-  private void validateQueryMonitorThreadCnt(VM vm, final int threadCount, final int waitTime) {
-    SerializableRunnable validateThreadCnt =
-        new CacheSerializableRunnable("validateQueryMonitorThreadCnt") {
-          public void run2() throws CacheException {
-            Cache cache = cacheRule.getCache();
-            QueryMonitor qm = ((GemFireCacheImpl) cache).getQueryMonitor();
-            if (qm == null) {
-              fail("Didn't found query monitor.");
-            }
-            int waited = 0;
-            while (true) {
-              if (qm.getQueryMonitorThreadCount() != threadCount) {
-                if (waited <= waitTime) {
-                  Wait.pause(10);
-                  waited += 10;
-                  continue;
-                } else {
-                  fail("Didn't found expected monitoring thread. Expected: " + threadCount
-                      + " found :" + qm.getQueryMonitorThreadCount());
-                }
-              }
-              break;
-            }
-          }
-        };
-    vm.invoke(validateThreadCnt);
+  private static void exuteQuery() {
+    QueryService queryService;
+    if (ClusterStartupRule.getClientCache() == null) {
+      queryService = ClusterStartupRule.getCache().getQueryService();
+    } else {
+      queryService = ClusterStartupRule.getClientCache().getQueryService();
+    }
+    for (int k = 0; k < queryStr.length; k++) {
+      String qStr = queryStr[k];
+      try {
+        Query query = queryService.newQuery(qStr);
+        query.execute();
+        fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
+      } catch (Exception e) {
+        verifyException(e);
+      }
+    }
   }
 
-  /**
-   * Starts a bridge server on the given port, using the given deserializeValues and
-   * notifyBySubscription to serve up the given region.
-   */
-  protected int startBridgeServer(int port, boolean notifyBySubscription) throws IOException {
-
-    Cache cache = cacheRule.getCache();
-    CacheServer bridge = cache.addCacheServer();
-    bridge.setPort(port);
-    bridge.setNotifyBySubscription(notifyBySubscription);
-    bridge.start();
-    return bridge.getPort();
+  private static void configureClientCacheFactory(ClientCacheFactory ccf, int... serverPorts) {
+    for (int serverPort : serverPorts) {
+      ccf.addPoolServer("localhost", serverPort);
+    }
+    ccf.setPoolReadTimeout(10 * 60 * 1000); // 10 min
+    ccf.setPoolSubscriptionEnabled(true);
   }
 
-  /**
-   * Stops the bridge server that serves up the given cache.
-   */
-  private void stopBridgeServer(Cache cache) {
-    CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next();
-    bridge.stop();
-    assertFalse(bridge.isRunning());
+  private static void createReplicateRegionWithEviction(File workingDir) {
+    InternalCache cache = ClusterStartupRule.getCache();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    dsf.setDiskDirs(new File[] {workingDir}).create("ds");
+    EvictionAttributes evictAttrs =
+        EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK);
+    cache.createRegionFactory(RegionShortcut.REPLICATE)
+        .setDiskStoreName("ds")
+        .setEvictionAttributes(evictAttrs)
+        .create("exampleRegion");
   }
 
-  private class QueryTimeoutHook implements DefaultQuery.TestHook {
-
-    long timeout;
+  private static void verifyException(Exception e) {
+    String error = e.getMessage();
+    if (e.getCause() != null) {
+      error = e.getCause().getMessage();
+    }
 
-    private QueryTimeoutHook(long timeout) {
-      this.timeout = timeout;
+    if (error.contains("Query execution cancelled after exceeding max execution time")
+        || error.contains("The Query completed sucessfully before it got canceled")
+        || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time")
+        || error.contains(
+            "The query task could not be found but the query is marked as having been canceled")) {
+      // Expected exception.
+      return;
     }
 
-    public void doTestHook(String description) {
-      if (description.equals("6")) {
-        try {
-          Thread.sleep(timeout * 2);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-      }
+    System.out.println("Unexpected exception:");
+    if (e.getCause() != null) {
+      e.getCause().printStackTrace();
+    } else {
+      e.printStackTrace();
     }
 
+    fail("Expected exception Not found. Expected exception with message: \n"
+        + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error);
+  }
+
+
+  @After
+  public void reset() {
+    VMProvider.invokeInEveryMember(() -> {
+      DefaultQuery.testHook = null;
+      GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
+    }, server1, server2);
+  }
+
+  private static class QueryTimeoutHook implements DefaultQuery.TestHook {
     public void doTestHook(int spot) {
-      doTestHook("" + spot);
+      if (spot != 6) {
+        return;
+      }
+      try {
+        TimeUnit.MILLISECONDS.sleep(20);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e.getMessage(), e);
+      }
     }
-
   }
 
+  private static String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE  p.ID > 100",
+      "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE  x.pk != '1000'",
+      "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE  x.pkid != '1'",
+      "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE  p.pk > '1'",
+      "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE  p.pkid != '53'",
+      "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE  pos.Id > 100",
+      "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE  pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+      "SELECT * FROM /exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
+      "SELECT * FROM /exampleRegion WHERE  ID > 100 and status = 'active'",
+      "SELECT DISTINCT * FROM /exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
+      "SELECT DISTINCT ID FROM /exampleRegion WHERE  status = 'active'",
+      "SELECT DISTINCT ID FROM /exampleRegion p WHERE  p.status = 'active'",
+      "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE  pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+      "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
+          + " WHERE  pos.secId = 'YHOO') as itrX",
+      "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
+          + " WHERE  pos.secId = 'YHOO') as itrX",
+      "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x"
+          + " WHERE  x.ID = p.ID) as itrX",
+      "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
+          + " WHERE  x.ID = p.ID) as itrX",
+      "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE  "
+          + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE  vals.secId = 'YHOO')",
+      "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')",
+      "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
+      "SELECT DISTINCT structset.sos, structset.key "
+          + "FROM /exampleRegion p, p.positions.values outerPos, "
+          + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
+          + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
+          + "where outerPos.secId != 'IBM' AND "
+          + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
+          + "where structset.sos > 2000",
+      "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, "
+          + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
+          + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
+          + "where outerPos.secId != 'IBM' AND "
+          + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
+          + "where structset.sos > 2000",
+      "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position "
+          + "WHERE (true = null OR position.secId = 'SUN') AND true",};
+
 }