You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/08/06 20:28:48 UTC
[geode] branch develop updated: GEODE-577: rewrite
QueryMonitorDUnitTest (#2179)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 38e1714 GEODE-577: rewrite QueryMonitorDUnitTest (#2179)
38e1714 is described below
commit 38e1714b54894caafa508dab9634d62a9c4c42fc
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Mon Aug 6 13:28:44 2018 -0700
GEODE-577: rewrite QueryMonitorDUnitTest (#2179)
* code cleanup.
* add QueryMonitor unit test
* do not add cq query to the monitor queue
---
.../ResourceManagerWithQueryMonitorDUnitTest.java | 23 +-
.../geode/cache/query/internal/DefaultQuery.java | 10 +-
.../geode/cache/query/internal/QueryMonitor.java | 102 +-
.../cache/PartitionedRegionQueryEvaluator.java | 6 +-
.../cache/query/internal/QueryMonitorTest.java | 91 ++
.../cache/query/dunit/QueryMonitorDUnitTest.java | 1299 +++++---------------
6 files changed, 457 insertions(+), 1074 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
index 598bde1..80f2192 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
@@ -870,7 +870,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
ServerOperationException soe = (ServerOperationException) e;
if (soe.getRootCause() instanceof QueryException) {
QueryException qe = (QueryException) soe.getRootCause();
- if (isExceptionDueToTimeout(qe, queryTimeout)) {
+ if (isExceptionDueToTimeout(qe)) {
logger.info("Query Execution must be terminated by a timeout. Expected behavior");
return 0;
}
@@ -952,7 +952,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
// meaning the query should not be canceled due to low memory
throw new CacheException("Query should not have been canceled due to memory") {};
}
- } else if (isExceptionDueToTimeout((QueryException) e, queryTimeout)) {
+ } else if (isExceptionDueToTimeout((QueryException) e)) {
if (queryTimeout == -1) {
// no time out set, this should not be thrown
@@ -976,7 +976,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
// meaning the query should not be canceled due to low memory
throw new CacheException("Query should not have been canceled due to memory") {};
}
- } else if (isExceptionDueToTimeout(qe, queryTimeout)) {
+ } else if (isExceptionDueToTimeout(qe)) {
if (queryTimeout == -1) {
e.printStackTrace();
// no time out set, this should not be thrown
@@ -1219,13 +1219,12 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
.toLocalizedString()));
}
- private boolean isExceptionDueToTimeout(QueryException e, long queryTimeout) {
+ private boolean isExceptionDueToTimeout(QueryException e) {
String message = e.getMessage();
// -1 needs to be matched due to client/server set up, BaseCommand uses the
// MAX_QUERY_EXECUTION_TIME and not the testMaxQueryExecutionTime
return (message.contains("The QueryMonitor thread may be sleeping longer than")
- || message.contains(LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED
- .toLocalizedString(queryTimeout))
+ || message.contains("Query execution cancelled after exceeding max execution time")
|| message.contains(
LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED.toLocalizedString(-1)));
}
@@ -1261,10 +1260,6 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
}
}
- public void doTestHook(String description) {
-
- }
-
public void countDown() {
latch.countDown();
}
@@ -1288,10 +1283,6 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
}
}
}
-
- public void doTestHook(String description) {
-
- }
}
private class CancelDuringAddResultsHook implements DefaultQuery.TestHook {
@@ -1315,9 +1306,5 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
rejectedObjects = true;
}
}
-
- public void doTestHook(String description) {
-
- }
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index fad0e0f..1053d8d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -716,11 +716,9 @@ public class DefaultQuery implements Query {
/**
* The query gets canceled by the QueryMonitor with the reason being specified
- * <p>
- * TODO: parameter isCanceled is always true
*/
- public void setCanceled(boolean isCanceled, CacheRuntimeException canceledException) {
- this.isCanceled = isCanceled;
+ public void setCanceled(CacheRuntimeException canceledException) {
+ this.isCanceled = true;
this.canceledException = canceledException;
}
@@ -985,8 +983,8 @@ public class DefaultQuery implements Query {
}
public interface TestHook {
- void doTestHook(int spot);
+ default void doTestHook(int spot) {};
- void doTestHook(String spot);
+ default void doTestHook(String spot) {};
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
index ccba334..f7750bc 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
@@ -14,21 +14,16 @@
*/
package org.apache.geode.cache.query.internal;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
import org.apache.geode.cache.query.QueryExecutionTimeoutException;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LocalizedMessage;
/**
* QueryMonitor class, monitors the query execution time. Instantiated based on the system property
@@ -46,13 +41,12 @@ public class QueryMonitor implements Runnable {
private static final Logger logger = LogService.getLogger();
private final InternalCache cache;
- private boolean testingQueryMonitor = false;
/**
* Holds the query execution status for the thread executing the query. FALSE if the query is not
* canceled due to max query execution timeout. TRUE it the query is canceled due to max query
* execution timeout timeout.
*/
- private static final ThreadLocal<AtomicBoolean> queryExecutionStatus =
+ private static final ThreadLocal<AtomicBoolean> queryCancelled =
ThreadLocal.withInitial(() -> new AtomicBoolean(Boolean.FALSE));
private final long maxQueryExecutionTime;
@@ -63,9 +57,6 @@ public class QueryMonitor implements Runnable {
private final AtomicBoolean stopped = new AtomicBoolean(Boolean.FALSE);
- /** For DUnit test purpose TODO: delete this ConcurrentMap */
- private ConcurrentMap queryMonitorTasks = null;
-
// Variables for cancelling queries due to low memory
private static volatile Boolean LOW_MEMORY = Boolean.FALSE;
@@ -82,14 +73,19 @@ public class QueryMonitor implements Runnable {
* @param queryThread Thread executing the query.
* @param query Query.
*/
- public void monitorQueryThread(Thread queryThread, Query query) {
+ public void monitorQueryThread(Thread queryThread, DefaultQuery query) {
+ // cq query is not monitored
+ if (query.isCqQuery()) {
+ return;
+ }
+
if (LOW_MEMORY) {
String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
.toLocalizedString(LOW_MEMORY_USED_BYTES);
- ((DefaultQuery) query).setCanceled(true, new QueryExecutionLowMemoryException(reason));
+ query.setCanceled(new QueryExecutionLowMemoryException(reason));
throw new QueryExecutionLowMemoryException(reason);
}
- QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryExecutionStatus.get());
+ QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryCancelled.get());
synchronized (queryThreads) {
queryThreads.add(queryTask);
queryThreads.notifyAll();
@@ -101,28 +97,19 @@ public class QueryMonitor implements Runnable {
queryThreads.size(), queryThread.getId(), query.getQueryString(), queryThread);
}
- // For dunit test purpose
- if (cache != null && testingQueryMonitor) {
- if (this.queryMonitorTasks == null) {
- this.queryMonitorTasks = new ConcurrentHashMap();
- }
- this.queryMonitorTasks.put(queryThread, queryTask);
- }
}
/**
* Stops monitoring the query. Removes the passed thread from QueryMonitor queue.
*/
- public void stopMonitoringQueryThread(Thread queryThread, Query query) {
+ public void stopMonitoringQueryThread(Thread queryThread, DefaultQuery query) {
// Re-Set the queryExecution status on the LocalThread.
QueryExecutionTimeoutException testException = null;
- DefaultQuery defaultQuery = (DefaultQuery) query;
- boolean[] queryCompleted = defaultQuery.getQueryCompletedForMonitoring();
+ boolean[] queryCompleted = query.getQueryCompletedForMonitoring();
synchronized (queryCompleted) {
- queryExecutionStatus.get().getAndSet(Boolean.FALSE);
-
- defaultQuery.setQueryCompletedForMonitoring(true);
+ queryCancelled.get().getAndSet(Boolean.FALSE);
+ query.setQueryCompletedForMonitoring(true);
// Remove the query task from the queue.
queryThreads.remove(new QueryThreadTask(queryThread, null, null));
}
@@ -147,7 +134,7 @@ public class QueryMonitor implements Runnable {
* gemfire.Cache.MAX_QUERY_EXECUTION_TIME
*/
public static void isQueryExecutionCanceled() {
- if (queryExecutionStatus.get() != null && queryExecutionStatus.get().get()) {
+ if (queryCancelled.get() != null && queryCancelled.get().get()) {
throw new QueryExecutionCanceledException();
}
}
@@ -182,24 +169,21 @@ public class QueryMonitor implements Runnable {
try {
QueryThreadTask queryTask;
long sleepTime;
- // TODO: while-block cannot complete without throwing
while (true) {
// Get the first query task from the queue. This query will have the shortest
// remaining time that needs to canceled first.
queryTask = (QueryThreadTask) queryThreads.peek();
if (queryTask == null) {
- // Empty queue.
synchronized (queryThreads) {
queryThreads.wait();
}
continue;
}
- long currentTime = System.currentTimeMillis();
-
+ long executionTime = System.currentTimeMillis() - queryTask.StartTime;
// Check if the sleepTime is greater than the remaining query execution time.
- if (currentTime - queryTask.StartTime < this.maxQueryExecutionTime) {
- sleepTime = this.maxQueryExecutionTime - (currentTime - queryTask.StartTime);
+ if (executionTime < this.maxQueryExecutionTime) {
+ sleepTime = this.maxQueryExecutionTime - executionTime;
// Its been noted that the sleep is not guaranteed to wait for the specified
// time (as stated in Suns doc too), it depends on the OSs thread scheduling
// behavior, hence thread may sleep for longer than the specified time.
@@ -207,33 +191,23 @@ public class QueryMonitor implements Runnable {
Thread.sleep(sleepTime);
continue;
}
-
- // Query execution has taken more than the max time, Set queryExecutionStatus flag
- // to canceled (TRUE).
- boolean[] queryCompleted =
- ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring();
+ // Query execution has taken more than the max time, Set queryCancelled flag
+ // to true.
+ boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring();
synchronized (queryCompleted) {
- if (!queryCompleted[0] && !((DefaultQuery) queryTask.query).isCqQuery()) { // Check if the
- // query is
- // already
- // completed.
- ((DefaultQuery) queryTask.query).setCanceled(true,
- new QueryExecutionTimeoutException(
- LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED
- .toLocalizedString(GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME)));
- queryTask.queryExecutionStatus.set(Boolean.TRUE);
- // Remove the task from queue.
+ // check if query is already completed
+ if (!queryCompleted[0]) {
+ queryTask.query.setCanceled(new QueryExecutionTimeoutException(String
+ .format("Query execution cancelled after exceeding max execution time %sms.",
+ this.maxQueryExecutionTime)));
+ queryTask.queryCancelled.set(Boolean.TRUE);
+ // remove the query threads from monitoring queue
queryThreads.poll();
+ logger.info(String.format(
+ "%s is set as canceled after %s milliseconds", queryTask.toString(),
+ executionTime));
}
}
-
- logger.info(LocalizedMessage.create(
- LocalizedStrings.GemFireCache_LONG_RUNNING_QUERY_EXECUTION_CANCELED,
- new Object[] {queryTask.query.getQueryString(), queryTask.queryThread.getId()}));
-
- if (logger.isDebugEnabled()) {
- logger.debug("Query Execution for the thread {} got canceled.", queryTask.queryThread);
- }
}
} catch (InterruptedException ignore) {
if (logger.isDebugEnabled()) {
@@ -275,15 +249,15 @@ public class QueryMonitor implements Runnable {
}
private void cancelQueryDueToLowMemory(QueryThreadTask queryTask, long memoryThreshold) {
- boolean[] queryCompleted = ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring();
+ boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring();
synchronized (queryCompleted) {
if (!queryCompleted[0]) {
// cancel if query is not completed
String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
.toLocalizedString(memoryThreshold);
- ((DefaultQuery) queryTask.query).setCanceled(true,
+ queryTask.query.setCanceled(
new QueryExecutionLowMemoryException(reason));
- queryTask.queryExecutionStatus.set(Boolean.TRUE);
+ queryTask.queryCancelled.set(Boolean.TRUE);
}
}
}
@@ -305,16 +279,16 @@ public class QueryMonitor implements Runnable {
final Thread queryThread;
// package-private to avoid synthetic accessor
- final Query query;
+ final DefaultQuery query;
// package-private to avoid synthetic accessor
- final AtomicBoolean queryExecutionStatus;
+ final AtomicBoolean queryCancelled;
- QueryThreadTask(Thread queryThread, Query query, AtomicBoolean queryExecutionStatus) {
+ QueryThreadTask(Thread queryThread, DefaultQuery query, AtomicBoolean queryCancelled) {
this.StartTime = System.currentTimeMillis();
this.queryThread = queryThread;
this.query = query;
- this.queryExecutionStatus = queryExecutionStatus;
+ this.queryCancelled = queryCancelled;
}
@Override
@@ -341,7 +315,7 @@ public class QueryMonitor implements Runnable {
return new StringBuilder().append("QueryThreadTask[StartTime:").append(this.StartTime)
.append(", queryThread:").append(this.queryThread).append(", threadId:")
.append(this.queryThread.getId()).append(", query:").append(this.query.getQueryString())
- .append(", queryExecutionStatus:").append(this.queryExecutionStatus).append(']')
+ .append(", queryCancelled:").append(this.queryCancelled).append(']')
.toString();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index 8a8c61c..76262fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -241,7 +241,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
String reason =
LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
.toLocalizedString();
- query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+ query.setCanceled(new QueryExecutionLowMemoryException(reason));
} else {
if (logger.isDebugEnabled()) {
logger.debug("query cancelled while gathering results, aborting due to exception "
@@ -762,7 +762,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
String reason =
LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
.toLocalizedString();
- query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+ query.setCanceled(new QueryExecutionLowMemoryException(reason));
if (DefaultQuery.testHook != null) {
DefaultQuery.testHook.doTestHook(5);
}
@@ -1103,7 +1103,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
String reason =
LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
.toLocalizedString();
- query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+ query.setCanceled(new QueryExecutionLowMemoryException(reason));
this.abort = true;
}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
new file mode 100644
index 0000000..ea04c18
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.awaitility.Awaitility;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.InternalCache;
+
+/**
+ * although max_execution_time is set as 10ms, the monitor thread can sleep more than the specified
+ * time, so query will be cancelled at un-deterministic time after 10ms. We cannot assert on
+ * specific time at which the query will be cancelled. We can only assert that the query will be
+ * cancelled at one point after 10ms.
+ */
+public class QueryMonitorTest {
+
+ private static InternalCache cache;
+ private static QueryMonitor monitor;
+ private static long max_execution_time = 5;
+
+ @BeforeClass
+ public static void setUp() {
+ cache = mock(InternalCache.class);
+ monitor = new QueryMonitor(cache, max_execution_time);
+ Thread monitorThread = new Thread(() -> monitor.run(), "query monitor thread");
+ monitorThread.setDaemon(true);
+ monitorThread.start();
+ }
+
+ @Test
+ public void queryIsCancelled() {
+ List<DefaultQuery> queries = new ArrayList<>();
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ DefaultQuery query = new DefaultQuery("query" + i, cache, false);
+ queries.add(query);
+ Thread queryExecutionThread = createQueryExecutionThread(i);
+ threads.add(queryExecutionThread);
+ monitor.monitorQueryThread(queryExecutionThread, query);
+ }
+
+ for (DefaultQuery query : queries) {
+ // make sure the isCancelled flag in Query is set correctly
+ Awaitility.await().until(() -> query.isCanceled());
+ }
+ Awaitility.await().until(() -> monitor.getQueryMonitorThreadCount() == 0);
+ // make sure all thread died
+ for (Thread thread : threads) {
+ Awaitility.await().until(() -> !thread.isAlive());
+ }
+ }
+
+ @Test
+ public void cqQueryIsNotMonitored() {
+ DefaultQuery query = mock(DefaultQuery.class);
+ when(query.isCqQuery()).thenReturn(true);
+ monitor.monitorQueryThread(mock(Thread.class), query);
+ assertThat(monitor.getQueryMonitorThreadCount()).isEqualTo(0);
+ }
+
+ private Thread createQueryExecutionThread(int i) {
+ Thread thread = new Thread(() -> {
+ // make sure the threadlocal variable is updated
+ Awaitility.await().until(() -> QueryMonitor.isQueryExecutionCanceled());
+ });
+ thread.setName("query" + i);
+ return thread;
+ }
+
+}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
index 4bcfd01..871c0b4 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java
@@ -12,31 +12,23 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.cache.query.dunit;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
-import org.apache.logging.log4j.Logger;
import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.DiskStoreFactory;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.CqAttributes;
@@ -49,804 +41,211 @@ import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.cq.dunit.CqQueryTestListener;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.cache.query.internal.QueryMonitor;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.cache30.CacheTestCase;
import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.ClientCacheRule;
-import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.VMProvider;
/**
- * Tests for QueryMonitoring service.
+ * These tests add a test hook to make sure query execution sleeps for at least 20 ms, so the
+ * queryMonitor thread will get a chance to cancel the query before it completes.
+ *
+ * The MAX_QUERY_EXECUTION_TIME is set as 1 ms, i.e, theoretically all queries will be cancelled
+ * after 1ms, but due to thread scheduling, this may not be true. we can only decrease the flakyness
+ * of the test by making MAX_QUERY_EXECUTION_TIME the smallest possible (1) and making the query
+ * execution time longer (but not too long to make the test run too slow).
*
- * @since GemFire 6.0
*/
@Category({OQLQueryTest.class})
-public class QueryMonitorDUnitTest implements Serializable {
-
- private static final Logger logger = LogService.getLogger();
-
- @Rule
- public DistributedTestRule distributedTestRule = new DistributedTestRule();
-
+public class QueryMonitorDUnitTest {
+ private static int MAX_QUERY_EXECUTE_TIME = 1;
@Rule
- public CacheRule cacheRule = new CacheRule();
+ public ClusterStartupRule cluster = new ClusterStartupRule(5);
@Rule
- public ClientCacheRule clientCacheRule = new ClientCacheRule();
-
- private final String exampleRegionName = "exampleRegion";
-
-
- /* Some of the queries are commented out as they were taking less time */
- String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE p.ID > 100",
- "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pk != '1000'",
- "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pkid != '1'",
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pk > '1'",
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pkid != '53'",
- "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100",
- "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
- "SELECT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
- "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'",
- "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
- "SELECT DISTINCT ID FROM /exampleRegion WHERE status = 'active'",
- "SELECT DISTINCT ID FROM /exampleRegion p WHERE p.status = 'active'",
- "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
- "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
- + " WHERE pos.secId = 'YHOO') as itrX",
- "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
- + " WHERE pos.secId = 'YHOO') as itrX",
- "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x"
- + " WHERE x.ID = p.ID) as itrX",
- "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
- + " WHERE x.ID = p.ID) as itrX",
- "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE "
- + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE vals.secId = 'YHOO')",
- "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')",
- "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
- "SELECT DISTINCT structset.sos, structset.key "
- + "FROM /exampleRegion p, p.positions.values outerPos, "
- + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
- + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
- + "where outerPos.secId != 'IBM' AND "
- + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
- + "where structset.sos > 2000",
- "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, "
- + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
- + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
- + "where outerPos.secId != 'IBM' AND "
- + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
- + "where structset.sos > 2000",
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position "
- + "WHERE (true = null OR position.secId = 'SUN') AND true",};
-
- String[] prQueryStr = {
- "SELECT ID FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active'",
- "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'",
- "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
- "SELECT DISTINCT p.ID FROM /exampleRegion p WHERE p.ID > 100 and p.ID < 100000 and p.status = 'active'",
- "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (pos.secId != 'IBM')",};
-
- private int numServers;
-
- @After
- public final void preTearDownCacheTestCase() throws Exception {
- Host host = Host.getHost(0);
- // shut down clients before servers
- for (int i = numServers; i < 4; i++) {
- host.getVM(i).invoke(() -> CacheTestCase.disconnectFromDS());
- }
- }
-
- public void setup(int numServers) throws Exception {
- Host host = Host.getHost(0);
- this.numServers = numServers;
- }
-
- public void createExampleRegion() {
- createExampleRegion(false, null);
- }
-
- private void createExampleRegion(final boolean eviction, final String dirName) {
- RegionFactory regionFactory =
- cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE);
-
- // setting the eviction attributes.
- if (eviction) {
- File[] f = new File[1];
- f[0] = new File(dirName);
- f[0].mkdir();
- DiskStoreFactory dsf = cacheRule.getCache().createDiskStoreFactory();
- dsf.setDiskDirs(f).create("ds1");
- EvictionAttributes evictAttrs =
- EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK);
- regionFactory.setDiskStoreName("ds1").setEvictionAttributes(evictAttrs);
- }
- regionFactory.create(exampleRegionName);
- }
-
- private void createExamplePRRegion() {
- RegionFactory regionFactory =
- cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
-
- AttributesFactory factory = new AttributesFactory();
- // factory.setDataPolicy(DataPolicy.PARTITION);
- regionFactory
- .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(8).create());
- regionFactory.create(exampleRegionName);
- }
-
- private int configServer(final int queryMonitorTime, final String testName) throws IOException {
- cacheRule.createCache();
- CacheServer cacheServer = cacheRule.getCache().addCacheServer();
- cacheServer.setPort(0);
- cacheServer.start();
- GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = queryMonitorTime;
- cacheRule.getCache().getLogger().fine("#### RUNNING TEST : " + testName);
- DefaultQuery.testHook = new QueryTimeoutHook(queryMonitorTime);
- return cacheServer.getPort();
- }
-
- // Stop server
- private void stopServer(VM server) {
- SerializableRunnable stopServer = new SerializableRunnable("Stop CacheServer") {
- public void run() {
- // Reset the test flag.
- Cache cache = cacheRule.getCache();
- DefaultQuery.testHook = null;
- GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
- stopBridgeServer(cacheRule.getCache());
- }
- };
- server.invoke(stopServer);
- }
-
- private void configClient(String host, int... ports) {
- configClient(false, host, ports);
- }
-
- private void configClient(boolean enableSubscriptions, String host, int... ports) {
- ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
- for (int port : ports) {
- clientCacheFactory.addPoolServer(host, port);
- }
- clientCacheFactory.setPoolSubscriptionEnabled(true);
- clientCacheFactory.setPoolReadTimeout(10 * 60 * 1000); // 10 mins
- clientCacheRule.createClientCache(clientCacheFactory);
- }
+ public GfshCommandRule gfsh = new GfshCommandRule();
- private void verifyException(Exception e) {
- e.printStackTrace();
- String error = e.getMessage();
- if (e.getCause() != null) {
- error = e.getCause().getMessage();
- }
+ private MemberVM locator, server1, server2;
+ private ClientVM client3, client4;
- if (error.contains("Query execution cancelled after exceeding max execution time")
- || error.contains("The Query completed sucessfully before it got canceled")
- || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time")
- || error.contains(
- "The query task could not be found but the query is marked as having been canceled")) {
- // Expected exception.
- return;
- }
-
- System.out.println("Unexpected exception:");
- if (e.getCause() != null) {
- e.getCause().printStackTrace();
- } else {
- e.printStackTrace();
- }
+ @Before
+ public void setUpServers() throws Exception {
+ locator = cluster.startLocatorVM(0, l -> l.withoutClusterConfigurationService());
+ server1 = cluster.startServerVM(1, locator.getPort());
+ server2 = cluster.startServerVM(2, locator.getPort());
- fail("Expected exception Not found. Expected exception with message: \n"
- + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error);
+ // configure the server to make the query to wait for at least 1 second in every execution spot
+ // and set a MAX_QUERY_EXECUTION_TIME to be 10ms
+ VMProvider.invokeInEveryMember(() -> {
+ DefaultQuery.testHook = new QueryTimeoutHook();
+ GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME;
+ }, server1, server2);
+ gfsh.connectAndVerify(locator);
}
- /**
- * Tests query execution from client to server (single server).
- */
@Test
- public void testQueryMonitorClientServer() throws Exception {
-
- setup(1);
-
- final Host host = Host.getHost(0);
-
- VM server = host.getVM(0);
- VM client1 = host.getVM(1);
- VM client2 = host.getVM(2);
- VM client3 = host.getVM(3);
-
- final int numberOfEntries = 100;
- String serverHostName = NetworkUtils.getServerHostName(host);
-
- // Start server
- int serverPort = server.invoke("Create BridgeServer",
- () -> configServer(10, "testQueryMonitorClientServer")); // All the queries taking more than
- // 20ms should be canceled by Query
- // monitor.
- server.invoke("createExampleRegion", () -> createExampleRegion());
-
- // Initialize server regions.
- server.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
-
- // Initialize Client1
- client1.invoke("Init client", () -> configClient(serverHostName, serverPort));
-
- // Initialize Client2
- client2.invoke("Init client", () -> configClient(serverHostName, serverPort));
+ public void testMultipleClientToOneServer() throws Exception {
+ int server1Port = server1.getPort();
+ client3 = cluster.startClientVM(3, true, server1Port);
+ client4 = cluster.startClientVM(4, true, server1Port);
- // Initialize Client3
- client3.invoke("Init client", () -> configClient(serverHostName, serverPort));
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+ .statusIsSuccess();
- // Execute client queries
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+ server1.invoke(() -> populateRegion(0, 100));
- client1.invoke("execute Queries", () -> executeQueriesFromClient(10));
- client2.invoke("execute Queries", () -> executeQueriesFromClient(10));
- client3.invoke("execute Queries", () -> executeQueriesFromClient(10));
-
- stopServer(server);
- }
-
- private void executeQueriesFromClient(int timeout) {
- GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = timeout;
- QueryService queryService = clientCacheRule.getClientCache().getQueryService();
- executeQueriesAgainstQueryService(queryService);
- }
-
- private void executeQueriesOnServer() {
- QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
- executeQueriesAgainstQueryService(queryService);
+ // execute the query
+ VMProvider.invokeInEveryMember(() -> exuteQuery(), client3, client4);
}
- private void executeQueriesAgainstQueryService(QueryService queryService) {
- for (int k = 0; k < queryStr.length; k++) {
- String qStr = queryStr[k];
- executeQuery(queryService, qStr);
- }
- }
-
- private void executeQuery(QueryService queryService, String qStr) {
- try {
- logger.info("Executing query :" + qStr);
- Query query = queryService.newQuery(qStr);
- query.execute();
- fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
- } catch (Exception e) {
- verifyException(e);
- }
- }
-
- /**
- * Tests query execution from client to server (multi server).
- */
@Test
- public void testQueryMonitorMultiClientMultiServer() throws Exception {
-
- setup(2);
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
- VM client1 = host.getVM(2);
- VM client2 = host.getVM(3);
-
- final int numberOfEntries = 100;
-
- String serverHostName = NetworkUtils.getServerHostName(host);
-
- // Start server
- int serverPort1 = server1.invoke("Create BridgeServer",
- () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking
- // more than 20ms should
- // be canceled by Query
- // monitor.
- server1.invoke("createExampleRegion", () -> createExampleRegion());
+ public void testOneClientToMultipleServerOnReplicateRegion() throws Exception {
+ int server1Port = server1.getPort();
+ int server2Port = server2.getPort();
+ client3 = cluster.startClientVM(3, true, server1Port, server2Port);
- int serverPort2 = server2.invoke("Create BridgeServer",
- () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking
- // more than 20ms should
- // be canceled by Query
- // monitor.
- server2.invoke("createExampleRegion", () -> createExampleRegion());
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+ .statusIsSuccess();
- // Initialize server regions.
- server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+ server1.invoke(() -> populateRegion(0, 100));
- // Initialize server regions.
- server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
-
- // Initialize Client1 and create client regions.
- client1.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2));
- // client1.invoke("createExampleRegion", () -> createExampleRegion());
-
- // Initialize Client2 and create client regions.
- client2.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2));
- // client2.invoke("createExampleRegion", () -> createExampleRegion());
-
- // Execute client queries
-
- client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
- client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
-
- stopServer(server1);
- stopServer(server2);
+ // execute the query from client3
+ client3.invoke(() -> exuteQuery());
}
- /**
- * Tests query execution on local vm.
- */
@Test
- public void testQueryExecutionLocally() throws Exception {
-
- setup(2);
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
-
- final int numberOfEntries = 100;
-
- // Start server
- server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
- // the
- // queries
- // taking
- // more
- // than
- // 20ms
- // should
- // be
- // canceled
- // by
- // Query
- // monitor.
- server1.invoke("createExampleRegion", () -> createExampleRegion());
-
- server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
- // the
- // queries
- // taking
- // more
- // than
- // 20ms
- // should
- // be
- // canceled
- // by
- // Query
- // monitor.
- server2.invoke("createExampleRegion", () -> createExampleRegion());
-
- // Initialize server regions.
- server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
-
- // Initialize server regions.
- server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries));
-
- // Execute server queries
-
- server1.invoke("execute queries on Server", () -> executeQueriesOnServer());
- server2.invoke("execute queries on Server", () -> executeQueriesOnServer());
-
- stopServer(server1);
- stopServer(server2);
- }
+ public void testOneClientToOneServerOnPartitionedRegion() throws Exception {
+ // client3 connects to server1, client4 connects to server2
+ int server1Port = server1.getPort();
+ int server2Port = server2.getPort();
+ client3 = cluster.startClientVM(3, true, server1Port);
+ client4 = cluster.startClientVM(4, true, server2Port);
- /**
- * Tests query execution on local vm.
- */
- @Test
- public void testQueryExecutionLocallyAndCacheOp() throws Exception {
-
- setup(2);
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
-
- final int numberOfEntries = 1000;
-
- // Start server
- server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
- // the
- // queries
- // taking
- // more
- // than
- // 20ms
- // should
- // be
- // canceled
- // by
- // Query
- // monitor.
- server1.invoke("createExampleRegion", () -> createExampleRegion());
-
- server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All
- // the
- // queries
- // taking
- // more
- // than
- // 20ms
- // should
- // be
- // canceled
- // by
- // Query
- // monitor.
- server2.invoke("createExampleRegion", () -> createExampleRegion());
-
- // Initialize server regions.
- server1.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
-
- // Initialize server regions.
- server2.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries));
-
- // Execute server queries
- SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") {
- public void run2() throws CacheException {
- try {
- QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
- String qStr =
- "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
- + " WHERE x.ID = p.ID) as itrX";
- executeQuery(queryService, qStr);
-
- // Create index and Perform cache op. Bug#44307
- queryService.createIndex("idIndex", "ID", "/exampleRegion");
- queryService.createIndex("statusIndex", "status", "/exampleRegion");
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
- for (int i = (1 + 100); i <= (numberOfEntries + 200); i++) {
- exampleRegion.put("" + i, new Portfolio(i));
- }
-
- } catch (Exception ex) {
- Assert.fail("Exception creating the query service", ex);
- }
- }
- };
-
- server1.invoke(executeQuery);
- server2.invoke(executeQuery);
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION")
+ .statusIsSuccess();
- stopServer(server1);
- stopServer(server2);
- }
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+ server1.invoke(() -> populateRegion(0, 100));
+ server2.invoke(() -> populateRegion(100, 200));
- private void populatePortfolioRegions(int numberOfEntries) {
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);;
- for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) {
- exampleRegion.put("" + i, new Portfolio(i));
- }
+ client3.invoke(() -> exuteQuery());
+ client4.invoke(() -> exuteQuery());
}
- /**
- * Tests query execution from client to server (multiple server) on Partition Region .
- */
@Test
- public void testQueryMonitorOnPR() throws Exception {
-
- setup(2);
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
- VM client1 = host.getVM(2);
- VM client2 = host.getVM(3);
-
- final int numberOfEntries = 100;
-
- String serverHostName = NetworkUtils.getServerHostName(host);
-
- // Start server
- int serverPort1 = server1.invoke("configServer",
- () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
- // taking more than
- // 100ms should be
- // canceled by Query
- // monitor.
- server1.invoke("createExamplePRRegion", () -> createExamplePRRegion());
-
- int serverPort2 = server2.invoke("configServer",
- () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
- // taking more than
- // 100ms should be
- // canceled by Query
- // monitor.
- server2.invoke("createExamplePRRegion", () -> createExamplePRRegion());
-
- // Initialize server regions.
- server1.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio(101, numberOfEntries));
-
- // Initialize server regions.
- server2.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio((numberOfEntries + 100),
- (numberOfEntries + numberOfEntries + 100)));
-
- // Initialize Client1
- client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
-
- // Initialize Client2
- client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
-
- // Execute client queries
-
- client1.invoke("Execute Queries", () -> executeQueriesFromClient(20));
- client2.invoke("Execute Queries", () -> executeQueriesFromClient(20));
-
- stopServer(server1);
- stopServer(server2);
- }
+ public void testQueryExecutionFromServerAndPerformCacheOp() throws Exception {
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+ .statusIsSuccess();
+
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+ server1.invoke(() -> populateRegion(0, 100));
+
+ // execute the query from one server
+ server1.invoke(() -> exuteQuery());
+
+ // Create index and Perform cache op. Bug#44307
+ server1.invoke(() -> {
+ QueryService queryService = ClusterStartupRule.getCache().getQueryService();
+ queryService.createIndex("idIndex", "ID", "/exampleRegion");
+ queryService.createIndex("statusIndex", "status", "/exampleRegion");
+ populateRegion(100, 10);
+ });
- private void bulkInsertPorfolio(int startingId, int numberOfEntries) {
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
- for (int i = startingId; i <= (numberOfEntries + 100); i++) {
- exampleRegion.put("" + i, new Portfolio(i));
- }
+ // destroy indices created in this test
+ gfsh.executeAndAssertThat("destroy index --region=/exampleRegion").statusIsSuccess();
}
- /**
- * Tests query execution on Partition Region, executes query locally.
- */
@Test
- public void testQueryMonitorWithLocalQueryOnPR() throws Exception {
-
- setup(2);
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
-
- final int numberOfEntries = 100;
-
- // Start server
- server1.invoke("configServer",
- () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
- // taking more than
- // 100ms should be
- // canceled by Query
- // monitor.
- server1.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
- server2.invoke("configServer",
- () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries
- // taking more than
- // 100ms should be
- // canceled by Query
- // monitor.
- server2.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
- // Initialize server regions.
- server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
- bulkInsertPorfolio(101, numberOfEntries);
- }
- });
-
- // Initialize server regions.
- server2.invoke(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
- bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100));
- }
- });
+ public void testQueryExecutionFromServerOnPartitionedRegion() throws Exception {
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION")
+ .statusIsSuccess();
- // Execute client queries
- server1.invoke("execute queries on server", () -> executeQueriesOnServer());
- server2.invoke("execute queries on server", () -> executeQueriesOnServer());
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+ server1.invoke(() -> populateRegion(100, 200));
+ server2.invoke(() -> populateRegion(200, 300));
- stopServer(server1);
- stopServer(server2);
+ // execute the query from one server
+ server1.invoke(() -> exuteQuery());
+ server2.invoke(() -> exuteQuery());
}
- /**
- * Tests query execution from client to server (multiple server) with eviction to disk.
- */
@Test
- public void testQueryMonitorRegionWithEviction() throws CacheException {
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
- VM client1 = host.getVM(2);
- VM client2 = host.getVM(3);
-
- final int numberOfEntries = 100;
-
- String serverHostName = NetworkUtils.getServerHostName(host);
-
- // Start server
- int serverPort1 = server1.invoke("Create BridgeServer",
- () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more
- // than 20ms should be
- // canceled by Query monitor.
- server1.invoke("createExampleRegion",
- () -> createExampleRegion(true, "server1_testQueryMonitorRegionWithEviction"));
-
- int serverPort2 = server2.invoke("Create BridgeServer",
- () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more
- // than 20ms should be
- // canceled by Query monitor.
- server2.invoke("createExampleRegion",
- () -> createExampleRegion(true, "server2_testQueryMonitorRegionWithEviction"));
-
- // Initialize server regions.
- server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
- bulkInsertPorfolio(101, numberOfEntries);
- }
+ public void testQueryMonitorRegionWithEviction() throws Exception {
+ File server1WorkingDir = server1.getWorkingDir();
+ File server2WorkingDir = server2.getWorkingDir();
+ server1.invoke(() -> createReplicateRegionWithEviction(server1WorkingDir));
+ server2.invoke(() -> createReplicateRegionWithEviction(server2WorkingDir));
+ server1.invoke(() -> populateRegion(0, 100));
+ server2.invoke(() -> populateRegion(100, 200));
+
+ // client3 connects to server1, client4 connects to server2
+ int server1Port = server1.getPort();
+ int server2Port = server2.getPort();
+ client3 = cluster.startClientVM(3, ccf -> {
+ configureClientCacheFactory(ccf, server1Port);
});
- // Initialize server regions.
- server2.invoke(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
- bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100));
- }
+ client4 = cluster.startClientVM(4, ccf -> {
+ configureClientCacheFactory(ccf, server2Port);
});
-
- // Initialize Client1 and create client regions.
- client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
-
- // Initialize Client2 and create client regions.
- client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
-
- // Execute client queries
- client1.invoke("Execute Queries", () -> executeQueriesFromClient(20));
- client2.invoke("Execute Queries", () -> executeQueriesFromClient(20));
-
- stopServer(server1);
- stopServer(server2);
+ client3.invoke(() -> exuteQuery());
+ client4.invoke(() -> exuteQuery());
}
- /**
- * Tests query execution on region with indexes.
- */
@Test
public void testQueryMonitorRegionWithIndex() throws Exception {
-
- setup(2);
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
- VM client1 = host.getVM(2);
- VM client2 = host.getVM(3);
-
- final int numberOfEntries = 100;
-
- String serverHostName = NetworkUtils.getServerHostName(host);
-
- // Start server
- int serverPort1 =
- server1.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
- // the
- // queries
- // taking
- // more
- // than
- // 20ms
- // should
- // be
- // canceled
- // by
- // Query
- // monitor.
- server1.invoke("createExampleRegion", () -> createExampleRegion());
-
- int serverPort2 =
- server2.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
- // the
- // queries
- // taking
- // more
- // than
- // 20ms
- // should
- // be
- // canceled
- // by
- // Query
- // monitor.
- server2.invoke("createExampleRegion", () -> createExampleRegion());
-
- // Initialize server regions.
- server1.invoke("Create Indexes", () -> createIndexes(numberOfEntries));
-
- // Initialize server regions.
- server2.invoke("Create Indexes", () -> createIndexes(numberOfEntries));
-
- // Initialize Client1
- client1.invoke("Init client", () -> configClient(serverHostName, serverPort1));
-
- // Initialize Client2
- client2.invoke("Init client", () -> configClient(serverHostName, serverPort2));
-
- // Execute client queries
- client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
- client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20));
-
- stopServer(server1);
- stopServer(server2);
- }
-
- private void createIndexes(int numberOfEntries) throws Exception {
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
-
- // create index.
- QueryService cacheQS = cacheRule.getCache().getQueryService();
- cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p");
- cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p");
- cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos");
- cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos");
- cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion");
- cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion");
-
- for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) {
- exampleRegion.put("" + i, new Portfolio(i));
- }
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+ .statusIsSuccess();
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+
+ // create the indices using API
+ VMProvider.invokeInEveryMember(() -> {
+ // create index.
+ QueryService cacheQS = ClusterStartupRule.getCache().getQueryService();
+ cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p");
+ cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p");
+ cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos");
+ cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos");
+ cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion");
+ cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion");
+ populateRegion(0, 150);
+ }, server1, server2);
+
+ // client3 connects to server1, client4 connects to server2
+ int server1Port = server1.getPort();
+ int server2Port = server2.getPort();
+ client3 = cluster.startClientVM(3, true, server1Port);
+ client4 = cluster.startClientVM(4, true, server2Port);
+
+ client3.invoke(() -> exuteQuery());
+ client4.invoke(() -> exuteQuery());
}
- /**
- * The following CQ test is added to make sure testMaxQueryExecutionTime is reset and is not
- * affecting other query related tests.
- *
- */
@Test
public void testCqExecuteDoesNotGetAffectedByTimeout() throws Exception {
- setup(1);
-
- final Host host = Host.getHost(0);
- VM server = host.getVM(0);
- VM client = host.getVM(1);
- VM producerClient = host.getVM(2);
-
- // Start server
- int serverPort =
- server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
- server.invoke("createExampleRegion", () -> createExampleRegion());
-
-
- final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
- // Create client.
- client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort));
-
- final int size = 10;
- final String name = "testQuery_4";
- server.invoke(() -> {
- Region region = cacheRule.getCache().getRegion(exampleRegionName);
- for (int i = 1; i <= size; i++) {
- region.put("key" + i, new Portfolio(i));
- }
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+ .statusIsSuccess();
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2);
+ server1.invoke(() -> populateRegion(0, 100));
+
+ int server1Port = server1.getPort();
+ client3 = cluster.startClientVM(3, ccf -> {
+ configureClientCacheFactory(ccf, server1Port);
});
- // create and execute cq
- client.invoke(() -> {
+ client3.invoke(() -> {
String cqName = "testCQForQueryMonitorDUnitTest";
- String query = "select * from /" + exampleRegionName;
+ String query = "select * from /exampleRegion";
// Get CQ Service.
- QueryService cqService = null;
- cqService = clientCacheRule.getClientCache().getQueryService();
+ QueryService cqService = ClusterStartupRule.getClientCache().getQueryService();
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
@@ -857,278 +256,212 @@ public class QueryMonitorDUnitTest implements Serializable {
CqQuery cq1 = cqService.newCq(cqName, query, cqa);
cq1.execute();
});
+
+ server1.invoke(() -> {
+ populateRegion(0, 150);
+ });
}
@Test
- public void testCqProcessingDoesNotGetAffectedByTimeout() throws Exception {
- setup(1);
-
- final Host host = Host.getHost(0);
- VM server = host.getVM(0);
- VM client = host.getVM(1);
- VM producerClient = host.getVM(2);
-
- // Start server
- int serverPort =
- server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All
- server.invoke("createExampleRegion", () -> createExampleRegion());
-
-
- final String host0 = NetworkUtils.getServerHostName(server.getHost());
-
- // Create client.
- client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort));
+ public void testCacheOpAfterQueryCancel() throws Exception {
+ int locatorPort = locator.getPort();
+ // start up more servers
+ MemberVM server3 = cluster.startServerVM(3, locatorPort);
- final int size = 10;
- final String name = "testQuery_4";
- server.invoke(() -> {
- Region region = cacheRule.getCache().getRegion(exampleRegionName);
- for (int i = 1; i <= size; i++) {
- region.put("key" + i, new Portfolio(i));
- }
+ server3.invoke(() -> {
+ DefaultQuery.testHook = new QueryTimeoutHook();
+ GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME;
});
- // create and execute cq
- client.invoke(() -> {
- String cqName = "testCQForQueryMonitorDUnitTest";
- String query = "select * from /" + exampleRegionName;
- // Get CQ Service.
- QueryService cqService = null;
- cqService = clientCacheRule.getClientCache().getQueryService();
-
- // Create CQ Attributes.
- CqAttributesFactory cqf = new CqAttributesFactory();
- CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
- cqf.initCqListeners(cqListeners);
- CqAttributes cqa = cqf.create();
+ gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE")
+ .statusIsSuccess();
+ locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 3);
- CqQuery cq1 = cqService.newCq(cqName, query, cqa);
- cq1.execute();
+ server1.invoke(() -> {
+ QueryService queryService = ClusterStartupRule.getCache().getQueryService();
+ queryService.createIndex("statusIndex", "status", "/exampleRegion");
+ queryService.createIndex("secIdIndex", "pos.secId",
+ "/exampleRegion p, p.positions.values pos");
+ populateRegion(100, 1000);
});
- server.invoke(() -> {
- Region region = cacheRule.getCache().getRegion(exampleRegionName);
- for (int i = 1; i <= size; i++) {
- region.put("key" + i, new Portfolio(i));
+ AsyncInvocation ai1 = server1.invokeAsync(() -> {
+ for (int j = 0; j < 5; j++) {
+ populateRegion(0, 2000);
}
});
- }
-
- /**
- * Tests cache operation right after query cancellation.
- */
- @Test
- public void testCacheOpAfterQueryCancel() throws Exception {
-
- setup(4);
-
- final Host host = Host.getHost(0);
-
- VM server1 = host.getVM(0);
- VM server2 = host.getVM(1);
- VM server3 = host.getVM(2);
- VM server4 = host.getVM(3);
-
- final int numberOfEntries = 1000;
- // Start server
- server1.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
- server1.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
- server2.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
- server2.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
- server3.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
- server3.invoke("Create Partition Regions", () -> createExamplePRRegion());
-
- server4.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally"));
- server4.invoke("Create Partition Regions", () -> createExamplePRRegion());
+ AsyncInvocation ai2 = server2.invokeAsync(() -> {
+ for (int j = 0; j < 5; j++) {
+ populateRegion(1000, 3000);
+ }
+ });
- server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
+ // server3 performs a region put after a query is canceled.
+ AsyncInvocation ai3 = server3.invokeAsync(() -> {
+ Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion");
+ QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
+ String qStr =
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos"
+ + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 "
+ + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', "
+ + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')"
+ + " order by p.status, p.ID desc";
+ for (int i = 0; i < 100; i++) {
try {
- QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
- queryService.createIndex("statusIndex", "status", "/exampleRegion");
- queryService.createIndex("secIdIndex", "pos.secId",
- "/exampleRegion p, p.positions.values pos");
- } catch (Exception ex) {
- fail("Failed to create index.");
- }
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
- for (int i = 100; i <= (numberOfEntries); i++) {
+ Query query = queryService.newQuery(qStr);
+ query.execute();
+ fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
+ } catch (QueryExecutionTimeoutException qet) {
exampleRegion.put("" + i, new Portfolio(i));
}
}
});
- // Initialize server regions.
- AsyncInvocation ai1 =
- server1.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
- for (int j = 0; j < 5; j++) {
- for (int i = 1; i <= (numberOfEntries + 1000); i++) {
- exampleRegion.put("" + i, new Portfolio(i));
- }
- }
- LogWriterUtils.getLogWriter()
- .info("### Completed updates in server1 in testCacheOpAfterQueryCancel");
- }
- });
-
- AsyncInvocation ai2 =
- server2.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
- for (int j = 0; j < 5; j++) {
- for (int i = (1 + 1000); i <= (numberOfEntries + 2000); i++) {
- exampleRegion.put("" + i, new Portfolio(i));
- }
- }
- LogWriterUtils.getLogWriter()
- .info("### Completed updates in server2 in testCacheOpAfterQueryCancel");
- }
- });
-
- // Execute server queries
- SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") {
- public void run2() throws CacheException {
- try {
- Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);
- QueryService queryService = GemFireCacheImpl.getInstance().getQueryService();
- String qStr =
- "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos"
- + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 "
- + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', "
- + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')"
- + " order by p.status, p.ID desc";
- for (int i = 0; i < 500; i++) {
- try {
- GemFireCacheImpl.getInstance().getLogger().info("Executing query :" + qStr);
- Query query = queryService.newQuery(qStr);
- query.execute();
- } catch (QueryExecutionTimeoutException qet) {
- LogWriterUtils.getLogWriter()
- .info("### Got Expected QueryExecutionTimeout exception. " + qet.getMessage());
- if (qet.getMessage().contains("cancelled after exceeding max execution")) {
- LogWriterUtils.getLogWriter().info("### Doing a put operation");
- exampleRegion.put("" + i, new Portfolio(i));
- }
- } catch (Exception e) {
- fail("Exception executing query." + e.getMessage());
- }
- }
- LogWriterUtils.getLogWriter()
- .info("### Completed Executing queries in testCacheOpAfterQueryCancel");
- } catch (Exception ex) {
- Assert.fail("Exception creating the query service", ex);
- }
- }
- };
-
- AsyncInvocation ai3 = server3.invokeAsync(executeQuery);
- AsyncInvocation ai4 = server4.invokeAsync(executeQuery);
-
- LogWriterUtils.getLogWriter()
- .info("### Waiting for async threads to join in testCacheOpAfterQueryCancel");
- try {
- ThreadUtils.join(ai1, 5 * 60 * 1000);
- ThreadUtils.join(ai2, 5 * 60 * 1000);
- ThreadUtils.join(ai3, 5 * 60 * 1000);
- ThreadUtils.join(ai4, 5 * 60 * 1000);
- } catch (Exception ex) {
- fail("Async thread join failure");
- }
- LogWriterUtils.getLogWriter()
- .info("### DONE Waiting for async threads to join in testCacheOpAfterQueryCancel");
+ ai1.await();
+ ai2.await();
+ ai3.await();
- validateQueryMonitorThreadCnt(server1, 0, 1000);
- validateQueryMonitorThreadCnt(server2, 0, 1000);
- validateQueryMonitorThreadCnt(server3, 0, 1000);
- validateQueryMonitorThreadCnt(server4, 0, 1000);
+ server3.invoke(() -> {
+ DefaultQuery.testHook = null;
+ GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
+ });
+ }
- LogWriterUtils.getLogWriter()
- .info("### DONE validating query monitor threads testCacheOpAfterQueryCancel");
- stopServer(server1);
- stopServer(server2);
- stopServer(server3);
- stopServer(server4);
+ private static void populateRegion(int startingId, int endingId) {
+ Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion");
+ for (int i = startingId; i < endingId; i++) {
+ exampleRegion.put("" + i, new Portfolio(i));
+ }
}
- private void validateQueryMonitorThreadCnt(VM vm, final int threadCount, final int waitTime) {
- SerializableRunnable validateThreadCnt =
- new CacheSerializableRunnable("validateQueryMonitorThreadCnt") {
- public void run2() throws CacheException {
- Cache cache = cacheRule.getCache();
- QueryMonitor qm = ((GemFireCacheImpl) cache).getQueryMonitor();
- if (qm == null) {
- fail("Didn't found query monitor.");
- }
- int waited = 0;
- while (true) {
- if (qm.getQueryMonitorThreadCount() != threadCount) {
- if (waited <= waitTime) {
- Wait.pause(10);
- waited += 10;
- continue;
- } else {
- fail("Didn't found expected monitoring thread. Expected: " + threadCount
- + " found :" + qm.getQueryMonitorThreadCount());
- }
- }
- break;
- }
- }
- };
- vm.invoke(validateThreadCnt);
+ private static void exuteQuery() {
+ QueryService queryService;
+ if (ClusterStartupRule.getClientCache() == null) {
+ queryService = ClusterStartupRule.getCache().getQueryService();
+ } else {
+ queryService = ClusterStartupRule.getClientCache().getQueryService();
+ }
+ for (int k = 0; k < queryStr.length; k++) {
+ String qStr = queryStr[k];
+ try {
+ Query query = queryService.newQuery(qStr);
+ query.execute();
+ fail("The query should have been canceled by the QueryMonitor. Query: " + qStr);
+ } catch (Exception e) {
+ verifyException(e);
+ }
+ }
}
- /**
- * Starts a bridge server on the given port, using the given deserializeValues and
- * notifyBySubscription to serve up the given region.
- */
- protected int startBridgeServer(int port, boolean notifyBySubscription) throws IOException {
-
- Cache cache = cacheRule.getCache();
- CacheServer bridge = cache.addCacheServer();
- bridge.setPort(port);
- bridge.setNotifyBySubscription(notifyBySubscription);
- bridge.start();
- return bridge.getPort();
+ private static void configureClientCacheFactory(ClientCacheFactory ccf, int... serverPorts) {
+ for (int serverPort : serverPorts) {
+ ccf.addPoolServer("localhost", serverPort);
+ }
+ ccf.setPoolReadTimeout(10 * 60 * 1000); // 10 min
+ ccf.setPoolSubscriptionEnabled(true);
}
- /**
- * Stops the bridge server that serves up the given cache.
- */
- private void stopBridgeServer(Cache cache) {
- CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next();
- bridge.stop();
- assertFalse(bridge.isRunning());
+ private static void createReplicateRegionWithEviction(File workingDir) {
+ InternalCache cache = ClusterStartupRule.getCache();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ dsf.setDiskDirs(new File[] {workingDir}).create("ds");
+ EvictionAttributes evictAttrs =
+ EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK);
+ cache.createRegionFactory(RegionShortcut.REPLICATE)
+ .setDiskStoreName("ds")
+ .setEvictionAttributes(evictAttrs)
+ .create("exampleRegion");
}
- private class QueryTimeoutHook implements DefaultQuery.TestHook {
-
- long timeout;
+ private static void verifyException(Exception e) {
+ String error = e.getMessage();
+ if (e.getCause() != null) {
+ error = e.getCause().getMessage();
+ }
- private QueryTimeoutHook(long timeout) {
- this.timeout = timeout;
+ if (error.contains("Query execution cancelled after exceeding max execution time")
+ || error.contains("The Query completed sucessfully before it got canceled")
+ || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time")
+ || error.contains(
+ "The query task could not be found but the query is marked as having been canceled")) {
+ // Expected exception.
+ return;
}
- public void doTestHook(String description) {
- if (description.equals("6")) {
- try {
- Thread.sleep(timeout * 2);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
+ System.out.println("Unexpected exception:");
+ if (e.getCause() != null) {
+ e.getCause().printStackTrace();
+ } else {
+ e.printStackTrace();
}
+ fail("Expected exception Not found. Expected exception with message: \n"
+ + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error);
+ }
+
+
+ @After
+ public void reset() {
+ VMProvider.invokeInEveryMember(() -> {
+ DefaultQuery.testHook = null;
+ GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1;
+ }, server1, server2);
+ }
+
+ private static class QueryTimeoutHook implements DefaultQuery.TestHook {
public void doTestHook(int spot) {
- doTestHook("" + spot);
+ if (spot != 6) {
+ return;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(20);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
-
}
+ private static String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE p.ID > 100",
+ "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pk != '1000'",
+ "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pkid != '1'",
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pk > '1'",
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pkid != '53'",
+ "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100",
+ "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+ "SELECT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
+ "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'",
+ "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000",
+ "SELECT DISTINCT ID FROM /exampleRegion WHERE status = 'active'",
+ "SELECT DISTINCT ID FROM /exampleRegion p WHERE p.status = 'active'",
+ "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+ "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
+ + " WHERE pos.secId = 'YHOO') as itrX",
+ "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos"
+ + " WHERE pos.secId = 'YHOO') as itrX",
+ "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x"
+ + " WHERE x.ID = p.ID) as itrX",
+ "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos"
+ + " WHERE x.ID = p.ID) as itrX",
+ "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE "
+ + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE vals.secId = 'YHOO')",
+ "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')",
+ "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
+ "SELECT DISTINCT structset.sos, structset.key "
+ + "FROM /exampleRegion p, p.positions.values outerPos, "
+ + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
+ + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
+ + "where outerPos.secId != 'IBM' AND "
+ + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
+ + "where structset.sos > 2000",
+ "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, "
+ + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
+ + "FROM /exampleRegion.entries pf, pf.value.positions.values pos "
+ + "where outerPos.secId != 'IBM' AND "
+ + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
+ + "where structset.sos > 2000",
+ "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position "
+ + "WHERE (true = null OR position.secId = 'SUN') AND true",};
+
}