You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/04/02 17:10:33 UTC
[geode] branch develop updated: GEODE-4867: Added query time
expiration checks. (#1624)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 750417f GEODE-4867: Added query time expiration checks. (#1624)
750417f is described below
commit 750417f7b8213e23dc70823256754df3698ad1c6
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Mon Apr 2 10:10:30 2018 -0700
GEODE-4867: Added query time expiration checks. (#1624)
* Along with low memory checks, the query execution time limit is also checked.
---
.../cache/PartitionedRegionQueryEvaluator.java | 28 ++--
.../internal/cache/partitioned/QueryMessage.java | 4 +
.../ResourceManagerWithQueryMonitorDUnitTest.java | 143 +++++++++++++++++++++
3 files changed, 166 insertions(+), 9 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index 4120851..7b7229c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -38,6 +38,7 @@ import org.apache.geode.CopyHelper;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
+import org.apache.geode.cache.query.QueryExecutionTimeoutException;
import org.apache.geode.cache.query.QueryInvocationTargetException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
@@ -230,16 +231,23 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
}
synchronized (results) {
- if (!QueryMonitor.isLowMemory()) {
+ if (!QueryMonitor.isLowMemory() && !this.query.isCanceled()) {
results.add(objects);
} else {
if (logger.isDebugEnabled()) {
logger.debug("query canceled while gathering results, aborting");
}
- String reason =
- LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
- .toLocalizedString();
- query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+ if (QueryMonitor.isLowMemory()) {
+ String reason =
+ LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
+ .toLocalizedString();
+ query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("query cancelled while gathering results, aborting due to exception "
+ + query.getQueryCanceledException());
+ }
+ }
return false;
}
@@ -659,7 +667,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
for (Map.Entry<InternalDistributedMember, Collection<Collection>> e : this.resultsPerMember
.entrySet()) {
- checkLowMemory();
+ checkIfQueryShouldBeCancelled();
// If its a local query, the results should contain domain objects.
// in case of client/server query the objects from PdxInstances were
// retrieved on the client side.
@@ -692,7 +700,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
}
} else {
for (Collection res : e.getValue()) {
- checkLowMemory();
+ checkIfQueryShouldBeCancelled();
// final TaintableArrayList res = (TaintableArrayList) e.getValue();
if (res != null) {
if (isDebugEnabled) {
@@ -705,7 +713,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
boolean[] objectChangedMarker = new boolean[1];
for (Object obj : res) {
- checkLowMemory();
+ checkIfQueryShouldBeCancelled();
int occurence = 0;
obj = PDXUtils.convertPDX(obj, isStruct, getDomainObjectForPdx, getDeserializedObject,
localResults, objectChangedMarker, true);
@@ -750,7 +758,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
return this.cumulativeResults;
}
- private void checkLowMemory() {
+ private void checkIfQueryShouldBeCancelled() {
if (QueryMonitor.isLowMemory()) {
String reason =
LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
@@ -760,6 +768,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
DefaultQuery.testHook.doTestHook(5);
}
throw query.getQueryCanceledException();
+ } else if (query.isCanceled()) {
+ throw query.getQueryCanceledException();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index 1e9f6e8..ad3fb19 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -30,6 +30,7 @@ import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
+import org.apache.geode.cache.query.QueryExecutionTimeoutException;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.IndexTrackingQueryObserver;
@@ -48,6 +49,7 @@ import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PRQueryProcessor;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.Token;
@@ -244,6 +246,8 @@ public class QueryMessage extends StreamingPartitionOperation.StreamingPartition
String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
.toLocalizedString(QueryMonitor.getMemoryUsedDuringLowMemory());
throw new QueryExecutionLowMemoryException(reason);
+ } else if (query.isCanceled()) {
+ throw query.getQueryCanceledException();
}
super.operateOnPartitionedRegion(dm, pr, startTime);
} finally {
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
index 8e54faf..b6debfa 100755
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -72,6 +73,7 @@ import org.apache.geode.internal.cache.control.MemoryEvent;
import org.apache.geode.internal.cache.control.ResourceListener;
import org.apache.geode.internal.cache.control.TestMemoryThresholdListener;
import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
@@ -87,6 +89,8 @@ import org.apache.geode.test.junit.categories.OQLQueryTest;
@Category({DistributedTest.class, OQLQueryTest.class})
public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCase {
+ private static final Logger logger = LogService.getLogger();
+
private static int MAX_TEST_QUERY_TIMEOUT = 4000;
private static int TEST_QUERY_TIMEOUT = 1000;
private static final int CRITICAL_HEAP_USED = 950;
@@ -187,6 +191,27 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
doCriticalMemoryHitTestOnServer("portfolios", false, 85/* crit threshold */, false, -1, true);
}
+ // Query directly on member with RM and QM set
+ @Test
+ public void whenTimeoutIsSetAndAQueryIsExecutedThenTimeoutMustStopTheQueryBeforeCriticalMemory()
+ throws Exception {
+ // Timeout is set along with critical heap, but critical heap is hit only after the timeout expires.
+ // Timeout is set to 1ms, which is an unrealistically short period for a query to be able to fetch
+ // 200 entries from the region successfully, hence a timeout is expected.
+ executeQueryWithTimeoutSetAndCriticalThreshold("portfolios", false, 85/* crit threshold */,
+ false, 1, true);
+ }
+
+ @Test
+ public void whenTimeoutIsSetAndAQueryIsExecutedFromClientThenTimeoutMustStopTheQueryBeforeCriticalMemory()
+ throws Exception {
+ // Timeout is set along with critical heap, but critical heap is hit only after the timeout expires.
+ // Timeout is set to 1ms, which is an unrealistically short period for a query to be able to fetch
+ // 200 entries from the region successfully, hence a timeout is expected.
+ executeQueryFromClientWithTimeoutSetAndCriticalThreshold("portfolios", false,
+ 85/* crit threshold */, false, 1, true);
+ }
+
@Test
public void testRMAndNoTimeoutSetParRegOnServer() throws Exception {
doCriticalMemoryHitTestOnServer("portfolios", true, 85/* crit threshold */, false, -1, true);
@@ -700,6 +725,54 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
}
+ private void executeQueryFromClientWithTimeoutSetAndCriticalThreshold(final String regionName,
+ boolean createPR, final int criticalThreshold, final boolean disabledQueryMonitorForLowMem,
+ final int queryTimeout, final boolean hitCriticalThreshold) throws Exception {
+ // create region on the server
+ final Host host = Host.getHost(0);
+ final VM server = host.getVM(0);
+ final VM client = host.getVM(1);
+ final int numObjects = 200;
+ try {
+ final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+ startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
+ regionName, createPR, 0);
+ startClient(client, server, port, regionName);
+ populateData(server, regionName, numObjects);
+ executeQueryWithCriticalHeapCalledAfterTimeout(server, client, regionName, queryTimeout,
+ hitCriticalThreshold);
+ if (hitCriticalThreshold) {
+ vmRecoversFromCriticalHeap(server);
+ }
+
+ } finally {
+ stopServer(server);
+ }
+ }
+
+ private void executeQueryWithTimeoutSetAndCriticalThreshold(final String regionName,
+ boolean createPR, final int criticalThreshold, final boolean disabledQueryMonitorForLowMem,
+ final int queryTimeout, final boolean hitCriticalThreshold) throws Exception {
+ // create region on the server
+ final Host host = Host.getHost(0);
+ final VM server = host.getVM(0);
+ final int numObjects = 200;
+ try {
+ final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+ startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
+ regionName, createPR, 0);
+ populateData(server, regionName, numObjects);
+ executeQueryWithCriticalHeapCalledAfterTimeout(server, server, regionName, queryTimeout,
+ hitCriticalThreshold);
+ if (hitCriticalThreshold) {
+ vmRecoversFromCriticalHeap(server);
+ }
+
+ } finally {
+ stopServer(server);
+ }
+ }
+
// This helper method will set up a test hook
// Execute a query on the server, pause due to the test hook
// Execute a critical heap event
@@ -748,6 +821,76 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
}
}
+ private void executeQueryWithCriticalHeapCalledAfterTimeout(VM server, VM client,
+ final String regionName, final int queryTimeout, final boolean hitCriticalThreshold) {
+ createLatchTestHook(server);
+ AsyncInvocation queryExecution = executeQueryWithTimeout(client, regionName, queryTimeout);
+
+ // Wait till the timeout expires on the query
+ try {
+ Thread.sleep(queryTimeout + 1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ // We simulate a low memory/critical heap percentage hit,
+ // but by design of this test the query must already have been terminated because of the 1ms
+ // timeout.
+ if (hitCriticalThreshold) {
+ vmHitsCriticalHeap(server);
+ }
+
+ releaseHook(server);
+
+ ThreadUtils.join(queryExecution, 60000);
+ // Make sure the query was terminated by a timeout (the callable returns 0 only in that case)
+ try {
+ assertEquals(0, queryExecution.getResult());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ fail("queryExecution.getResult() threw Exception " + e.toString());
+ }
+ }
+
+ private AsyncInvocation executeQueryWithTimeout(VM client, final String regionName,
+ final int queryTimeout) {
+ return client.invokeAsync(new SerializableCallable("execute query from client") {
+ public Object call() throws CacheException {
+ QueryService qs = null;
+ try {
+ qs = getCache().getQueryService();
+ Query query = qs.newQuery("Select * From /" + regionName);
+ query.execute();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (e instanceof QueryExecutionTimeoutException) {
+ logger.info("Query Execution must be terminated by a timeout.");
+ return 0;
+ }
+ if (e instanceof ServerOperationException) {
+ ServerOperationException soe = (ServerOperationException) e;
+ if (soe.getRootCause() instanceof QueryException) {
+ QueryException qe = (QueryException) soe.getRootCause();
+ if (isExceptionDueToTimeout(qe, queryTimeout)) {
+ logger.info("Query Execution must be terminated by a timeout. Expected behavior");
+ return 0;
+ }
+ } else if (soe.getRootCause() instanceof QueryExecutionTimeoutException) {
+ logger.info("Query Execution must be terminated by a timeout.");
+ return 0;
+ }
+ }
+ e.printStackTrace();
+ throw new CacheException(
+ "The query should have been terminated by a timeout exception but instead hit a different exception :"
+ + e) {};
+ }
+ return -1;
+ }
+ });
+
+ }
+
private AsyncInvocation invokeClientQuery(VM client, final String regionName,
final boolean disabledQueryMonitorForLowMem, final int queryTimeout,
final boolean hitCriticalThreshold) {
--
To stop receiving notification emails like this one, please contact
nnag@apache.org.