You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2023/06/23 20:28:59 UTC

[accumulo] branch main updated: fix low memory detector and test updates (#3508)

This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new e1320df709 fix low memory detector and test updates (#3508)
e1320df709 is described below

commit e1320df709630be844bddd9fd0d8c6045bc5111e
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Fri Jun 23 20:28:53 2023 +0000

    fix low memory detector and test updates (#3508)
    
    * fix low memory detector and test updates
    
    Co-authored-by: Dave Marion <dl...@apache.org>
---
 .../accumulo/server/mem/LowMemoryDetector.java     |  79 ++++-----
 .../test/functional/MemoryFreeingIterator.java     |  30 ++--
 .../test/functional/MemoryStarvedMajCIT.java       |   8 +-
 .../test/functional/MemoryStarvedMinCIT.java       |   2 +-
 .../test/functional/MemoryStarvedScanIT.java       | 192 ++++++++++++++++-----
 5 files changed, 203 insertions(+), 108 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index e08eed772b..fb18c7c682 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@ -36,20 +36,21 @@ import org.slf4j.LoggerFactory;
 
 public class LowMemoryDetector {
 
+  private static final Logger LOG = LoggerFactory.getLogger(LowMemoryDetector.class);
+
   @FunctionalInterface
-  public static interface Action {
+  public interface Action {
     void execute();
   }
 
   public enum DetectionScope {
     MINC, MAJC, SCAN
-  };
-
-  private static final Logger log = LoggerFactory.getLogger(LowMemoryDetector.class);
+  }
 
   private final HashMap<String,Long> prevGcTime = new HashMap<>();
+
   private long lastMemorySize = 0;
-  private long gcTimeIncreasedCount = 0;
+  private int lowMemCount = 0;
   private long lastMemoryCheckTime = 0;
   private final Lock memCheckTimeLock = new ReentrantLock();
   private volatile boolean runningLowOnMemory = false;
@@ -65,15 +66,15 @@ public class LowMemoryDetector {
   /**
    * @param context server context
    * @param scope whether this is being checked in the context of scan or compact code
-   * @param isUserTable boolean as to whether the table being scanned / compacted is a user table.
-   *        No action is taken for system tables.
+   * @param isUserTable boolean set true if the table being scanned / compacted is a user table. No
+   *        action is taken for system tables.
    * @param action Action to perform when this method returns true
    * @return true if server running low on memory
    */
   public boolean isRunningLowOnMemory(ServerContext context, DetectionScope scope,
       Supplier<Boolean> isUserTable, Action action) {
     if (isUserTable.get()) {
-      Property p = null;
+      Property p;
       switch (scope) {
         case SCAN:
           p = Property.GENERAL_LOW_MEM_SCAN_PROTECTION;
@@ -99,21 +100,19 @@ public class LowMemoryDetector {
 
   public void logGCInfo(AccumuloConfiguration conf) {
 
-    Double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD);
+    double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD);
 
     memCheckTimeLock.lock();
     try {
       final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
       List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
-      Runtime rt = Runtime.getRuntime();
 
       StringBuilder sb = new StringBuilder("gc");
 
       boolean sawChange = false;
 
       long maxIncreaseInCollectionTime = 0;
-
       for (GarbageCollectorMXBean gcBean : gcmBeans) {
         Long prevTime = prevGcTime.get(gcBean.getName());
         long pt = 0;
@@ -135,48 +134,50 @@ public class LowMemoryDetector {
         prevGcTime.put(gcBean.getName(), time);
       }
 
-      long mem = rt.freeMemory();
-      if (maxIncreaseInCollectionTime == 0) {
-        gcTimeIncreasedCount = 0;
-      } else {
-        gcTimeIncreasedCount++;
-        if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * freeMemoryPercentage) {
+      Runtime rt = Runtime.getRuntime();
+      final long maxConfiguredMemory = rt.maxMemory();
+      final long allocatedMemory = rt.totalMemory();
+      final long allocatedFreeMemory = rt.freeMemory();
+      final long freeMemory = maxConfiguredMemory - (allocatedMemory - allocatedFreeMemory);
+      final long lowMemoryThreshold = (long) (maxConfiguredMemory * freeMemoryPercentage);
+      LOG.trace("Memory info: max={}, allocated={}, free={}, free threshold={}",
+          maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold);
+
+      if (freeMemory < lowMemoryThreshold) {
+        lowMemCount++;
+        if (lowMemCount > 3 && !runningLowOnMemory) {
           runningLowOnMemory = true;
-          log.warn("Running low on memory");
-          gcTimeIncreasedCount = 0;
+          LOG.warn("Running low on memory: max={}, allocated={}, free={}, free threshold={}",
+              maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold);
+        }
+      } else {
+        // If we were running low on memory, but are not any longer, then log at warn
+        // so that it shows up in the logs
+        if (runningLowOnMemory) {
+          LOG.warn("Recovered from low memory condition");
         } else {
-          // If we were running low on memory, but are not any longer, than log at warn
-          // so that it shows up in the logs
-          if (runningLowOnMemory) {
-            log.warn("Recovered from low memory condition");
-          } else {
-            log.trace("Not running low on memory");
-          }
-          runningLowOnMemory = false;
+          LOG.trace("Not running low on memory");
         }
+        runningLowOnMemory = false;
+        lowMemCount = 0;
       }
 
-      if (mem != lastMemorySize) {
+      if (freeMemory != lastMemorySize) {
         sawChange = true;
       }
 
-      String sign = "+";
-      if (mem - lastMemorySize <= 0) {
-        sign = "";
-      }
-
-      sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize),
-          rt.totalMemory()));
+      sb.append(String.format(" freemem=%,d(%+,d) totalmem=%,d", freeMemory,
+          (freeMemory - lastMemorySize), rt.totalMemory()));
 
       if (sawChange) {
-        log.debug(sb.toString());
+        LOG.debug(sb.toString());
       }
 
       final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
       if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
         final long diff = now - lastMemoryCheckTime;
         if (diff > keepAliveTimeout + 1000) {
-          log.warn(String.format(
+          LOG.warn(String.format(
               "GC pause checker not called in a timely"
                   + " fashion. Expected every %.1f seconds but was %.1f seconds since last check",
               keepAliveTimeout / 1000., diff / 1000.));
@@ -186,10 +187,10 @@ public class LowMemoryDetector {
       }
 
       if (maxIncreaseInCollectionTime > keepAliveTimeout) {
-        Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
+        Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
       }
 
-      lastMemorySize = mem;
+      lastMemorySize = freeMemory;
       lastMemoryCheckTime = now;
     } finally {
       memCheckTimeLock.unlock();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
index 2aa86e9ddd..95e7e2e7a4 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
@@ -20,31 +20,25 @@ package org.apache.accumulo.test.functional;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 public class MemoryFreeingIterator extends WrappingIterator {
 
-  @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
-      IteratorEnvironment env) throws IOException {
-    super.init(source, options, env);
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryFreeingIterator.class);
+
+  @SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test")
+  public MemoryFreeingIterator() throws InterruptedException {
+    LOG.info("Try to free consumed memory - will block until isRunningLowOnMemory returns false.");
     MemoryConsumingIterator.freeBuffers();
     while (this.isRunningLowOnMemory()) {
+      System.gc();
       // wait for LowMemoryDetector to recognize the memory is free.
-      try {
-        Thread.sleep(SECONDS.toMillis(1));
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        throw new IOException("wait for low memory detector interrupted", ex);
-      }
+      Thread.sleep(SECONDS.toMillis(1));
     }
+    LOG.info("isRunningLowOnMemory returned false - memory available");
   }
-
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index e4ded303e6..8c9d0f7db3 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.apache.accumulo.test.util.Wait.waitFor;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -148,12 +149,9 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
         ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
         compactionThread.start();
 
-        while (paused <= 0) {
-          Thread.sleep(1000);
-          paused = MAJC_PAUSED.intValue();
-        }
+        assertTrue(waitFor(() -> MAJC_PAUSED.intValue() > 0));
 
-        MemoryStarvedScanIT.freeServerMemory(client, table);
+        MemoryStarvedScanIT.freeServerMemory(client);
         compactionThread.interrupt();
         compactionThread.join();
         assertNull(error.get());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
index 6418d98ef4..0a001047cc 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
@@ -153,7 +153,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
           paused = MINC_PAUSED.intValue();
         }
 
-        MemoryStarvedScanIT.freeServerMemory(client, table);
+        MemoryStarvedScanIT.freeServerMemory(client);
         ingestThread.interrupt();
         ingestThread.join();
         assertNull(error.get());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index c0036d358d..607a59ece1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test.functional;
 
 import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_LOW_MEMORY;
+import static org.apache.accumulo.test.util.Wait.waitFor;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -40,6 +41,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.WrappingIterator;
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -54,6 +56,8 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 
@@ -83,6 +87,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 
   public static final double FREE_MEMORY_THRESHOLD = 0.20D;
 
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryStarvedScanIT.class);
   private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
   private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
   private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger(0);
@@ -132,11 +137,16 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
   }
 
   @BeforeEach
-  public void beforeEach() {
+  public void beforeEach() throws Exception {
+    // Free the server side memory
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      freeServerMemory(client);
+    }
+    // allow metric collection to cycle and metric value to reset to zero
+    waitFor(() -> 0 == LOW_MEM_DETECTED.get());
     // Reset the client side counters
     SCAN_START_DELAYED.reset();
     SCAN_START_DELAYED.reset();
-    LOW_MEM_DETECTED.set(0);
   }
 
   static void consumeServerMemory(Scanner scanner) {
@@ -162,13 +172,11 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
     assertTrue(iter.hasNext());
   }
 
-  static void freeServerMemory(AccumuloClient client, String table) throws Exception {
-    try (Scanner scanner = client.createScanner(table)) {
-      scanner.addScanIterator(new IteratorSetting(11, MemoryFreeingIterator.class, Map.of()));
-      @SuppressWarnings("unused")
-      Iterator<Entry<Key,Value>> iter = scanner.iterator(); // init'ing the iterator should be
-                                                            // enough to free the memory
-    }
+  static void freeServerMemory(AccumuloClient client) throws Exception {
+    // Instantiating MemoryFreeingIterator on the TabletServer frees the memory, because its
+    // constructor releases the buffers created by the MemoryConsumingIterator.
+    client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(),
+        WrappingIterator.class.getName());
   }
 
   @Test
@@ -183,19 +191,16 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
       ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 
       try (Scanner scanner = client.createScanner(table)) {
-        double returned = SCAN_RETURNED_EARLY.doubleValue();
-        double paused = SCAN_START_DELAYED.doubleValue();
+        final double returned = SCAN_RETURNED_EARLY.doubleValue();
+        final double paused = SCAN_START_DELAYED.doubleValue();
 
         consumeServerMemory(scanner);
 
-        // Wait for longer than the memory check interval
-        Thread.sleep(6_000);
+        // Wait for the metric that indicates a scan was returned early due to low memory
+        waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
+            && SCAN_START_DELAYED.doubleValue() >= paused);
 
-        // The metric that indicates a scan was returned early due to low memory should
-        // have been incremented.
-        assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
-        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
-        freeServerMemory(client, table);
+        freeServerMemory(client);
       } finally {
         to.delete(table);
       }
@@ -236,6 +241,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
 
         t.start();
+        LOG.info("Waiting for memory to be consumed");
 
         // Wait until the dataConsumingScanner has started fetching data
         int currentCount = fetched.get();
@@ -252,30 +258,32 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         // Confirm that some data was fetched by the memoryConsumingScanner
         currentCount = fetched.get();
         assertTrue(currentCount > 0 && currentCount < 100);
+        LOG.info("Memory consumed");
 
         // Grab the current metric counts, wait
-        double returned = SCAN_RETURNED_EARLY.doubleValue();
-        double paused = SCAN_START_DELAYED.doubleValue();
+        final double returned = SCAN_RETURNED_EARLY.doubleValue();
+        final double paused = SCAN_START_DELAYED.doubleValue();
         Thread.sleep(1500);
         // One of two conditions could exist here:
         // The number of fetched rows equals the current count before the wait above
         // and the SCAN_START_DELAYED has been incremented OR the number of fetched
         // rows is one more than the current count and the SCAN_RETURNED_EARLY has
         // been incremented.
-        assertTrue((currentCount == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
-            || (currentCount + 1 == fetched.get() && SCAN_RETURNED_EARLY.doubleValue() > returned));
+        final int currentCountCopy = currentCount;
+        waitFor(
+            () -> (currentCountCopy == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
+                || (currentCountCopy + 1 == fetched.get()
+                    && SCAN_RETURNED_EARLY.doubleValue() > returned));
         currentCount = fetched.get();
 
         // Perform the check again
-        paused = SCAN_START_DELAYED.doubleValue();
-        returned = SCAN_RETURNED_EARLY.doubleValue();
-        Thread.sleep(1500);
+        assertTrue(waitFor(() -> 1 == LOW_MEM_DETECTED.get()));
         assertEquals(currentCount, fetched.get());
 
-        assertEquals(1, LOW_MEM_DETECTED.get());
-
         // Free the memory which will allow the pausing scanner to continue
-        freeServerMemory(client, table);
+        LOG.info("Freeing memory");
+        freeServerMemory(client);
+        LOG.info("Memory freed");
 
         t.join();
         assertEquals(30, fetched.get());
@@ -301,20 +309,18 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
 
       try (BatchScanner scanner = client.createBatchScanner(table,
           client.securityOperations().getUserAuthorizations(client.whoami()), 1)) {
-        double returned = SCAN_RETURNED_EARLY.doubleValue();
-        double paused = SCAN_START_DELAYED.doubleValue();
+
+        final double returned = SCAN_RETURNED_EARLY.doubleValue();
+        final double paused = SCAN_START_DELAYED.doubleValue();
 
         consumeServerMemory(scanner);
 
-        // Wait for longer than the memory check interval
-        Thread.sleep(6000);
+        // Wait for metric that indicates a scan was returned early due to low memory
+        assertTrue(waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
+            && SCAN_START_DELAYED.doubleValue() >= paused));
+        assertTrue(waitFor(() -> 1 == LOW_MEM_DETECTED.get()));
 
-        // The metric that indicates a scan was returned early due to low memory should
-        // have been incremented.
-        assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
-        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
-        assertEquals(1, LOW_MEM_DETECTED.get());
-        freeServerMemory(client, table);
+        freeServerMemory(client);
       } finally {
         to.delete(table);
       }
@@ -324,6 +330,96 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
   @Test
   public void testBatchScanPauses() throws Exception {
 
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      // check memory okay before starting
+      assertEquals(0, LOW_MEM_DETECTED.get());
+
+      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+
+      try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
+          Scanner memoryConsumingScanner = client.createScanner(table)) {
+
+        dataConsumingScanner.addScanIterator(
+            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "500")));
+        dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
+        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
+        AtomicInteger fetched = new AtomicInteger(0);
+        Thread t = new Thread(() -> {
+          int i = 0;
+          while (iter.hasNext()) {
+            iter.next();
+            fetched.set(++i);
+          }
+        });
+
+        memoryConsumingScanner
+            .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
+        memoryConsumingScanner.setBatchSize(1);
+        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+
+        t.start();
+
+        // Wait until the dataConsumingScanner has started fetching data
+        int currentCount = fetched.get();
+        while (currentCount == 0) {
+          Thread.sleep(500);
+          currentCount = fetched.get();
+        }
+
+        // This should block until the GarbageCollectionLogger runs and notices that the
+        // VM is low on memory.
+        Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
+        assertTrue(consumingIter.hasNext());
+
+        // Confirm that some data was fetched by the dataConsumingScanner
+        currentCount = fetched.get();
+        assertTrue(currentCount > 0 && currentCount < 100);
+
+        // Grab the current metric counts, then wait until either the dataConsumingScanner has
+        // stopped fetching rows and the scan delay counter has increased, or it fetched exactly
+        // one more row and the returned-early counter has increased.
+        final double returned = SCAN_RETURNED_EARLY.doubleValue();
+        final double paused = SCAN_START_DELAYED.doubleValue();
+
+        final int currentCountCopy = currentCount;
+        waitFor(
+            () -> (currentCountCopy == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
+                || (currentCountCopy + 1 == fetched.get()
+                    && SCAN_RETURNED_EARLY.doubleValue() > returned));
+        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
+
+        // Perform the check again
+        final double paused2 = SCAN_START_DELAYED.doubleValue();
+        final double returned2 = SCAN_RETURNED_EARLY.doubleValue();
+        Thread.sleep(1500);
+        assertEquals(currentCount, fetched.get());
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused2);
+        assertEquals(returned2, SCAN_RETURNED_EARLY.doubleValue());
+        waitFor(() -> 1 == LOW_MEM_DETECTED.get());
+
+        // Free the memory which will allow the pausing scanner to continue
+        freeServerMemory(client);
+
+        t.join();
+        assertEquals(30, fetched.get());
+
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+  /**
+   * Check that the low memory condition is set and remains set until free memory is available.
+   */
+  @Test
+  public void testLowMemoryFlapping() throws Exception {
+
     String table = getUniqueNames(1)[0];
     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 
@@ -383,7 +479,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         assertEquals(currentCount, fetched.get());
         assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
         assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
-        assertEquals(1, LOW_MEM_DETECTED.get());
+        assertTrue(waitFor(() -> LOW_MEM_DETECTED.get() == 1));
 
         // Perform the check again
         paused = SCAN_START_DELAYED.doubleValue();
@@ -392,21 +488,27 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         assertEquals(currentCount, fetched.get());
         assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
         assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue());
-        assertEquals(1, LOW_MEM_DETECTED.get());
 
+        // check across multiple low memory checks and metric updates that low memory detected
+        // remains set
+        int checkCount = 0;
+        while (checkCount++ < 5) {
+          Thread.sleep(5_000);
+          LOG.debug("Check low memory still set. Low Memory Flag: {}, Check count: {}",
+              LOW_MEM_DETECTED.get(), checkCount);
+          assertEquals(1, LOW_MEM_DETECTED.get());
+        }
         // Free the memory which will allow the pausing scanner to continue
-        freeServerMemory(client, table);
+        freeServerMemory(client);
 
         t.join();
         assertEquals(30, fetched.get());
-        // allow metic collection to cycle.
-        Thread.sleep(6_000);
-        assertEquals(0, LOW_MEM_DETECTED.get());
+        // allow metric collection to cycle.
+        assertTrue(waitFor(() -> LOW_MEM_DETECTED.get() == 0));
 
       } finally {
         to.delete(table);
       }
     }
   }
-
 }