You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/08/06 21:19:34 UTC
[geode] branch develop updated: Revert "GEODE-577: rewrite
QueryMonitorDUnitTest (#2179)"
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 002efec Revert "GEODE-577: rewrite QueryMonitorDUnitTest (#2179)"
002efec is described below
commit 002efecdb188b24977374f12cbac3aaab955e81c
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Mon Aug 6 14:19:09 2018 -0700
Revert "GEODE-577: rewrite QueryMonitorDUnitTest (#2179)"
This reverts commit 38e1714
---
.../ResourceManagerWithQueryMonitorDUnitTest.java | 23 +-
.../geode/cache/query/internal/DefaultQuery.java | 10 +-
.../geode/cache/query/internal/QueryMonitor.java | 102 +-
.../cache/PartitionedRegionQueryEvaluator.java | 6 +-
.../cache/query/internal/QueryMonitorTest.java | 91 --
.../cache/query/dunit/QueryMonitorDUnitTest.java | 1299 +++++++++++++++-----
6 files changed, 1074 insertions(+), 457 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
index 80f2192..598bde1 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
@@ -870,7 +870,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
ServerOperationException soe = (ServerOperationException) e;
if (soe.getRootCause() instanceof QueryException) {
QueryException qe = (QueryException) soe.getRootCause();
- if (isExceptionDueToTimeout(qe)) {
+ if (isExceptionDueToTimeout(qe, queryTimeout)) {
logger.info("Query Execution must be terminated by a timeout. Expected behavior");
return 0;
}
@@ -952,7 +952,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
// meaning the query should not be canceled due to low memory
throw new CacheException("Query should not have been canceled due to memory") {};
}
- } else if (isExceptionDueToTimeout((QueryException) e)) {
+ } else if (isExceptionDueToTimeout((QueryException) e, queryTimeout)) {
if (queryTimeout == -1) {
// no time out set, this should not be thrown
@@ -976,7 +976,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
// meaning the query should not be canceled due to low memory
throw new CacheException("Query should not have been canceled due to memory") {};
}
- } else if (isExceptionDueToTimeout(qe)) {
+ } else if (isExceptionDueToTimeout(qe, queryTimeout)) {
if (queryTimeout == -1) {
e.printStackTrace();
// no time out set, this should not be thrown
@@ -1219,12 +1219,13 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
.toLocalizedString()));
}
- private boolean isExceptionDueToTimeout(QueryException e) {
+ private boolean isExceptionDueToTimeout(QueryException e, long queryTimeout) {
String message = e.getMessage();
// -1 needs to be matched due to client/server set up, BaseCommand uses the
// MAX_QUERY_EXECUTION_TIME and not the testMaxQueryExecutionTime
return (message.contains("The QueryMonitor thread may be sleeping longer than")
- || message.contains("Query execution cancelled after exceeding max execution time")
+ || message.contains(LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED
+ .toLocalizedString(queryTimeout))
|| message.contains(
LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED.toLocalizedString(-1)));
}
@@ -1260,6 +1261,10 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
}
}
+ public void doTestHook(String description) {
+
+ }
+
public void countDown() {
latch.countDown();
}
@@ -1283,6 +1288,10 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
}
}
}
+
+ public void doTestHook(String description) {
+
+ }
}
private class CancelDuringAddResultsHook implements DefaultQuery.TestHook {
@@ -1306,5 +1315,9 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
rejectedObjects = true;
}
}
+
+ public void doTestHook(String description) {
+
+ }
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index 1053d8d..fad0e0f 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -716,9 +716,11 @@ public class DefaultQuery implements Query {
/**
* The query gets canceled by the QueryMonitor with the reason being specified
+ * <p>
+ * TODO: parameter isCanceled is always true
*/
- public void setCanceled(CacheRuntimeException canceledException) {
- this.isCanceled = true;
+ public void setCanceled(boolean isCanceled, CacheRuntimeException canceledException) {
+ this.isCanceled = isCanceled;
this.canceledException = canceledException;
}
@@ -983,8 +985,8 @@ public class DefaultQuery implements Query {
}
public interface TestHook {
- default void doTestHook(int spot) {};
+ void doTestHook(int spot);
- default void doTestHook(String spot) {};
+ void doTestHook(String spot);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
index f7750bc..ccba334 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
@@ -14,16 +14,21 @@
*/
package org.apache.geode.cache.query.internal;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
import org.apache.geode.cache.query.QueryExecutionTimeoutException;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
/**
* QueryMonitor class, monitors the query execution time. Instantiated based on the system property
@@ -41,12 +46,13 @@ public class QueryMonitor implements Runnable {
private static final Logger logger = LogService.getLogger();
private final InternalCache cache;
+ private boolean testingQueryMonitor = false;
/**
* Holds the query execution status for the thread executing the query. FALSE if the query is not
* canceled due to max query execution timeout. TRUE it the query is canceled due to max query
* execution timeout timeout.
*/
- private static final ThreadLocal<AtomicBoolean> queryCancelled =
+ private static final ThreadLocal<AtomicBoolean> queryExecutionStatus =
ThreadLocal.withInitial(() -> new AtomicBoolean(Boolean.FALSE));
private final long maxQueryExecutionTime;
@@ -57,6 +63,9 @@ public class QueryMonitor implements Runnable {
private final AtomicBoolean stopped = new AtomicBoolean(Boolean.FALSE);
+ /** For DUnit test purpose TODO: delete this ConcurrentMap */
+ private ConcurrentMap queryMonitorTasks = null;
+
// Variables for cancelling queries due to low memory
private static volatile Boolean LOW_MEMORY = Boolean.FALSE;
@@ -73,19 +82,14 @@ public class QueryMonitor implements Runnable {
* @param queryThread Thread executing the query.
* @param query Query.
*/
- public void monitorQueryThread(Thread queryThread, DefaultQuery query) {
- // cq query is not monitored
- if (query.isCqQuery()) {
- return;
- }
-
+ public void monitorQueryThread(Thread queryThread, Query query) {
if (LOW_MEMORY) {
String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
.toLocalizedString(LOW_MEMORY_USED_BYTES);
- query.setCanceled(new QueryExecutionLowMemoryException(reason));
+ ((DefaultQuery) query).setCanceled(true, new QueryExecutionLowMemoryException(reason));
throw new QueryExecutionLowMemoryException(reason);
}
- QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryCancelled.get());
+ QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryExecutionStatus.get());
synchronized (queryThreads) {
queryThreads.add(queryTask);
queryThreads.notifyAll();
@@ -97,19 +101,28 @@ public class QueryMonitor implements Runnable {
queryThreads.size(), queryThread.getId(), query.getQueryString(), queryThread);
}
+ // For dunit test purpose
+ if (cache != null && testingQueryMonitor) {
+ if (this.queryMonitorTasks == null) {
+ this.queryMonitorTasks = new ConcurrentHashMap();
+ }
+ this.queryMonitorTasks.put(queryThread, queryTask);
+ }
}
/**
* Stops monitoring the query. Removes the passed thread from QueryMonitor queue.
*/
- public void stopMonitoringQueryThread(Thread queryThread, DefaultQuery query) {
+ public void stopMonitoringQueryThread(Thread queryThread, Query query) {
// Re-Set the queryExecution status on the LocalThread.
QueryExecutionTimeoutException testException = null;
- boolean[] queryCompleted = query.getQueryCompletedForMonitoring();
+ DefaultQuery defaultQuery = (DefaultQuery) query;
+ boolean[] queryCompleted = defaultQuery.getQueryCompletedForMonitoring();
synchronized (queryCompleted) {
- queryCancelled.get().getAndSet(Boolean.FALSE);
- query.setQueryCompletedForMonitoring(true);
+ queryExecutionStatus.get().getAndSet(Boolean.FALSE);
+
+ defaultQuery.setQueryCompletedForMonitoring(true);
// Remove the query task from the queue.
queryThreads.remove(new QueryThreadTask(queryThread, null, null));
}
@@ -134,7 +147,7 @@ public class QueryMonitor implements Runnable {
* gemfire.Cache.MAX_QUERY_EXECUTION_TIME
*/
public static void isQueryExecutionCanceled() {
- if (queryCancelled.get() != null && queryCancelled.get().get()) {
+ if (queryExecutionStatus.get() != null && queryExecutionStatus.get().get()) {
throw new QueryExecutionCanceledException();
}
}
@@ -169,21 +182,24 @@ public class QueryMonitor implements Runnable {
try {
QueryThreadTask queryTask;
long sleepTime;
+ // TODO: while-block cannot complete without throwing
while (true) {
// Get the first query task from the queue. This query will have the shortest
// remaining time that needs to canceled first.
queryTask = (QueryThreadTask) queryThreads.peek();
if (queryTask == null) {
+ // Empty queue.
synchronized (queryThreads) {
queryThreads.wait();
}
continue;
}
- long executionTime = System.currentTimeMillis() - queryTask.StartTime;
+ long currentTime = System.currentTimeMillis();
+
// Check if the sleepTime is greater than the remaining query execution time.
- if (executionTime < this.maxQueryExecutionTime) {
- sleepTime = this.maxQueryExecutionTime - executionTime;
+ if (currentTime - queryTask.StartTime < this.maxQueryExecutionTime) {
+ sleepTime = this.maxQueryExecutionTime - (currentTime - queryTask.StartTime);
// Its been noted that the sleep is not guaranteed to wait for the specified
// time (as stated in Suns doc too), it depends on the OSs thread scheduling
// behavior, hence thread may sleep for longer than the specified time.
@@ -191,23 +207,33 @@ public class QueryMonitor implements Runnable {
Thread.sleep(sleepTime);
continue;
}
- // Query execution has taken more than the max time, Set queryCancelled flag
- // to true.
- boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring();
+
+ // Query execution has taken more than the max time, Set queryExecutionStatus flag
+ // to canceled (TRUE).
+ boolean[] queryCompleted =
+ ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring();
synchronized (queryCompleted) {
- // check if query is already completed
- if (!queryCompleted[0]) {
- queryTask.query.setCanceled(new QueryExecutionTimeoutException(String
- .format("Query execution cancelled after exceeding max execution time %sms.",
- this.maxQueryExecutionTime)));
- queryTask.queryCancelled.set(Boolean.TRUE);
- // remove the query threads from monitoring queue
+ if (!queryCompleted[0] && !((DefaultQuery) queryTask.query).isCqQuery()) { // Check if the
+ // query is
+ // already
+ // completed.
+ ((DefaultQuery) queryTask.query).setCanceled(true,
+ new QueryExecutionTimeoutException(
+ LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED
+ .toLocalizedString(GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME)));
+ queryTask.queryExecutionStatus.set(Boolean.TRUE);
+ // Remove the task from queue.
queryThreads.poll();
- logger.info(String.format(
- "%s is set as canceled after %s milliseconds", queryTask.toString(),
- executionTime));
}
}
+
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.GemFireCache_LONG_RUNNING_QUERY_EXECUTION_CANCELED,
+ new Object[] {queryTask.query.getQueryString(), queryTask.queryThread.getId()}));
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Query Execution for the thread {} got canceled.", queryTask.queryThread);
+ }
}
} catch (InterruptedException ignore) {
if (logger.isDebugEnabled()) {
@@ -249,15 +275,15 @@ public class QueryMonitor implements Runnable {
}
private void cancelQueryDueToLowMemory(QueryThreadTask queryTask, long memoryThreshold) {
- boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring();
+ boolean[] queryCompleted = ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring();
synchronized (queryCompleted) {
if (!queryCompleted[0]) {
// cancel if query is not completed
String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
.toLocalizedString(memoryThreshold);
- queryTask.query.setCanceled(
+ ((DefaultQuery) queryTask.query).setCanceled(true,
new QueryExecutionLowMemoryException(reason));
- queryTask.queryCancelled.set(Boolean.TRUE);
+ queryTask.queryExecutionStatus.set(Boolean.TRUE);
}
}
}
@@ -279,16 +305,16 @@ public class QueryMonitor implements Runnable {
final Thread queryThread;
// package-private to avoid synthetic accessor
- final DefaultQuery query;
+ final Query query;
// package-private to avoid synthetic accessor
- final AtomicBoolean queryCancelled;
+ final AtomicBoolean queryExecutionStatus;
- QueryThreadTask(Thread queryThread, DefaultQuery query, AtomicBoolean queryCancelled) {
+ QueryThreadTask(Thread queryThread, Query query, AtomicBoolean queryExecutionStatus) {
this.StartTime = System.currentTimeMillis();
this.queryThread = queryThread;
this.query = query;
- this.queryCancelled = queryCancelled;
+ this.queryExecutionStatus = queryExecutionStatus;
}
@Override
@@ -315,7 +341,7 @@ public class QueryMonitor implements Runnable {
return new StringBuilder().append("QueryThreadTask[StartTime:").append(this.StartTime)
.append(", queryThread:").append(this.queryThread).append(", threadId:")
.append(this.queryThread.getId()).append(", query:").append(this.query.getQueryString())
- .append(", queryCancelled:").append(this.queryCancelled).append(']')
+ .append(", queryExecutionStatus:").append(this.queryExecutionStatus).append(']')
.toString();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index 76262fc..8a8c61c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -241,7 +241,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
String reason =
LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
.toLocalizedString();
- query.setCanceled(new QueryExecutionLowMemoryException(reason));
+ query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
} else {
if (logger.isDebugEnabled()) {
logger.debug("query cancelled while gathering results, aborting due to exception "
@@ -762,7 +762,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
String reason =
LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
.toLocalizedString();
- query.setCanceled(new QueryExecutionLowMemoryException(reason));
+ query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
if (DefaultQuery.testHook != null) {
DefaultQuery.testHook.doTestHook(5);
}
@@ -1103,7 +1103,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
String reason =
LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
.toLocalizedString();
- query.setCanceled(new QueryExecutionLowMemoryException(reason));
+ query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
this.abort = true;
}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
deleted file mode 100644
index ea04c18..0000000
--- a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.cache.query.internal;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.awaitility.Awaitility;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.geode.internal.cache.InternalCache;
-
-/**
- * although max_execution_time is set as 10ms, the monitor thread can sleep more than the specified
- * time, so query will be cancelled at un-deterministic time after 10ms. We cannot assert on
- * specific time at which the query will be cancelled. We can only assert that the query will be
- * cancelled at one point after 10ms.
- */
-public class QueryMonitorTest {
-
- private static InternalCache cache;
- private static QueryMonitor monitor;
- private static long max_execution_time = 5;
-
- @BeforeClass
- public static void setUp() {
- cache = mock(InternalCache.class);
- monitor = new QueryMonitor(cache, max_execution_time);
- Thread monitorThread = new Thread(() -> monitor.run(), "query monitor thread");
- monitorThread.setDaemon(true);
- monitorThread.start();
- }
-
- @Test
- public void queryIsCancelled() {
- List<DefaultQuery> queries = new ArrayList<>();
- List<Thread> threads = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- DefaultQuery query = new DefaultQuery("query" + i, cache, false);
- queries.add(query);
- Thread queryExecutionThread = createQueryExecutionThread(i);
- threads.add(queryExecutionThread);
- monitor.monitorQueryThread(queryExecutionThread, query);
- }
-
- for (DefaultQuery query : queries) {
- // make sure the isCancelled flag in Query is set correctly
- Awaitility.await().until(() -> query.isCanceled());
- }
- Awaitility.await().until(() -> monitor.getQueryMonitorThreadCount() == 0);
- // make sure all thread died
- for (Thread thread : threads) {
- Awaitility.await().until(() -> !thread.isAlive());
- }
- }
-
- @Test
- public void cqQueryIsNotMonitored() {
- DefaultQuery query = mock(DefaultQuery.class);
- when(query.isCqQuery()).thenReturn(true);
- monitor.monitorQueryThread(mock(Thread.class), query);
- assertThat(monitor.getQueryMonitorThreadCount()).isEqualTo(0);
- }
-
- private Thread createQueryExecutionThread(int i) {
- Thread thread = new Thread(() -> {
- // make sure the threadlocal variable is updated
- Awaitility.await().until(() -> QueryMonitor.isQueryExecutionCanceled());
- });
- thread.setName("query" + i);
- return thread;
- }
-
-}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
index 871c0b4..4bcfd01 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
@@ -12,23 +12,31 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
+
package org.apache.geode.cache.query.dunit;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.File;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.logging.log4j.Logger;
import org.junit.After;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqAttributes;
@@ -41,211 +49,804 @@ import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.cq.dunit.CqQueryTestListener;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.QueryMonitor;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.cache30.CacheTestCase;
import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.rules.ClientVM;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
import org.apache.geode.test.junit.categories.OQLQueryTest;
-import org.apache.geode.test.junit.rules.GfshCommandRule;
-import org.apache.geode.test.junit.rules.VMProvider;
/**
- * These tests add a test hook to make sure query execution sleeps for at least 20 ms, so the
- * queryMonitor thread will get a chance to cancel the query before it completes.
- *
- * The MAX_QUERY_EXECUTION_TIME is set as 1 ms, i.e, theoretically all queries will be cancelled
- * after 1ms, but due to thread scheduling, this may not be true. we can only decrease the flakyness
- * of the test by making MAX_QUERY_EXECUTION_TIME the smallest possible (1) and making the query
- * execution time longer (but not too long to make the test run too slow).
+ * Tests for QueryMonitoring service.
*
+ * @since GemFire 6.0
*/
@Category({OQLQueryTest.class})
-public class QueryMonitorDUnitTest {
- private static int MAX_QUERY_EXECUTE_TIME = 1;
+public class QueryMonitorDUnitTest implements Serializable {
+
+ private static final Logger logger = LogService.getLogger();
+
+ @Rule
+ public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
@Rule
- public ClusterStartupRule cluster = new ClusterStartupRule(5);
+ public CacheRule cacheRule = new CacheRule();
@Rule
- public GfshCommandRule gfsh = new GfshCommandRule();
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ private final String exampleRegionName = "exampleRegion";
+
+
+ /* Some of the queries are commented out as they were taking less time */
+ String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE p.ID > 100",
+ "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pk != '1000'",
+ "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pkid != '1'",
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pk > '1'",
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pkid != '53'",
+ "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100",
+ "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+ "SELECT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
+ "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'",
+ "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
+ "SELECT DISTINCT ID FROM /exampleRegion WHERE status = 'active'",
+ "SELECT DISTINCT ID FROM /exampleRegion p WHERE p.status = 'active'",
+ "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+ "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
+ + " WHERE pos.secId = 'YHOO') as itrX",
+ "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
+ + " WHERE pos.secId = 'YHOO') as itrX",
+ "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x"
+ + " WHERE x.ID = p.ID) as itrX",
+ "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
+ + " WHERE x.ID = p.ID) as itrX",
+ "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE "
+ + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE vals.secId = 'YHOO')",
+ "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')",
+ "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
+ "SELECT DISTINCT structset.sos, structset.key "
+ + "FROM /exampleRegion p, p.positions.values outerPos, "
+ + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
+ + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
+ + "where outerPos.secId != 'IBM' AND "
+ + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
+ + "where structset.sos > 2000",
+ "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, "
+ + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
+ + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
+ + "where outerPos.secId != 'IBM' AND "
+ + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
+ + "where structset.sos > 2000",
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position "
+ + "WHERE (true = null OR position.secId = 'SUN') AND true",};
+
+ String[] prQueryStr = {
+ "SELECT ID FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active'",
+ "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'",
+ "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
+ "SELECT DISTINCT p.ID FROM /exampleRegion p WHERE p.ID > 100 and p.ID < 100000 and p.status = 'active'",
+ "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (pos.secId != 'IBM')",};
+
+ private int numServers;
+
+ @After
+ public final void preTearDownCacheTestCase() throws Exception {
+ Host host = Host.getHost(0);
+ // shut down clients before servers
+ for (int i = numServers; i < 4; i++) {
+ host.getVM(i).invoke(() -> CacheTestCase.disconnectFromDS());
+ }
+ }
+
+ public void setup(int numServers) throws Exception {
+ Host host = Host.getHost(0);
+ this.numServers = numServers;
+ }
+
+ public void createExampleRegion() {
+ createExampleRegion(false, null);
+ }
+
+ private void createExampleRegion(final boolean eviction, final String dirName) {
+ RegionFactory regionFactory =
+ cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE);
+
+ // setting the eviction attributes.
+ if (eviction) {
+ File[] f = new File[1];
+ f[0] = new File(dirName);
+ f[0].mkdir();
+ DiskStoreFactory dsf = cacheRule.getCache().createDiskStoreFactory();
+ dsf.setDiskDirs(f).create("ds1");
+ EvictionAttributes evictAttrs =
+ EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK);
+ regionFactory.setDiskStoreName("ds1").setEvictionAttributes(evictAttrs);
+ }
+ regionFactory.create(exampleRegionName);
+ }
+
+ private void createExamplePRRegion() {
+ RegionFactory regionFactory =
+ cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
+
+ AttributesFactory factory = new AttributesFactory();
+ // factory.setDataPolicy(DataPolicy.PARTITION);
+ regionFactory
+ .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(8).create());
+ regionFactory.create(exampleRegionName);
+ }
+
+ private int configServer(final int queryMonitorTime, final String testName) throws IOException {
+ cacheRule.createCache();
+ CacheServer cacheServer = cacheRule.getCache().addCacheServer();
+ cacheServer.setPort(0);
+ cacheServer.start();
+ GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = queryMonitorTime;
+ cacheRule.getCache().getLogger().fine("#### RUNNING TEST : " + testName);
+ DefaultQuery.testHook = new QueryTimeoutHook(queryMonitorTime);
+ return cacheServer.getPort();
+ }
+
+ // Stop server
+ private void stopServer(VM server) {
+ SerializableRunnable stopServer = new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ // Reset the test flag.
+ Cache cache = cacheRule.getCache();
+ DefaultQuery.testHook = null;
+ GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
+ stopBridgeServer(cacheRule.getCache());
+ }
+ };
+ server.invoke(stopServer);
+ }
+
+ private void configClient(String host, int... ports) {
+ configClient(false, host, ports);
+ }
+
+ private void configClient(boolean enableSubscriptions, String host, int... ports) {
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ for (int port : ports) {
+ clientCacheFactory.addPoolServer(host, port);
+ }
+ clientCacheFactory.setPoolSubscriptionEnabled(true);
+ clientCacheFactory.setPoolReadTimeout(10 * 60 * 1000); // 10 mins
+ clientCacheRule.createClientCache(clientCacheFactory);
+ }
- private MemberVM locator, server1, server2;
- private ClientVM client3, client4;
+ private void verifyException(Exception e) {
+ e.printStackTrace();
+ String error = e.getMessage();
+ if (e.getCause() != null) {
+ error = e.getCause().getMessage();
+ }
- @Before
- public void setUpServers() throws Exception {
- locator = cluster.startLocatorVM(0, l -> l.withoutClusterConfigurationService());
- server1 = cluster.startServerVM(1, locator.getPort());
- server2 = cluster.startServerVM(2, locator.getPort());
+ if (error.contains("Query execution cancelled after exceeding max execution time")
+ || error.contains("The Query completed sucessfully before it got canceled")
+ || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time")
+ || error.contains(
+ "The query task could not be found but the query is marked as having been canceled")) {
+ // Expected exception.
+ return;
+ }
+
+ System.out.println("Unexpected exception:");
+ if (e.getCause() != null) {
+ e.getCause().printStackTrace();
+ } else {
+ e.printStackTrace();
+ }
- // configure the server to make the query to wait for at least 1 second in every execution spot
- // and set a MAX_QUERY_EXECUTION_TIME to be 10ms
- VMProvider.invokeInEveryMember(() -> {
- DefaultQuery.testHook = new QueryTimeoutHook();
- GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME;
- }, server1, server2);
- gfsh.connectAndVerify(locator);
+ fail("Expected exception Not found. Expected exception with message: \n"
+ + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error);
}
+ /**
+ * Tests query execution from client to server (single server).
+ */
@Test
- public void testMultipleClientToOneServer() throws Exception {
- int server1Port = server1.getPort();
- client3 = cluster.startClientVM(3, true, server1Port);
- client4 = cluster.startClientVM(4, true, server1Port);
+ public void testQueryMonitorClientServer() throws Exception {
+
+ setup(1);
+
+ final Host host = Host.getHost(0);
+
+ VM server = host.getVM(0);
+ VM client1 = host.getVM(1);
+ VM client2 = host.getVM(2);
+ VM client3 = host.getVM(3);
+
+ final int numberOfEntries = 100;
+ String serverHostName = NetworkUtils.getServerHostName(host);
+
+ // Start server
+ int serverPort = server.invoke("Create BridgeServer",
+ () -> configServer(10, "testQueryMonitorClientServer")); // All the queries taking more than
+ // 20ms should be canceled by Query
+ // monitor.
+ server.invoke("createExampleRegion", () -> createExampleRegion());
+
+ // Initialize server regions.
+ server.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
+
+ // Initialize Client1
+ client1.invoke("Init client", () -> configClient(serverHostName, serverPort));
+
+ // Initialize Client2
+ client2.invoke("Init client", () -> configClient(serverHostName, serverPort));
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
- .statusIsSuccess();
+ // Initialize Client3
+ client3.invoke("Init client", () -> configClient(serverHostName, serverPort));
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
- server1.invoke(() -> populateRegion(0, 100));
+ // Execute client queries
- // execute the query
- VMProvider.invokeInEveryMember(() -> exuteQuery(), client3, client4);
+ client1.invoke("execute Queries", () -> executeQueriesFromClient(10));
+ client2.invoke("execute Queries", () -> executeQueriesFromClient(10));
+ client3.invoke("execute Queries", () -> executeQueriesFromClient(10));
+
+ stopServer(server);
+ }
+
+ private void executeQueriesFromClient(int timeout) {
+ GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = timeout;
+ QueryService queryService = clientCacheRule.getClientCache().getQueryService();
+ executeQueriesAgainstQueryService(queryService);
+ }
+
+ private void executeQueriesOnServer() {
+ QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
+ executeQueriesAgainstQueryService(queryService);
}
+ private void executeQueriesAgainstQueryService(QueryService queryService) {
+ for (int k = 0; k < queryStr.length; k++) {
+ String qStr = queryStr[k];
+ executeQuery(queryService, qStr);
+ }
+ }
+
+ private void executeQuery(QueryService queryService, String qStr) {
+ try {
+ logger.info("Executing query :" + qStr);
+ Query query = queryService.newQuery(qStr);
+ query.execute();
+ fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
+ } catch (Exception e) {
+ verifyException(e);
+ }
+ }
+
+ /**
+ * Tests query execution from client to server (multi server).
+ */
@Test
- public void testOneClientToMultipleServerOnReplicateRegion() throws Exception {
- int server1Port = server1.getPort();
- int server2Port = server2.getPort();
- client3 = cluster.startClientVM(3, true, server1Port, server2Port);
+ public void testQueryMonitorMultiClientMultiServer() throws Exception {
+
+ setup(2);
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+ VM client1 = host.getVM(2);
+ VM client2 = host.getVM(3);
+
+ final int numberOfEntries = 100;
+
+ String serverHostName = NetworkUtils.getServerHostName(host);
+
+ // Start server
+ int serverPort1 = server1.invoke("Create BridgeServer",
+ () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking
+ // more than 20ms should
+ // be canceled by Query
+ // monitor.
+ server1.invoke("createExampleRegion", () -> createExampleRegion());
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
- .statusIsSuccess();
+ int serverPort2 = server2.invoke("Create BridgeServer",
+ () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking
+ // more than 20ms should
+ // be canceled by Query
+ // monitor.
+ server2.invoke("createExampleRegion", () -> createExampleRegion());
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
- server1.invoke(() -> populateRegion(0, 100));
+ // Initialize server regions.
+ server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
- // execute the query from client3
- client3.invoke(() -> exuteQuery());
+ // Initialize server regions.
+ server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
+
+ // Initialize Client1 and create client regions.
+ client1.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2));
+ // client1.invoke("createExampleRegion", () -> createExampleRegion());
+
+ // Initialize Client2 and create client regions.
+ client2.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2));
+ // client2.invoke("createExampleRegion", () -> createExampleRegion());
+
+ // Execute client queries
+
+ client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
+ client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
+
+ stopServer(server1);
+ stopServer(server2);
}
+ /**
+ * Tests query execution on local vm.
+ */
@Test
- public void testOneClientToOneServerOnPartitionedRegion() throws Exception {
- // client3 connects to server1, client4 connects to server2
- int server1Port = server1.getPort();
- int server2Port = server2.getPort();
- client3 = cluster.startClientVM(3, true, server1Port);
- client4 = cluster.startClientVM(4, true, server2Port);
+ public void testQueryExecutionLocally() throws Exception {
+
+ setup(2);
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+
+ final int numberOfEntries = 100;
+
+ // Start server
+ server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
+ // the
+ // queries
+ // taking
+ // more
+ // than
+ // 20ms
+ // should
+ // be
+ // canceled
+ // by
+ // Query
+ // monitor.
+ server1.invoke("createExampleRegion", () -> createExampleRegion());
+
+ server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
+ // the
+ // queries
+ // taking
+ // more
+ // than
+ // 20ms
+ // should
+ // be
+ // canceled
+ // by
+ // Query
+ // monitor.
+ server2.invoke("createExampleRegion", () -> createExampleRegion());
+
+ // Initialize server regions.
+ server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
+
+ // Initialize server regions.
+ server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
+
+ // Execute server queries
+
+ server1.invoke("execute queries on Server", () -> executeQueriesOnServer());
+ server2.invoke("execute queries on Server", () -> executeQueriesOnServer());
+
+ stopServer(server1);
+ stopServer(server2);
+ }
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION")
- .statusIsSuccess();
+ /**
+ * Tests query execution on local vm.
+ */
+ @Test
+ public void testQueryExecutionLocallyAndCacheOp() throws Exception {
+
+ setup(2);
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+
+ final int numberOfEntries = 1000;
+
+ // Start server
+ server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
+ // the
+ // queries
+ // taking
+ // more
+ // than
+ // 20ms
+ // should
+ // be
+ // canceled
+ // by
+ // Query
+ // monitor.
+ server1.invoke("createExampleRegion", () -> createExampleRegion());
+
+ server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
+ // the
+ // queries
+ // taking
+ // more
+ // than
+ // 20ms
+ // should
+ // be
+ // canceled
+ // by
+ // Query
+ // monitor.
+ server2.invoke("createExampleRegion", () -> createExampleRegion());
+
+ // Initialize server regions.
+ server1.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
+
+ // Initialize server regions.
+ server2.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
+
+ // Execute server queries
+ SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") {
+ public void run2() throws CacheException {
+ try {
+ QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
+ String qStr =
+ "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
+ + " WHERE x.ID = p.ID) as itrX";
+ executeQuery(queryService, qStr);
+
+ // Create index and Perform cache op. Bug#44307
+ queryService.createIndex("idIndex", "ID", "/exampleRegion");
+ queryService.createIndex("statusIndex", "status", "/exampleRegion");
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int i = (1 + 100); i <= (numberOfEntries + 200); i++) {
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
+
+ } catch (Exception ex) {
+ Assert.fail("Exception creating the query service", ex);
+ }
+ }
+ };
+
+ server1.invoke(executeQuery);
+ server2.invoke(executeQuery);
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
- server1.invoke(() -> populateRegion(0, 100));
- server2.invoke(() -> populateRegion(100, 200));
+ stopServer(server1);
+ stopServer(server2);
+ }
- client3.invoke(() -> exuteQuery());
- client4.invoke(() -> exuteQuery());
+ private void populatePortfolioRegions(int numberOfEntries) {
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);;
+ for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) {
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
}
+ /**
+ * Tests query execution from client to server (multiple server) on Partition Region .
+ */
@Test
- public void testQueryExecutionFromServerAndPerformCacheOp() throws Exception {
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
- .statusIsSuccess();
-
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
- server1.invoke(() -> populateRegion(0, 100));
-
- // execute the query from one server
- server1.invoke(() -> exuteQuery());
-
- // Create index and Perform cache op. Bug#44307
- server1.invoke(() -> {
- QueryService queryService = ClusterStartupRule.getCache().getQueryService();
- queryService.createIndex("idIndex", "ID", "/exampleRegion");
- queryService.createIndex("statusIndex", "status", "/exampleRegion");
- populateRegion(100, 10);
- });
+ public void testQueryMonitorOnPR() throws Exception {
+
+ setup(2);
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+ VM client1 = host.getVM(2);
+ VM client2 = host.getVM(3);
+
+ final int numberOfEntries = 100;
+
+ String serverHostName = NetworkUtils.getServerHostName(host);
+
+ // Start server
+ int serverPort1 = server1.invoke("configServer",
+ () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
+ // taking more than
+ // 100ms should be
+ // canceled by Query
+ // monitor.
+ server1.invoke("createExamplePRRegion", () -> createExamplePRRegion());
+
+ int serverPort2 = server2.invoke("configServer",
+ () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
+ // taking more than
+ // 100ms should be
+ // canceled by Query
+ // monitor.
+ server2.invoke("createExamplePRRegion", () -> createExamplePRRegion());
+
+ // Initialize server regions.
+ server1.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio(101, numberOfEntries));
+
+ // Initialize server regions.
+ server2.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio((numberOfEntries + 100),
+ (numberOfEntries + numberOfEntries + 100)));
+
+ // Initialize Client1
+ client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
+
+ // Initialize Client2
+ client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
+
+ // Execute client queries
- // destroy indices created in this test
- gfsh.executeAndAssertThat("destroy index --region=/exampleRegion").statusIsSuccess();
+ client1.invoke("Execute Queries", () -> executeQueriesFromClient(20));
+ client2.invoke("Execute Queries", () -> executeQueriesFromClient(20));
+
+ stopServer(server1);
+ stopServer(server2);
+ }
+
+ private void bulkInsertPorfolio(int startingId, int numberOfEntries) {
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int i = startingId; i <= (numberOfEntries + 100); i++) {
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
}
+ /**
+ * Tests query execution on Partition Region, executes query locally.
+ */
@Test
- public void testQueryExecutionFromServerOnPartitionedRegion() throws Exception {
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION")
- .statusIsSuccess();
+ public void testQueryMonitorWithLocalQueryOnPR() throws Exception {
+
+ setup(2);
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+
+ final int numberOfEntries = 100;
+
+ // Start server
+ server1.invoke("configServer",
+ () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
+ // taking more than
+ // 100ms should be
+ // canceled by Query
+ // monitor.
+ server1.invoke("Create Partition Regions", () -> createExamplePRRegion());
+
+ server2.invoke("configServer",
+ () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
+ // taking more than
+ // 100ms should be
+ // canceled by Query
+ // monitor.
+ server2.invoke("Create Partition Regions", () -> createExamplePRRegion());
+
+ // Initialize server regions.
+ server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
+ bulkInsertPorfolio(101, numberOfEntries);
+ }
+ });
+
+ // Initialize server regions.
+ server2.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
+ bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100));
+ }
+ });
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
- server1.invoke(() -> populateRegion(100, 200));
- server2.invoke(() -> populateRegion(200, 300));
+ // Execute client queries
+ server1.invoke("execute queries on server", () -> executeQueriesOnServer());
+ server2.invoke("execute queries on server", () -> executeQueriesOnServer());
- // execute the query from one server
- server1.invoke(() -> exuteQuery());
- server2.invoke(() -> exuteQuery());
+ stopServer(server1);
+ stopServer(server2);
}
+ /**
+ * Tests query execution from client to server (multiple server) with eviction to disk.
+ */
@Test
- public void testQueryMonitorRegionWithEviction() throws Exception {
- File server1WorkingDir = server1.getWorkingDir();
- File server2WorkingDir = server2.getWorkingDir();
- server1.invoke(() -> createReplicateRegionWithEviction(server1WorkingDir));
- server2.invoke(() -> createReplicateRegionWithEviction(server2WorkingDir));
- server1.invoke(() -> populateRegion(0, 100));
- server2.invoke(() -> populateRegion(100, 200));
-
- // client3 connects to server1, client4 connects to server2
- int server1Port = server1.getPort();
- int server2Port = server2.getPort();
- client3 = cluster.startClientVM(3, ccf -> {
- configureClientCacheFactory(ccf, server1Port);
+ public void testQueryMonitorRegionWithEviction() throws CacheException {
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+ VM client1 = host.getVM(2);
+ VM client2 = host.getVM(3);
+
+ final int numberOfEntries = 100;
+
+ String serverHostName = NetworkUtils.getServerHostName(host);
+
+ // Start server
+ int serverPort1 = server1.invoke("Create BridgeServer",
+ () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more
+ // than 20ms should be
+ // canceled by Query monitor.
+ server1.invoke("createExampleRegion",
+ () -> createExampleRegion(true, "server1_testQueryMonitorRegionWithEviction"));
+
+ int serverPort2 = server2.invoke("Create BridgeServer",
+ () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more
+ // than 20ms should be
+ // canceled by Query monitor.
+ server2.invoke("createExampleRegion",
+ () -> createExampleRegion(true, "server2_testQueryMonitorRegionWithEviction"));
+
+ // Initialize server regions.
+ server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
+ bulkInsertPorfolio(101, numberOfEntries);
+ }
});
- client4 = cluster.startClientVM(4, ccf -> {
- configureClientCacheFactory(ccf, server2Port);
+ // Initialize server regions.
+ server2.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
+ bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100));
+ }
});
- client3.invoke(() -> exuteQuery());
- client4.invoke(() -> exuteQuery());
+
+ // Initialize Client1 and create client regions.
+ client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
+
+ // Initialize Client2 and create client regions.
+ client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
+
+ // Execute client queries
+ client1.invoke("Execute Queries", () -> executeQueriesFromClient(20));
+ client2.invoke("Execute Queries", () -> executeQueriesFromClient(20));
+
+ stopServer(server1);
+ stopServer(server2);
}
+ /**
+ * Tests query execution on region with indexes.
+ */
@Test
public void testQueryMonitorRegionWithIndex() throws Exception {
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
- .statusIsSuccess();
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
-
- // create the indices using API
- VMProvider.invokeInEveryMember(() -> {
- // create index.
- QueryService cacheQS = ClusterStartupRule.getCache().getQueryService();
- cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p");
- cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p");
- cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos");
- cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos");
- cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion");
- cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion");
- populateRegion(0, 150);
- }, server1, server2);
-
- // client3 connects to server1, client4 connects to server2
- int server1Port = server1.getPort();
- int server2Port = server2.getPort();
- client3 = cluster.startClientVM(3, true, server1Port);
- client4 = cluster.startClientVM(4, true, server2Port);
-
- client3.invoke(() -> exuteQuery());
- client4.invoke(() -> exuteQuery());
+
+ setup(2);
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+ VM client1 = host.getVM(2);
+ VM client2 = host.getVM(3);
+
+ final int numberOfEntries = 100;
+
+ String serverHostName = NetworkUtils.getServerHostName(host);
+
+ // Start server
+ int serverPort1 =
+ server1.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
+ // the
+ // queries
+ // taking
+ // more
+ // than
+ // 20ms
+ // should
+ // be
+ // canceled
+ // by
+ // Query
+ // monitor.
+ server1.invoke("createExampleRegion", () -> createExampleRegion());
+
+ int serverPort2 =
+ server2.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
+ // the
+ // queries
+ // taking
+ // more
+ // than
+ // 20ms
+ // should
+ // be
+ // canceled
+ // by
+ // Query
+ // monitor.
+ server2.invoke("createExampleRegion", () -> createExampleRegion());
+
+ // Initialize server regions.
+ server1.invoke("Create Indexes", () -> createIndexes(numberOfEntries));
+
+ // Initialize server regions.
+ server2.invoke("Create Indexes", () -> createIndexes(numberOfEntries));
+
+ // Initialize Client1
+ client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
+
+ // Initialize Client2
+ client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
+
+ // Execute client queries
+ client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
+ client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
+
+ stopServer(server1);
+ stopServer(server2);
+ }
+
+ private void createIndexes(int numberOfEntries) throws Exception {
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
+
+ // create index.
+ QueryService cacheQS = cacheRule.getCache().getQueryService();
+ cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p");
+ cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p");
+ cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos");
+ cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos");
+ cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion");
+ cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion");
+
+ for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) {
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
}
+ /**
+ * The following CQ test is added to make sure testMaxQueryExecutionTime is reset and is not
+ * affecting other query related tests.
+ *
+ */
@Test
public void testCqExecuteDoesNotGetAffectedByTimeout() throws Exception {
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
- .statusIsSuccess();
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
- server1.invoke(() -> populateRegion(0, 100));
-
- int server1Port = server1.getPort();
- client3 = cluster.startClientVM(3, ccf -> {
- configureClientCacheFactory(ccf, server1Port);
+ setup(1);
+
+ final Host host = Host.getHost(0);
+ VM server = host.getVM(0);
+ VM client = host.getVM(1);
+ VM producerClient = host.getVM(2);
+
+ // Start server
+ int serverPort =
+ server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
+ server.invoke("createExampleRegion", () -> createExampleRegion());
+
+
+ final String host0 = NetworkUtils.getServerHostName(server.getHost());
+
+ // Create client.
+ client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort));
+
+ final int size = 10;
+ final String name = "testQuery_4";
+ server.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int i = 1; i <= size; i++) {
+ region.put("key" + i, new Portfolio(i));
+ }
});
- client3.invoke(() -> {
+ // create and execute cq
+ client.invoke(() -> {
String cqName = "testCQForQueryMonitorDUnitTest";
- String query = "select * from /exampleRegion";
+ String query = "select * from /" + exampleRegionName;
// Get CQ Service.
- QueryService cqService = ClusterStartupRule.getClientCache().getQueryService();
+ QueryService cqService = null;
+ cqService = clientCacheRule.getClientCache().getQueryService();
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
@@ -256,212 +857,278 @@ public class QueryMonitorDUnitTest {
CqQuery cq1 = cqService.newCq(cqName, query, cqa);
cq1.execute();
});
-
- server1.invoke(() -> {
- populateRegion(0, 150);
- });
}
@Test
- public void testCacheOpAfterQueryCancel() throws Exception {
- int locatorPort = locator.getPort();
- // start up more servers
- MemberVM server3 = cluster.startServerVM(3, locatorPort);
+ public void testCqProcessingDoesNotGetAffectedByTimeout() throws Exception {
+ setup(1);
- server3.invoke(() -> {
- DefaultQuery.testHook = new QueryTimeoutHook();
- GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME;
- });
+ final Host host = Host.getHost(0);
+ VM server = host.getVM(0);
+ VM client = host.getVM(1);
+ VM producerClient = host.getVM(2);
- gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
- .statusIsSuccess();
- locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 3);
+ // Start server
+ int serverPort =
+ server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
+ server.invoke("createExampleRegion", () -> createExampleRegion());
- server1.invoke(() -> {
- QueryService queryService = ClusterStartupRule.getCache().getQueryService();
- queryService.createIndex("statusIndex", "status", "/exampleRegion");
- queryService.createIndex("secIdIndex", "pos.secId",
- "/exampleRegion p, p.positions.values pos");
- populateRegion(100, 1000);
- });
- AsyncInvocation ai1 = server1.invokeAsync(() -> {
- for (int j = 0; j < 5; j++) {
- populateRegion(0, 2000);
+ final String host0 = NetworkUtils.getServerHostName(server.getHost());
+
+ // Create client.
+ client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort));
+
+ final int size = 10;
+ final String name = "testQuery_4";
+ server.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int i = 1; i <= size; i++) {
+ region.put("key" + i, new Portfolio(i));
}
});
- AsyncInvocation ai2 = server2.invokeAsync(() -> {
- for (int j = 0; j < 5; j++) {
- populateRegion(1000, 3000);
+ // create and execute cq
+ client.invoke(() -> {
+ String cqName = "testCQForQueryMonitorDUnitTest";
+ String query = "select * from /" + exampleRegionName;
+ // Get CQ Service.
+ QueryService cqService = null;
+ cqService = clientCacheRule.getClientCache().getQueryService();
+
+ // Create CQ Attributes.
+ CqAttributesFactory cqf = new CqAttributesFactory();
+ CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
+ cqf.initCqListeners(cqListeners);
+ CqAttributes cqa = cqf.create();
+
+ CqQuery cq1 = cqService.newCq(cqName, query, cqa);
+ cq1.execute();
+ });
+
+ server.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int i = 1; i <= size; i++) {
+ region.put("key" + i, new Portfolio(i));
}
});
+ }
+
+ /**
+ * Tests cache operation right after query cancellation.
+ */
+ @Test
+ public void testCacheOpAfterQueryCancel() throws Exception {
+
+ setup(4);
+
+ final Host host = Host.getHost(0);
+
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+ VM server3 = host.getVM(2);
+ VM server4 = host.getVM(3);
- // server3 performs a region put after a query is canceled.
- AsyncInvocation ai3 = server3.invokeAsync(() -> {
- Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion");
- QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
- String qStr =
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos"
- + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 "
- + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', "
- + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')"
- + " order by p.status, p.ID desc";
- for (int i = 0; i < 100; i++) {
+ final int numberOfEntries = 1000;
+
+ // Start server
+ server1.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
+ server1.invoke("Create Partition Regions", () -> createExamplePRRegion());
+
+ server2.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
+ server2.invoke("Create Partition Regions", () -> createExamplePRRegion());
+
+ server3.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
+ server3.invoke("Create Partition Regions", () -> createExamplePRRegion());
+
+ server4.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
+ server4.invoke("Create Partition Regions", () -> createExamplePRRegion());
+
+ server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
try {
- Query query = queryService.newQuery(qStr);
- query.execute();
- fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
- } catch (QueryExecutionTimeoutException qet) {
+ QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
+ queryService.createIndex("statusIndex", "status", "/exampleRegion");
+ queryService.createIndex("secIdIndex", "pos.secId",
+ "/exampleRegion p, p.positions.values pos");
+ } catch (Exception ex) {
+ fail("Failed to create index.");
+ }
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int i = 100; i <= (numberOfEntries); i++) {
exampleRegion.put("" + i, new Portfolio(i));
}
}
});
- ai1.await();
- ai2.await();
- ai3.await();
+ // Initialize server regions.
+ AsyncInvocation ai1 =
+ server1.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int j = 0; j < 5; j++) {
+ for (int i = 1; i <= (numberOfEntries + 1000); i++) {
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
+ }
+ LogWriterUtils.getLogWriter()
+ .info("### Completed updates in server1 in testCacheOpAfterQueryCancel");
+ }
+ });
+
+ AsyncInvocation ai2 =
+ server2.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
+ for (int j = 0; j < 5; j++) {
+ for (int i = (1 + 1000); i <= (numberOfEntries + 2000); i++) {
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
+ }
+ LogWriterUtils.getLogWriter()
+ .info("### Completed updates in server2 in testCacheOpAfterQueryCancel");
+ }
+ });
+
+ // Execute server queries
+ SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") {
+ public void run2() throws CacheException {
+ try {
+ Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
+ QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
+ String qStr =
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos"
+ + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 "
+ + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', "
+ + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')"
+ + " order by p.status, p.ID desc";
+ for (int i = 0; i < 500; i++) {
+ try {
+ GemFireCacheImpl.getInstance().getLogger().info("Executing query :" + qStr);
+ Query query = queryService.newQuery(qStr);
+ query.execute();
+ } catch (QueryExecutionTimeoutException qet) {
+ LogWriterUtils.getLogWriter()
+ .info("### Got Expected QueryExecutionTimeout exception. " + qet.getMessage());
+ if (qet.getMessage().contains("cancelled after exceeding max execution")) {
+ LogWriterUtils.getLogWriter().info("### Doing a put operation");
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
+ } catch (Exception e) {
+ fail("Exception executing query." + e.getMessage());
+ }
+ }
+ LogWriterUtils.getLogWriter()
+ .info("### Completed Executing queries in testCacheOpAfterQueryCancel");
+ } catch (Exception ex) {
+ Assert.fail("Exception creating the query service", ex);
+ }
+ }
+ };
+
+ AsyncInvocation ai3 = server3.invokeAsync(executeQuery);
+ AsyncInvocation ai4 = server4.invokeAsync(executeQuery);
+
+ LogWriterUtils.getLogWriter()
+ .info("### Waiting for async threads to join in testCacheOpAfterQueryCancel");
+ try {
+ ThreadUtils.join(ai1, 5 * 60 * 1000);
+ ThreadUtils.join(ai2, 5 * 60 * 1000);
+ ThreadUtils.join(ai3, 5 * 60 * 1000);
+ ThreadUtils.join(ai4, 5 * 60 * 1000);
+ } catch (Exception ex) {
+ fail("Async thread join failure");
+ }
+ LogWriterUtils.getLogWriter()
+ .info("### DONE Waiting for async threads to join in testCacheOpAfterQueryCancel");
- server3.invoke(() -> {
- DefaultQuery.testHook = null;
- GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
- });
- }
+ validateQueryMonitorThreadCnt(server1, 0, 1000);
+ validateQueryMonitorThreadCnt(server2, 0, 1000);
+ validateQueryMonitorThreadCnt(server3, 0, 1000);
+ validateQueryMonitorThreadCnt(server4, 0, 1000);
+ LogWriterUtils.getLogWriter()
+ .info("### DONE validating query monitor threads testCacheOpAfterQueryCancel");
- private static void populateRegion(int startingId, int endingId) {
- Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion");
- for (int i = startingId; i < endingId; i++) {
- exampleRegion.put("" + i, new Portfolio(i));
- }
+ stopServer(server1);
+ stopServer(server2);
+ stopServer(server3);
+ stopServer(server4);
}
- private static void exuteQuery() {
- QueryService queryService;
- if (ClusterStartupRule.getClientCache() == null) {
- queryService = ClusterStartupRule.getCache().getQueryService();
- } else {
- queryService = ClusterStartupRule.getClientCache().getQueryService();
- }
- for (int k = 0; k < queryStr.length; k++) {
- String qStr = queryStr[k];
- try {
- Query query = queryService.newQuery(qStr);
- query.execute();
- fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
- } catch (Exception e) {
- verifyException(e);
- }
- }
+ private void validateQueryMonitorThreadCnt(VM vm, final int threadCount, final int waitTime) {
+ SerializableRunnable validateThreadCnt =
+ new CacheSerializableRunnable("validateQueryMonitorThreadCnt") {
+ public void run2() throws CacheException {
+ Cache cache = cacheRule.getCache();
+ QueryMonitor qm = ((GemFireCacheImpl) cache).getQueryMonitor();
+ if (qm == null) {
+ fail("Didn't found query monitor.");
+ }
+ int waited = 0;
+ while (true) {
+ if (qm.getQueryMonitorThreadCount() != threadCount) {
+ if (waited <= waitTime) {
+ Wait.pause(10);
+ waited += 10;
+ continue;
+ } else {
+ fail("Didn't found expected monitoring thread. Expected: " + threadCount
+ + " found :" + qm.getQueryMonitorThreadCount());
+ }
+ }
+ break;
+ }
+ }
+ };
+ vm.invoke(validateThreadCnt);
}
- private static void configureClientCacheFactory(ClientCacheFactory ccf, int... serverPorts) {
- for (int serverPort : serverPorts) {
- ccf.addPoolServer("localhost", serverPort);
- }
- ccf.setPoolReadTimeout(10 * 60 * 1000); // 10 min
- ccf.setPoolSubscriptionEnabled(true);
+ /**
+ * Starts a bridge server on the given port, using the given deserializeValues and
+ * notifyBySubscription to serve up the given region.
+ */
+ protected int startBridgeServer(int port, boolean notifyBySubscription) throws IOException {
+
+ Cache cache = cacheRule.getCache();
+ CacheServer bridge = cache.addCacheServer();
+ bridge.setPort(port);
+ bridge.setNotifyBySubscription(notifyBySubscription);
+ bridge.start();
+ return bridge.getPort();
}
- private static void createReplicateRegionWithEviction(File workingDir) {
- InternalCache cache = ClusterStartupRule.getCache();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(new File[] {workingDir}).create("ds");
- EvictionAttributes evictAttrs =
- EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK);
- cache.createRegionFactory(RegionShortcut.REPLICATE)
- .setDiskStoreName("ds")
- .setEvictionAttributes(evictAttrs)
- .create("exampleRegion");
+ /**
+ * Stops the bridge server that serves up the given cache.
+ */
+ private void stopBridgeServer(Cache cache) {
+ CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next();
+ bridge.stop();
+ assertFalse(bridge.isRunning());
}
- private static void verifyException(Exception e) {
- String error = e.getMessage();
- if (e.getCause() != null) {
- error = e.getCause().getMessage();
- }
+ private class QueryTimeoutHook implements DefaultQuery.TestHook {
- if (error.contains("Query execution cancelled after exceeding max execution time")
- || error.contains("The Query completed sucessfully before it got canceled")
- || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time")
- || error.contains(
- "The query task could not be found but the query is marked as having been canceled")) {
- // Expected exception.
- return;
- }
+ long timeout;
- System.out.println("Unexpected exception:");
- if (e.getCause() != null) {
- e.getCause().printStackTrace();
- } else {
- e.printStackTrace();
+ private QueryTimeoutHook(long timeout) {
+ this.timeout = timeout;
}
- fail("Expected exception Not found. Expected exception with message: \n"
- + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error);
- }
-
-
- @After
- public void reset() {
- VMProvider.invokeInEveryMember(() -> {
- DefaultQuery.testHook = null;
- GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
- }, server1, server2);
- }
+ public void doTestHook(String description) {
+ if (description.equals("6")) {
+ try {
+ Thread.sleep(timeout * 2);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
- private static class QueryTimeoutHook implements DefaultQuery.TestHook {
public void doTestHook(int spot) {
- if (spot != 6) {
- return;
- }
- try {
- TimeUnit.MILLISECONDS.sleep(20);
- } catch (InterruptedException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
+ doTestHook("" + spot);
}
- }
- private static String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE p.ID > 100",
- "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pk != '1000'",
- "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pkid != '1'",
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pk > '1'",
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pkid != '53'",
- "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100",
- "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
- "SELECT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
- "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'",
- "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
- "SELECT DISTINCT ID FROM /exampleRegion WHERE status = 'active'",
- "SELECT DISTINCT ID FROM /exampleRegion p WHERE p.status = 'active'",
- "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
- "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
- + " WHERE pos.secId = 'YHOO') as itrX",
- "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
- + " WHERE pos.secId = 'YHOO') as itrX",
- "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x"
- + " WHERE x.ID = p.ID) as itrX",
- "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
- + " WHERE x.ID = p.ID) as itrX",
- "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE "
- + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE vals.secId = 'YHOO')",
- "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')",
- "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
- "SELECT DISTINCT structset.sos, structset.key "
- + "FROM /exampleRegion p, p.positions.values outerPos, "
- + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
- + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
- + "where outerPos.secId != 'IBM' AND "
- + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
- + "where structset.sos > 2000",
- "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, "
- + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
- + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
- + "where outerPos.secId != 'IBM' AND "
- + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
- + "where structset.sos > 2000",
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position "
- + "WHERE (true = null OR position.secId = 'SUN') AND true",};
+ }
}