You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/04/02 17:10:33 UTC

[geode] branch develop updated: GEODE-4867: Added query time expiration checks. (#1624)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 750417f  GEODE-4867: Added query time expiration checks. (#1624)
750417f is described below

commit 750417f7b8213e23dc70823256754df3698ad1c6
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Mon Apr 2 10:10:30 2018 -0700

    GEODE-4867: Added query time expiration checks. (#1624)
    
    * Along with low memory checks, the query execution time limit is also checked.
---
 .../cache/PartitionedRegionQueryEvaluator.java     |  28 ++--
 .../internal/cache/partitioned/QueryMessage.java   |   4 +
 .../ResourceManagerWithQueryMonitorDUnitTest.java  | 143 +++++++++++++++++++++
 3 files changed, 166 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index 4120851..7b7229c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -38,6 +38,7 @@ import org.apache.geode.CopyHelper;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
+import org.apache.geode.cache.query.QueryExecutionTimeoutException;
 import org.apache.geode.cache.query.QueryInvocationTargetException;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.Struct;
@@ -230,16 +231,23 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     }
 
     synchronized (results) {
-      if (!QueryMonitor.isLowMemory()) {
+      if (!QueryMonitor.isLowMemory() && !this.query.isCanceled()) {
         results.add(objects);
       } else {
         if (logger.isDebugEnabled()) {
           logger.debug("query canceled while gathering results, aborting");
         }
-        String reason =
-            LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
-                .toLocalizedString();
-        query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+        if (QueryMonitor.isLowMemory()) {
+          String reason =
+              LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
+                  .toLocalizedString();
+          query.setCanceled(true, new QueryExecutionLowMemoryException(reason));
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("query cancelled while gathering results, aborting due to exception "
+                + query.getQueryCanceledException());
+          }
+        }
         return false;
       }
 
@@ -659,7 +667,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
 
     for (Map.Entry<InternalDistributedMember, Collection<Collection>> e : this.resultsPerMember
         .entrySet()) {
-      checkLowMemory();
+      checkIfQueryShouldBeCancelled();
       // If its a local query, the results should contain domain objects.
       // in case of client/server query the objects from PdxInstances were
       // retrieved on the client side.
@@ -692,7 +700,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         }
       } else {
         for (Collection res : e.getValue()) {
-          checkLowMemory();
+          checkIfQueryShouldBeCancelled();
           // final TaintableArrayList res = (TaintableArrayList) e.getValue();
           if (res != null) {
             if (isDebugEnabled) {
@@ -705,7 +713,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
             boolean[] objectChangedMarker = new boolean[1];
 
             for (Object obj : res) {
-              checkLowMemory();
+              checkIfQueryShouldBeCancelled();
               int occurence = 0;
               obj = PDXUtils.convertPDX(obj, isStruct, getDomainObjectForPdx, getDeserializedObject,
                   localResults, objectChangedMarker, true);
@@ -750,7 +758,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     return this.cumulativeResults;
   }
 
-  private void checkLowMemory() {
+  private void checkIfQueryShouldBeCancelled() {
     if (QueryMonitor.isLowMemory()) {
       String reason =
           LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION
@@ -760,6 +768,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         DefaultQuery.testHook.doTestHook(5);
       }
       throw query.getQueryCanceledException();
+    } else if (query.isCanceled()) {
+      throw query.getQueryCanceledException();
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index 1e9f6e8..ad3fb19 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -30,6 +30,7 @@ import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
+import org.apache.geode.cache.query.QueryExecutionTimeoutException;
 import org.apache.geode.cache.query.Struct;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.IndexTrackingQueryObserver;
@@ -48,6 +49,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.PRQueryProcessor;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.Token;
@@ -244,6 +246,8 @@ public class QueryMessage extends StreamingPartitionOperation.StreamingPartition
         String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY
             .toLocalizedString(QueryMonitor.getMemoryUsedDuringLowMemory());
         throw new QueryExecutionLowMemoryException(reason);
+      } else if (query.isCanceled()) {
+        throw query.getQueryCanceledException();
       }
       super.operateOnPartitionedRegion(dm, pr, startTime);
     } finally {
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
index 8e54faf..b6debfa 100755
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.logging.log4j.Logger;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -72,6 +73,7 @@ import org.apache.geode.internal.cache.control.MemoryEvent;
 import org.apache.geode.internal.cache.control.ResourceListener;
 import org.apache.geode.internal.cache.control.TestMemoryThresholdListener;
 import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
@@ -87,6 +89,8 @@ import org.apache.geode.test.junit.categories.OQLQueryTest;
 @Category({DistributedTest.class, OQLQueryTest.class})
 public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCase {
 
+  private static final Logger logger = LogService.getLogger();
+
   private static int MAX_TEST_QUERY_TIMEOUT = 4000;
   private static int TEST_QUERY_TIMEOUT = 1000;
   private static final int CRITICAL_HEAP_USED = 950;
@@ -187,6 +191,27 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
     doCriticalMemoryHitTestOnServer("portfolios", false, 85/* crit threshold */, false, -1, true);
   }
 
+  // Query directly on member with RM and QM set
+  @Test
+  public void whenTimeoutIsSetAndAQueryIsExecutedThenTimeoutMustStopTheQueryBeforeCriticalMemory()
+      throws Exception {
+    // Timeout is set along with critical heap but it be called after the timeout expires
+    // Timeout is set to 1ms which is very unrealistic time period for a query to be able to fetch
+    // 200 entries from the region successfully, hence a timeout is expected.
+    executeQueryWithTimeoutSetAndCriticalThreshold("portfolios", false, 85/* crit threshold */,
+        false, 1, true);
+  }
+
+  @Test
+  public void whenTimeoutIsSetAndAQueryIsExecutedFromClientThenTimeoutMustStopTheQueryBeforeCriticalMemory()
+      throws Exception {
+    // Timeout is set along with critical heap but it be called after the timeout expires
+    // Timeout is set to 1ms which is very unrealistic time period for a query to be able to fetch
+    // 200 entries from the region successfully, hence a timeout is expected.
+    executeQueryFromClientWithTimeoutSetAndCriticalThreshold("portfolios", false,
+        85/* crit threshold */, false, 1, true);
+  }
+
   @Test
   public void testRMAndNoTimeoutSetParRegOnServer() throws Exception {
     doCriticalMemoryHitTestOnServer("portfolios", true, 85/* crit threshold */, false, -1, true);
@@ -700,6 +725,54 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
   }
 
 
+  private void executeQueryFromClientWithTimeoutSetAndCriticalThreshold(final String regionName,
+      boolean createPR, final int criticalThreshold, final boolean disabledQueryMonitorForLowMem,
+      final int queryTimeout, final boolean hitCriticalThreshold) throws Exception {
+    // create region on the server
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+    final int numObjects = 200;
+    try {
+      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+      startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
+          regionName, createPR, 0);
+      startClient(client, server, port, regionName);
+      populateData(server, regionName, numObjects);
+      executeQueryWithCriticalHeapCalledAfterTimeout(server, client, regionName, queryTimeout,
+          hitCriticalThreshold);
+      if (hitCriticalThreshold) {
+        vmRecoversFromCriticalHeap(server);
+      }
+
+    } finally {
+      stopServer(server);
+    }
+  }
+
+  private void executeQueryWithTimeoutSetAndCriticalThreshold(final String regionName,
+      boolean createPR, final int criticalThreshold, final boolean disabledQueryMonitorForLowMem,
+      final int queryTimeout, final boolean hitCriticalThreshold) throws Exception {
+    // create region on the server
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final int numObjects = 200;
+    try {
+      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+      startCacheServer(server, port, criticalThreshold, disabledQueryMonitorForLowMem, queryTimeout,
+          regionName, createPR, 0);
+      populateData(server, regionName, numObjects);
+      executeQueryWithCriticalHeapCalledAfterTimeout(server, server, regionName, queryTimeout,
+          hitCriticalThreshold);
+      if (hitCriticalThreshold) {
+        vmRecoversFromCriticalHeap(server);
+      }
+
+    } finally {
+      stopServer(server);
+    }
+  }
+
   // This helper method will set up a test hook
   // Execute a query on the server, pause due to the test hook
   // Execute a critical heap event
@@ -748,6 +821,76 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa
     }
   }
 
+  private void executeQueryWithCriticalHeapCalledAfterTimeout(VM server, VM client,
+      final String regionName, final int queryTimeout, final boolean hitCriticalThreshold) {
+    createLatchTestHook(server);
+    AsyncInvocation queryExecution = executeQueryWithTimeout(client, regionName, queryTimeout);
+
+    // Wait till the timeout expires on the query
+    try {
+      Thread.sleep(queryTimeout + 1000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    // We simulate a low memory/critical heap percentage hit
+    // But by design of this test the query must have been already terminated because of a 1ms
+    // timeout
+    if (hitCriticalThreshold) {
+      vmHitsCriticalHeap(server);
+    }
+
+    releaseHook(server);
+
+    ThreadUtils.join(queryExecution, 60000);
+    // Make sure no exceptions were thrown during query testing
+    try {
+      assertEquals(0, queryExecution.getResult());
+    } catch (Throwable e) {
+      e.printStackTrace();
+      fail("queryExecution.getResult() threw Exception " + e.toString());
+    }
+  }
+
+  private AsyncInvocation executeQueryWithTimeout(VM client, final String regionName,
+      final int queryTimeout) {
+    return client.invokeAsync(new SerializableCallable("execute query from client") {
+      public Object call() throws CacheException {
+        QueryService qs = null;
+        try {
+          qs = getCache().getQueryService();
+          Query query = qs.newQuery("Select * From /" + regionName);
+          query.execute();
+
+        } catch (Exception e) {
+          e.printStackTrace();
+          if (e instanceof QueryExecutionTimeoutException) {
+            logger.info("Query Execution must be terminated by a timeout.");
+            return 0;
+          }
+          if (e instanceof ServerOperationException) {
+            ServerOperationException soe = (ServerOperationException) e;
+            if (soe.getRootCause() instanceof QueryException) {
+              QueryException qe = (QueryException) soe.getRootCause();
+              if (isExceptionDueToTimeout(qe, queryTimeout)) {
+                logger.info("Query Execution must be terminated by a timeout. Expected behavior");
+                return 0;
+              }
+            } else if (soe.getRootCause() instanceof QueryExecutionTimeoutException) {
+              logger.info("Query Execution must be terminated by a timeout.");
+              return 0;
+            }
+          }
+          e.printStackTrace();
+          throw new CacheException(
+              "The query should have been terminated by a timeout exception but instead hit a different exception :"
+                  + e) {};
+        }
+        return -1;
+      }
+    });
+
+  }
+
   private AsyncInvocation invokeClientQuery(VM client, final String regionName,
       final boolean disabledQueryMonitorForLowMem, final int queryTimeout,
       final boolean hitCriticalThreshold) {

-- 
To stop receiving notification emails like this one, please contact
nnag@apache.org.