You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2018/11/09 23:12:14 UTC

[geode] branch develop updated: GEODE-5993: Eliminate race in monitorQueryThread() (#2818)

This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new aab0198  GEODE-5993: Eliminate race in monitorQueryThread() (#2818)
aab0198 is described below

commit aab0198e8478d4246042b2eb889c8ce7e28bb52e
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Fri Nov 9 15:12:03 2018 -0800

    GEODE-5993: Eliminate race in monitorQueryThread() (#2818)
    
    A race existed between setting low memory in the heap monitor thread,
    and checking the low memory state in query monitoring thread.
    
    The cancelation executor was shut down and no longer accepting new
    tasks when this race occurred, causing a RejectedExecutionException.
    
    This commit solves that problem by encapsulating the scheduling
    behavior using the state design pattern.
    
    Co-authored-by: Ryan McMahon <rm...@pivotal.io>
    Co-authored-by: Bill Burcham <bb...@pivotal.io>
---
 .../internal/QueryMonitorIntegrationTest.java      |  10 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   5 +-
 .../MonitorQueryUnderContentionBenchmark.java      |   2 +-
 .../cache/query/internal/DefaultQueryService.java  |   2 +-
 .../geode/cache/query/internal/QueryMonitor.java   | 349 +++++++++++++--------
 .../geode/internal/cache/GemFireCacheImpl.java     |   2 +-
 .../internal/cache/partitioned/QueryMessage.java   |   6 +-
 .../cache/query/internal/QueryMonitorTest.java     |   8 +-
 8 files changed, 234 insertions(+), 150 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
index ff7035a..41add0e 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryMonitorIntegrationTest.java
@@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -27,6 +29,7 @@ import org.junit.Test;
 import org.mockito.stubbing.Answer;
 
 import org.apache.geode.cache.CacheRuntimeException;
+import org.apache.geode.cache.query.QueryExecutionLowMemoryException;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 
@@ -66,7 +69,7 @@ public class QueryMonitorIntegrationTest {
 
     try {
       queryMonitor = new QueryMonitor(
-          () -> scheduledThreadPoolExecutor,
+          scheduledThreadPoolExecutor,
           cache,
           NEVER_EXPIRE_MILLIS);
 
@@ -74,6 +77,9 @@ public class QueryMonitorIntegrationTest {
 
       queryMonitor.setLowMemory(true, 1);
 
+      verify(query, times(1))
+          .setQueryCanceledException(any(QueryExecutionLowMemoryException.class));
+
       assertThatThrownBy(QueryMonitor::throwExceptionIfQueryOnCurrentThreadIsCanceled,
           "Expected setLowMemory(true,_) to cancel query immediately, but it didn't.",
           QueryExecutionCanceledException.class);
@@ -95,7 +101,7 @@ public class QueryMonitorIntegrationTest {
   public void monitorQueryThreadCancelsLongRunningQueriesAndSetsExceptionAndThrowsException() {
 
     QueryMonitor queryMonitor = new QueryMonitor(
-        () -> new ScheduledThreadPoolExecutor(1),
+        new ScheduledThreadPoolExecutor(1),
         cache,
         EXPIRE_QUICK_MILLIS);
 
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index f3c3476..f67da94 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -85,6 +85,10 @@ org/apache/geode/distributed/internal/RuntimeDistributionConfigImpl
 org/apache/geode/pdx/internal/PdxInstanceEnum
 org/apache/geode/pdx/internal/PdxInstanceImpl
 org/apache/geode/pdx/internal/WritablePdxInstanceImpl
+org/apache/geode/cache/query/internal/QueryMonitor$MemoryStateImpl
+org/apache/geode/cache/query/internal/QueryMonitor$MemoryStateImpl$1
+org/apache/geode/cache/query/internal/QueryMonitor$MemoryStateImpl$2
+org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$1
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$2
@@ -92,6 +96,5 @@ org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$3
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$4
 org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$5
 org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl$ClosedPoolConnectionList
-org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
 org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage$ReplyType
 org/apache/geode/internal/cache/AfterCompletion$Action
diff --git a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
index b6d12dd..f65b801 100644
--- a/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
+++ b/geode-core/src/jmh/java/org/apache/geode/cache/query/internal/MonitorQueryUnderContentionBenchmark.java
@@ -97,7 +97,7 @@ public class MonitorQueryUnderContentionBenchmark {
     LogService.setBaseLogLevel(org.apache.logging.log4j.Level.OFF);
 
     queryMonitor =
-        new QueryMonitor(() -> (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1),
+        new QueryMonitor((ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1),
             mock(InternalCache.class), QUERY_MAX_EXECUTION_TIME);
 
     final int numberOfThreads =
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
index b5d85bb..7b107ae 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
@@ -140,7 +140,7 @@ public class DefaultQueryService implements InternalQueryService {
     if (QueryMonitor.isLowMemory()) {
       String reason = String.format(
           "Query execution canceled due to memory threshold crossed in system, memory used: %s bytes.",
-          QueryMonitor.getMemoryUsedDuringLowMemory());
+          QueryMonitor.getMemoryUsedBytes());
       throw new QueryExecutionLowMemoryException(reason);
     }
     if (queryString == null)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
index 7103006..94b90ab 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.query.internal;
 
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -32,32 +33,33 @@ import org.apache.geode.internal.logging.LogService;
 
 /**
  * {@link QueryMonitor} class, monitors the query execution time. In typical usage, the maximum
- * query execution time might be set (upon construction) via the system property
- * {@link GemFireCacheImpl#MAX_QUERY_EXECUTION_TIME}. The number of threads allocated to query
- * monitoring is determined by the instance of {@link ScheduledThreadPoolExecutorFactory} passed
- * to the constructor.
+ * query execution time might be set (upon construction) via the system property {@link
+ * GemFireCacheImpl#MAX_QUERY_EXECUTION_TIME}. The number of threads allocated to query monitoring
+ * is determined by the instance of {@link ScheduledThreadPoolExecutor} passed to the
+ * constructor.
  *
- * This class supports a low-memory mode, established by {@link #setLowMemory(boolean, long)}. \
- * In that mode, any attempt to monitor a (new) query will throw an exception.
+ * This class supports a low-memory mode, established by {@link #setLowMemory(boolean, long)}
+ * with {@code isLowMemory=true}. In that mode, any attempt to monitor a (new) query will
+ * throw an exception.
  *
- * The {@link #monitorQueryThread(DefaultQuery)} method initiates monitoring of a query.
- * {@link #stopMonitoringQueryThread(DefaultQuery)} stops monitoring a query.
+ * The {@link #monitorQueryThread(DefaultQuery)} method initiates monitoring of a query. {@link
+ * #stopMonitoringQueryThread(DefaultQuery)} stops monitoring a query.
  *
- * If the {@link QueryMonitor} determines a query needs to be canceled: either because it is
- * taking too long, or because memory is running low, it does two things:
+ * If the {@link QueryMonitor} determines a query needs to be canceled: either because it is taking
+ * too long, or because memory is running low, it does two things:
  *
  * <ul>
  * <li>registers an exception on the query via
  * {@link DefaultQuery#setQueryCanceledException(CacheRuntimeException)}</li>
  * <li>sets the {@link DefaultQuery#queryCanceled} thread-local variable to {@code true}
- * so that subsequent calls to {@link #throwExceptionIfQueryOnCurrentThreadIsCanceled()}
- * will throw an exception</li>
+ * so that subsequent calls to {@link #throwExceptionIfQueryOnCurrentThreadIsCanceled()} will throw
+ * an exception</li>
  * </ul>
  *
- * Code outside this class, that wishes to participate in cooperative cancelation of queries
- * calls {@link #throwExceptionIfQueryOnCurrentThreadIsCanceled()} at various yield points.
- * In catch blocks, {@link DefaultQuery#getQueryCanceledException()} is interrogated to learn
- * the cancelation cause.
+ * Code outside this class, that wishes to participate in cooperative cancelation of queries calls
+ * {@link #throwExceptionIfQueryOnCurrentThreadIsCanceled()} at various yield points. In catch
+ * blocks, {@link DefaultQuery#getQueryCanceledException()} is interrogated to learn the cancelation
+ * cause.
  *
  * @since GemFire 6.0
  */
@@ -68,72 +70,59 @@ public class QueryMonitor {
 
   private final long defaultMaxQueryExecutionTime;
 
-  private final ScheduledThreadPoolExecutorFactory executorFactory;
+  private final ScheduledThreadPoolExecutor executor;
 
-  private volatile ScheduledThreadPoolExecutor executor;
+  private static volatile MemoryState memoryState = MemoryStateImpl.HEAP_AVAILABLE;
 
-  private volatile boolean cancelingDueToLowMemory;
-
-  private static volatile Boolean LOW_MEMORY = Boolean.FALSE;
-
-  private static volatile long LOW_MEMORY_USED_BYTES = 0;
-
-  @FunctionalInterface
-  public interface ScheduledThreadPoolExecutorFactory {
-    ScheduledThreadPoolExecutor create();
-  }
+  private static volatile long memoryUsedBytes = 0;
 
   /**
-   * This class will call {@link ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}
-   * on {@link ScheduledThreadPoolExecutor} instances returned by the
-   * {@link ScheduledThreadPoolExecutorFactory} to set that property to {@code true}.
+   * This class will call {@link ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)} on
+   * {@code executor} to set that property to {@code true}.
    *
-   * The default behavior of a {@link ScheduledThreadPoolExecutor} is to keep canceled
-   * tasks in the queue, relying on the timeout processing loop to remove them
-   * when their time is up. That behaviour would result in tasks for completed
-   * queries to remain in the queue until their timeout deadline was reached,
-   * resulting in queue growth.
+   * The default behavior of a {@link ScheduledThreadPoolExecutor} is to keep canceled tasks in the
+   * queue, relying on the timeout processing loop to remove them when their time is up. That
+   * behaviour would cause tasks for completed queries to remain in the queue until their
+   * timeout deadline was reached, resulting in queue growth.
    *
-   * Setting the remove-on-cancel-policy to {@code true} changes that behavior so tasks are
-   * removed immediately upon cancelation (via {@link #stopMonitoringQueryThread(DefaultQuery)}).
+   * Setting the remove-on-cancel-policy to {@code true} changes that behavior so tasks are removed
+   * immediately upon cancelation (via {@link #stopMonitoringQueryThread(DefaultQuery)}).
    *
-   * @param executorFactory is called to construct the initial executor. It's called subsequently
-   *        every time the QueryMonitor moves out of the low-memory state, to create a new executor.
+   * @param executor is responsible for processing scheduled cancelation tasks
    * @param cache is interrogated via {@link InternalCache#isQueryMonitorDisabledForLowMemory} at
    *        each low-memory state change
-   * @param defaultMaxQueryExecutionTime is the maximum time, in milliseconds, that any query
-   *        is allowed to run
+   * @param defaultMaxQueryExecutionTime is the maximum time, in milliseconds, that any query is
+   *        allowed to run
    */
-  public QueryMonitor(final ScheduledThreadPoolExecutorFactory executorFactory,
+  public QueryMonitor(final ScheduledThreadPoolExecutor executor,
       final InternalCache cache,
       final long defaultMaxQueryExecutionTime) {
-    Objects.requireNonNull(executorFactory);
+    Objects.requireNonNull(executor);
     Objects.requireNonNull(cache);
 
     this.cache = cache;
     this.defaultMaxQueryExecutionTime = defaultMaxQueryExecutionTime;
 
-    this.executorFactory = executorFactory;
-    this.executor = executorFactory.create();
+    this.executor = executor;
     this.executor.setRemoveOnCancelPolicy(true);
   }
 
   /**
-   * Add query to be monitored.
+   * Start monitoring the query.
    *
-   * Must not be called from a thread that is not the query thread,
-   * because this class uses a ThreadLocal on the query thread!
+   * Must not be called from a thread that is not the query thread, because this class uses a
+   * ThreadLocal on the query thread!
    */
   public void monitorQueryThread(final DefaultQuery query) {
     monitorQueryThread(query, defaultMaxQueryExecutionTime);
   }
 
   /**
-   * Each query can have a different maxQueryExecution time. Make this method public to
-   * expose that feature to callers.
+   * Each query can have a different maxQueryExecution time. Make this method public to expose that
+   * feature to callers.
    *
-   * Must not be called from a thread that is not the query thread,
-   * because this class uses a ThreadLocal on the query thread!
+   * Must not be called from a thread that is not the query thread, because this class uses a
+   * ThreadLocal on the query thread!
    */
   private void monitorQueryThread(final DefaultQuery query,
       final long maxQueryExecutionTime) {
@@ -143,12 +132,6 @@ public class QueryMonitor {
       return;
     }
 
-    if (LOW_MEMORY) {
-      final QueryExecutionLowMemoryException lowMemoryException = createLowMemoryException();
-      query.setQueryCanceledException(lowMemoryException);
-      throw lowMemoryException;
-    }
-
     query.setCancelationTask(scheduleCancelationTask(query, maxQueryExecutionTime));
 
     if (logger.isDebugEnabled()) {
@@ -157,10 +140,10 @@ public class QueryMonitor {
   }
 
   /**
-   * Stops monitoring the query.
+   * Stop monitoring the query.
    *
-   * Must not be called from a thread that is not the query thread,
-   * because this class uses a ThreadLocal on the query thread!
+   * Must not be called from a thread that is not the query thread, because this class uses a
+   * ThreadLocal on the query thread!
    */
   public void stopMonitoringQueryThread(final DefaultQuery query) {
     query.getCancelationTask().ifPresent(task -> task.cancel(false));
@@ -171,9 +154,9 @@ public class QueryMonitor {
   }
 
   /**
-   * Throw an exception if the query has been canceled. The {@link QueryMonitor} cancels the
-   * query if it takes more than the max query execution time or in low memory situations where
-   * critical heap percentage has been set on the resource manager.
+   * Throw an exception if the query has been canceled. The {@link QueryMonitor} cancels the query
+   * if it takes more than the max query execution time or in low memory situations where critical
+   * heap percentage has been set on the resource manager.
    *
    * @throws QueryExecutionCanceledException if the query has been canceled
    */
@@ -190,71 +173,174 @@ public class QueryMonitor {
     executor.shutdownNow();
   }
 
+  public static boolean isLowMemory() {
+    return memoryState.isLowMemory();
+  }
+
+  public static long getMemoryUsedBytes() {
+    return memoryUsedBytes;
+  }
+
   /**
-   * Assumes LOW_MEMORY will only be set if query monitor is enabled
+   * Caller must not call this method concurrently from multiple threads.
+   * In addition to causing data inconsistency, concurrent calls will result in
+   * lost updates e.g. transitions to low-memory status could be missed,
+   * resulting in a failure to cancel queries.
    */
-  public static boolean isLowMemory() {
-    return LOW_MEMORY;
+  public void setLowMemory(final boolean isLowMemory, final long usedBytes) {
+    memoryState.setLowMemory(executor, isLowMemory, usedBytes, cache);
   }
 
-  public static long getMemoryUsedDuringLowMemory() {
-    return LOW_MEMORY_USED_BYTES;
+  /**
+   * This interface plays the role of the "State" interface in the GoF "State" design pattern.
+   * Its implementations embodied in the {@link MemoryStateImpl} enum (an abstract base class,
+   * or ABC) and its enum constants (subclasses of the ABC) play the role of "ConcreteState"
+   * classes in that design pattern.
+   *
+   * The "Context" role is fulfilled by the melange of behavior
+   * and state embodied in the (static) {@link #isLowMemory()} and
+   * {@link #getMemoryUsedBytes()} methods and the {@link #setLowMemory(boolean, long)}
+   * method and the static fields they manipulate.
+   */
+  private interface MemoryState {
+    void setLowMemory(ScheduledThreadPoolExecutor executor,
+        boolean isLowMemory,
+        long usedBytes,
+        InternalCache cache);
+
+    ScheduledFuture<?> schedule(Runnable command,
+        long delay,
+        TimeUnit unit,
+        ScheduledExecutorService scheduledExecutorService,
+        DefaultQuery query);
+
+    boolean isLowMemory();
+
+    CacheRuntimeException createCancelationException(long timeLimitMillis,
+        DefaultQuery query);
   }
 
   /**
-   * Caller should not call this method concurrently from multiple threads. Doing so can
-   * result in lost low memory state updates due to lock unfairness.
+   * This enum (an abstract base class or ABC) and its enum constants (subclasses of the ABC)
+   * play the role of "ConcreteState" classes in the GoF "State" pattern.
+   *
+   * See {@link MemoryState} for details.
    */
-  public synchronized void setLowMemory(final boolean isLowMemory, final long usedBytes) {
-    if (!cache.isQueryMonitorDisabledForLowMemory()) {
-      QueryMonitor.LOW_MEMORY_USED_BYTES = usedBytes;
-      final boolean memoryStateChanged = isLowMemory != QueryMonitor.LOW_MEMORY;
-      if (memoryStateChanged) {
+  private enum MemoryStateImpl implements MemoryState {
+    HEAP_AVAILABLE {
+      @Override
+      public void _setLowMemory(final ScheduledThreadPoolExecutor executor,
+          final boolean isLowMemory,
+          final long usedBytes,
+          final InternalCache cache) {
         if (isLowMemory) {
-          cancelAllQueriesDueToMemory();
-        } else {
+          memoryState = HEAP_EXHAUSTED;
+
           /*
-           * Executor was shut down and made permanently unusable when we went into
-           * the low-memory state. We have to make a new executor now that we're monitoring
-           * queries again.
+           * We need to already be in the HEAP_EXHAUSTED state because we want the
+           * cancelation behavior associated with that state.
            */
-          executor = executorFactory.create();
+          cancelAllQueries(executor);
         }
+        // Otherwise, no state change
       }
-      QueryMonitor.LOW_MEMORY = isLowMemory;
-    }
-  }
 
-  /**
-   * Stop accepting new monitoring requests. Run all cancelation tasks with
-   * {@link #cancelingDueToLowMemory} set. Leave the executor's task queue empty.
-   */
-  private synchronized void cancelAllQueriesDueToMemory() {
+      @Override
+      public boolean isLowMemory() {
+        return false;
+      }
 
-    /*
-     * A cancelation task is dual-purpose. Its primary purpose is to cancel
-     * a query if the query runs too long. Alternately, if this flag
-     * {@link #cancelingDueToLowMemory} is set, the cancelation task will cancel the query
-     * due to low memory.
-     */
-    cancelingDueToLowMemory = true;
-
-    try {
-      /*
-       * It's tempting to try to process the list of tasks returned from shutdownNow().
-       * Unfortunately, that call leaves the executor in a state that causes the task's
-       * run() to cancel the task, instead of actually running it. By calling shutdown()
-       * we block new task additions and put the executor in a state that allows the
-       * task's run() to actually run the task logic.
+      @Override
+      public ScheduledFuture<?> schedule(final Runnable command, final long delay,
+          final TimeUnit unit,
+          final ScheduledExecutorService scheduledExecutorService,
+          final DefaultQuery query) {
+        return scheduledExecutorService.schedule(command, delay, unit);
+      }
+
+      @Override
+      public CacheRuntimeException createCancelationException(final long timeLimitMillis,
+          final DefaultQuery query) {
+        final String message = String.format(
+            "Query execution canceled after exceeding max execution time %sms.",
+            timeLimitMillis);
+        if (logger.isInfoEnabled()) {
+          logger.info(String.format("%s %s", message, query));
+        }
+        return new QueryExecutionTimeoutException(message);
+      }
+
+      /**
+       * Run all cancelation tasks. Leave the executor's task queue empty.
        */
-      executor.shutdown(); // executor won't accept new work ever again
-      final BlockingQueue<Runnable> expirationTaskQueue = executor.getQueue();
-      for (final Runnable cancelationTask : expirationTaskQueue) {
-        cancelationTask.run();
+      private void cancelAllQueries(final ScheduledThreadPoolExecutor executor) {
+        final BlockingQueue<Runnable> expirationTaskQueue = executor.getQueue();
+        for (final Runnable cancelationTask : expirationTaskQueue) {
+          if (expirationTaskQueue.remove(cancelationTask)) {
+            cancelationTask.run();
+          }
+        }
+      }
+
+    },
+    HEAP_EXHAUSTED {
+      @Override
+      public void _setLowMemory(final ScheduledThreadPoolExecutor executor,
+          final boolean isLowMemory,
+          final long usedBytes,
+          final InternalCache cache) {
+        if (!isLowMemory) {
+          memoryState = HEAP_AVAILABLE;
+        }
+        // Otherwise, no state change
+      }
+
+      @Override
+      public boolean isLowMemory() {
+        return true;
+      }
+
+      @Override
+      public ScheduledFuture<?> schedule(final Runnable command, final long timeLimitMillis,
+          final TimeUnit unit,
+          final ScheduledExecutorService scheduledExecutorService,
+          final DefaultQuery query) {
+        final CacheRuntimeException lowMemoryException =
+            createCancelationException(timeLimitMillis, query);
+        query.setQueryCanceledException(lowMemoryException);
+        throw lowMemoryException;
       }
-      expirationTaskQueue.clear();
-    } finally {
-      cancelingDueToLowMemory = false;
+
+      @Override
+      public CacheRuntimeException createCancelationException(final long timeLimitMillis,
+          final DefaultQuery query) {
+        return new QueryExecutionLowMemoryException(
+            String.format(
+                "Query execution canceled due to memory threshold crossed in system, memory used: %s bytes.",
+                memoryUsedBytes));
+      }
+
+    };
+
+    @Override
+    public void setLowMemory(final ScheduledThreadPoolExecutor executor,
+        final boolean isLowMemory,
+        final long usedBytes,
+        final InternalCache cache) {
+      if (cache.isQueryMonitorDisabledForLowMemory()) {
+        return;
+      }
+
+      memoryUsedBytes = usedBytes;
+
+      _setLowMemory(executor, isLowMemory, usedBytes, cache);
+    }
+
+    void _setLowMemory(final ScheduledThreadPoolExecutor executor,
+        final boolean isLowMemory,
+        final long usedBytes,
+        final InternalCache cache) {
+      throw new IllegalStateException("subclass must override");
     }
 
   }
@@ -266,31 +352,26 @@ public class QueryMonitor {
     final AtomicBoolean queryCanceledThreadLocal =
         DefaultQuery.queryCanceled.get();
 
-    return executor.schedule(() -> {
-      final CacheRuntimeException exception = cancelingDueToLowMemory ? createLowMemoryException()
-          : createExpirationException(timeLimitMillis);
+    /*
+     * This is where the GoF "State" design pattern comes home to roost.
+     *
+     * memoryState.schedule() is going to either schedule or throw an exception depending on what
+     * state we are _currently_ in. Remember the switching of that state (reference) happens
+     * in a separate thread, up in the setLowMemory() method, generally called by the
+     * HeapMemoryMonitor.
+     *
+     * The first line of the lambda/closure, when it _eventually_ runs (in yet another thread--
+     * a thread from the executor), will access what is _then_ the current state, through
+     * memoryState, to createCancelationException().
+     */
+    return memoryState.schedule(() -> {
+      final CacheRuntimeException exception = memoryState
+          .createCancelationException(timeLimitMillis, query);
 
       query.setQueryCanceledException(exception);
       queryCanceledThreadLocal.set(true);
 
-      if (logger.isInfoEnabled() && !cancelingDueToLowMemory) {
-        logger.info(String.format("%s %s", exception.getMessage(), query));
-      }
-    }, timeLimitMillis, TimeUnit.MILLISECONDS);
-  }
-
-  private QueryExecutionTimeoutException createExpirationException(final long timeLimitMillis) {
-    return new QueryExecutionTimeoutException(
-        String.format(
-            "Query execution canceled after exceeding max execution time %sms.",
-            timeLimitMillis));
-  }
-
-  private QueryExecutionLowMemoryException createLowMemoryException() {
-    return new QueryExecutionLowMemoryException(
-        String.format(
-            "Query execution canceled due to memory threshold crossed in system, memory used: %s bytes.",
-            LOW_MEMORY_USED_BYTES));
+    }, timeLimitMillis, TimeUnit.MILLISECONDS, executor, query);
   }
 
   private void logDebug(final DefaultQuery query, final String message) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 069adc1..e4fa2c3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -4468,7 +4468,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
           }
 
           this.queryMonitor =
-              new QueryMonitor(() -> (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
+              new QueryMonitor((ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
                   QUERY_MONITOR_THREAD_POOL_SIZE,
                   (runnable) -> new LoggingThread("QueryMonitor Thread", runnable)),
                   this,
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
index da999e1..21f1c9a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/QueryMessage.java
@@ -97,7 +97,7 @@ public class QueryMessage extends StreamingPartitionOperation.StreamingPartition
     if (QueryMonitor.isLowMemory()) {
       String reason = String.format(
           "Query execution canceled due to memory threshold crossed in system, memory used: %s bytes.",
-          QueryMonitor.getMemoryUsedDuringLowMemory());
+          QueryMonitor.getMemoryUsedBytes());
       throw new QueryExecutionLowMemoryException(reason);
     }
     if (Thread.interrupted()) {
@@ -162,7 +162,7 @@ public class QueryMessage extends StreamingPartitionOperation.StreamingPartition
     if (QueryMonitor.isLowMemory()) {
       String reason = String.format(
           "Query execution canceled due to memory threshold crossed in system, memory used: %s bytes.",
-          QueryMonitor.getMemoryUsedDuringLowMemory());
+          QueryMonitor.getMemoryUsedBytes());
       // throw query exception to piggyback on existing error handling as qp.executeQuery also
       // throws the same error for low memory
       throw new QueryExecutionLowMemoryException(reason);
@@ -246,7 +246,7 @@ public class QueryMessage extends StreamingPartitionOperation.StreamingPartition
       if (QueryMonitor.isLowMemory()) {
         String reason = String.format(
             "Query execution canceled due to memory threshold crossed in system, memory used: %s bytes.",
-            QueryMonitor.getMemoryUsedDuringLowMemory());
+            QueryMonitor.getMemoryUsedBytes());
         throw new QueryExecutionLowMemoryException(reason);
       } else if (query.isCanceled()) {
         throw query.getQueryCanceledException();
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
index 1681509..36716b9 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java
@@ -55,7 +55,7 @@ public class QueryMonitorTest {
   public void setUp() {
     scheduledThreadPoolExecutor = mock(ScheduledThreadPoolExecutor.class);
     when(scheduledThreadPoolExecutor.getQueue()).thenReturn(new ArrayBlockingQueue<>(1));
-    monitor = new QueryMonitor(() -> scheduledThreadPoolExecutor, mock(InternalCache.class),
+    monitor = new QueryMonitor(scheduledThreadPoolExecutor, mock(InternalCache.class),
         max_execution_time);
     captor = ArgumentCaptor.forClass(Runnable.class);
   }
@@ -105,12 +105,6 @@ public class QueryMonitorTest {
   }
 
   @Test
-  public void setLowMemoryTrueShutsDownExecutor() {
-    monitor.setLowMemory(true, 1);
-    Mockito.verify(scheduledThreadPoolExecutor, times(1)).shutdown();
-  }
-
-  @Test
   public void setLowMemoryTrueThenFalseAllowsSubsequentMonitoring() {
     monitor.setLowMemory(true, 1);
     monitor.setLowMemory(false, 1);