You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/02/16 18:28:37 UTC

[accumulo] branch main updated: Rename GarbageCollectionLogger, optionally pause scans/compactions until memory usage is lower (#3161)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d5e593373 Rename GarbageCollectionLogger, optionally pause scans/compactions until memory usage is lower (#3161)
6d5e593373 is described below

commit 6d5e5933734f74bb4a3c607a34df32fd93cb8916
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Feb 16 13:28:32 2023 -0500

    Rename GarbageCollectionLogger, optionally pause scans/compactions until memory usage is lower (#3161)
    
    Renamed the GarbageCollectionLogger to LowMemoryDetector and added
    properties to configure its interval and memory threshold. The LowMemoryDetector
    is passive and will log when free memory is low (default less than 5%). Three
    new properties were added that will influence how scan, minc, and majc behave
    when the LowMemoryDetector determines that the server is running low on memory.
    Scans will either pause at the start of retrieving the next batch of results
    or return the current batch of results early and minor/major compactions will
    pause until the low memory condition no longer exists. Server-side user
    iterators can check for low memory conditions by subclassing WrappingIterator
    and calling isRunningLowOnMemory(). Four new metrics were added to indicate
    when scans were paused or returned early, or when minor or major compactions
    were paused.
---
 .../core/client/admin/ActiveCompaction.java        |   6 +
 .../core/clientImpl/ActiveCompactionImpl.java      |   5 +
 .../org/apache/accumulo/core/conf/Property.java    |  24 +-
 .../core/iterators/IteratorEnvironment.java        |  10 +
 .../core/iterators/SortedKeyValueIterator.java     |  13 +
 .../accumulo/core/iterators/WrappingIterator.java  |  11 +-
 .../accumulo/core/metrics/MetricsProducer.java     |  32 ++
 .../core/tabletserver/thrift/ActiveCompaction.java | 104 +++++-
 core/src/main/thrift/tabletserver.thrift           |   1 +
 .../org/apache/accumulo/server/AbstractServer.java |   9 +
 .../org/apache/accumulo/server/ServerContext.java  |   7 +
 .../accumulo/server/compaction/CompactionInfo.java |   8 +-
 .../server/compaction/CompactionStats.java         |  13 +-
 .../accumulo/server/compaction/FileCompactor.java  |  45 ++-
 ...tionStats.java => PausedCompactionMetrics.java} |  47 +--
 .../iterators/SystemIteratorEnvironment.java       |   5 +
 .../LowMemoryDetector.java}                        |  83 ++++-
 .../coordinator/CompactionCoordinator.java         |  11 -
 .../coordinator/CompactionCoordinatorTest.java     |   3 -
 .../org/apache/accumulo/compactor/Compactor.java   |  30 +-
 .../apache/accumulo/compactor/CompactorTest.java   |   8 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |  18 +-
 .../accumulo/tserver/TabletClientHandler.java      |   4 +-
 .../accumulo/tserver/TabletHostingServer.java      |   6 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  29 +-
 .../tserver/compactions/CompactionManager.java     |   1 +
 .../tserver/metrics/TabletServerScanMetrics.java   |  16 +
 .../accumulo/tserver/tablet/CompactableUtils.java  |   2 +-
 .../accumulo/tserver/tablet/MinorCompactor.java    |   4 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   5 +
 .../apache/accumulo/tserver/tablet/TabletBase.java |  60 +++-
 .../test/functional/MemoryConsumingIterator.java   |  91 +++++
 .../test/functional/MemoryFreeingIterator.java     |  23 +-
 .../test/functional/MemoryStarvedMajCIT.java       | 167 +++++++++
 .../test/functional/MemoryStarvedMinCIT.java       | 168 +++++++++
 .../test/functional/MemoryStarvedScanIT.java       | 388 +++++++++++++++++++++
 .../apache/accumulo/test/metrics/MetricsIT.java    |   3 +-
 37 files changed, 1328 insertions(+), 132 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
index 28965f7267..3c804fac41 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java
@@ -119,6 +119,12 @@ public abstract class ActiveCompaction {
    */
   public abstract long getEntriesWritten();
 
+  /**
+   * @return the number of times the server paused a compaction
+   * @since 3.0.0
+   */
+  public abstract long getPausedCount();
+
   /**
    * @return the per compaction iterators configured
    */
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
index 46c0c1014c..ca36f3df7e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java
@@ -102,6 +102,11 @@ public class ActiveCompactionImpl extends ActiveCompaction {
     return tac.getEntriesWritten();
   }
 
+  @Override
+  public long getPausedCount() {
+    return tac.getTimesPaused();
+  }
+
   @Override
   public List<IteratorSetting> getIterators() {
     ArrayList<IteratorSetting> ret = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 93fad5e98e..19d22d84e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -275,6 +275,29 @@ public enum Property {
   GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval", "1d",
       PropertyType.TIMEDURATION, "The length of time between generation of new secret keys",
       "1.7.0"),
+  GENERAL_LOW_MEM_DETECTOR_INTERVAL("general.low.mem.detector.interval", "5s",
+      PropertyType.TIMEDURATION, "The time interval between low memory checks", "3.0.0"),
+  GENERAL_LOW_MEM_DETECTOR_THRESHOLD("general.low.mem.detector.threshold", "0.05",
+      PropertyType.FRACTION,
+      "The LowMemoryDetector will report when free memory drops below this percentage of total memory",
+      "3.0.0"),
+  GENERAL_LOW_MEM_SCAN_PROTECTION("general.low.mem.protection.scan", "false", PropertyType.BOOLEAN,
+      "Scans may be paused or return results early when the server "
+          + "is low on memory and this property is set to true. Enabling this property will incur a slight "
+          + "scan performance penalty when the server is not low on memory",
+      "3.0.0"),
+  GENERAL_LOW_MEM_MINC_PROTECTION("general.low.mem.protection.compaction.minc", "false",
+      PropertyType.BOOLEAN,
+      "Minor compactions may be paused when the server "
+          + "is low on memory and this property is set to true. Enabling this property will incur a slight "
+          + "compaction performance penalty when the server is not low on memory",
+      "3.0.0"),
+  GENERAL_LOW_MEM_MAJC_PROTECTION("general.low.mem.protection.compaction.majc", "false",
+      PropertyType.BOOLEAN,
+      "Major compactions may be paused when the server "
+          + "is low on memory and this property is set to true. Enabling this property will incur a slight "
+          + "compaction performance penalty when the server is not low on memory",
+      "3.0.0"),
   GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s",
       PropertyType.TIMEDURATION,
       "The maximum amount of time that a Scanner should wait before retrying a failed RPC",
@@ -810,7 +833,6 @@ public enum Property {
           + " The resources that are used by default can be seen in"
           + " accumulo/server/monitor/src/main/resources/templates/default.ftl",
       "2.0.0"),
-
   // per table properties
   TABLE_PREFIX("table.", null, PropertyType.PREFIX,
       "Properties in this category affect tablet server treatment of tablets,"
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
index 372a0e49a3..9a78ddf35d 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
@@ -187,4 +187,14 @@ public interface IteratorEnvironment {
   default TableId getTableId() {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Return whether or not the server is running low on memory
+   *
+   * @return true if server is running low on memory
+   * @since 3.0.0
+   */
+  default boolean isRunningLowOnMemory() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
index be9bfe0653..30efd5f07a 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
@@ -149,4 +149,17 @@ public interface SortedKeyValueIterator<K extends WritableComparable<?>,V extend
    * @exception UnsupportedOperationException if not supported.
    */
   SortedKeyValueIterator<K,V> deepCopy(IteratorEnvironment env);
+
+  /**
+   * Returns true when running in a server process and the LowMemoryDetector determines that
+   * the server is running low on memory. This is useful for iterators that aggregate KV pairs or
+   * perform long running operations that create a lot of garbage. Server side iterators can
+   * override this method and return the value of IteratorEnvironment.isRunningLowOnMemory.
+   *
+   * @return true if running in server process and server is running low on memory
+   * @since 3.0.0
+   */
+  default boolean isRunningLowOnMemory() {
+    return false;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
index eb84bdc2c1..f7edbb4ad4 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/WrappingIterator.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.data.Value;
 public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Value> {
 
   private SortedKeyValueIterator<Key,Value> source = null;
+  private IteratorEnvironment env = null;
   boolean seenSeek = false;
 
   protected void setSource(SortedKeyValueIterator<Key,Value> source) {
@@ -89,7 +90,7 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
       IteratorEnvironment env) throws IOException {
     this.setSource(source);
-
+    this.env = env;
   }
 
   @Override
@@ -107,4 +108,12 @@ public abstract class WrappingIterator implements SortedKeyValueIterator<Key,Val
     seenSeek = true;
   }
 
+  @Override
+  public boolean isRunningLowOnMemory() {
+    if (env == null) {
+      return SortedKeyValueIterator.super.isRunningLowOnMemory();
+    }
+    return env.isRunningLowOnMemory();
+  }
+
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index ef5b444724..e0b1bd65a0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -425,6 +425,20 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>Counter</td>
  * <td></td>
  * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_SCAN_PAUSED_FOR_MEM}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_SCAN_RETURN_FOR_MEM}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
  * <!-- major compactions -->
  * <tr>
  * <td>{i|e}_{compactionServiceName}_{executor_name}_queued</td>
@@ -442,6 +456,13 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>The compaction service information is in a tag:
  * id={i|e}_{compactionServiceName}_{executor_name}</td>
  * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td>{@link #METRICS_MAJC_PAUSED}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
  * <!-- minor compactions -->
  * <tr>
  * <td>Queue</td>
@@ -457,6 +478,13 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>Timer</td>
  * <td></td>
  * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td>{@link #METRICS_MINC_PAUSED}</td>
+ * <td>Counter</td>
+ * <td></td>
+ * </tr>
  * <!-- Updates (ingest) -->
  * <tr>
  * <td>permissionErrors</td>
@@ -595,10 +623,12 @@ public interface MetricsProducer {
   String METRICS_MAJC_PREFIX = "accumulo.tserver.compactions.majc.";
   String METRICS_MAJC_QUEUED = METRICS_MAJC_PREFIX + "queued";
   String METRICS_MAJC_RUNNING = METRICS_MAJC_PREFIX + "running";
+  String METRICS_MAJC_PAUSED = METRICS_MAJC_PREFIX + "paused";
 
   String METRICS_MINC_PREFIX = "accumulo.tserver.compactions.minc.";
   String METRICS_MINC_QUEUED = METRICS_MINC_PREFIX + "queued";
   String METRICS_MINC_RUNNING = METRICS_MINC_PREFIX + "running";
+  String METRICS_MINC_PAUSED = METRICS_MINC_PREFIX + "paused";
 
   String METRICS_SCAN = "accumulo.tserver.scans";
   String METRICS_SCAN_OPEN_FILES = METRICS_SCAN + ".files.open";
@@ -608,6 +638,8 @@ public interface MetricsProducer {
   String METRICS_SCAN_CONTINUE = METRICS_SCAN + ".continue";
   String METRICS_SCAN_CLOSE = METRICS_SCAN + ".close";
   String METRICS_SCAN_BUSY_TIMEOUT = METRICS_SCAN + ".busy_timeout";
+  String METRICS_SCAN_PAUSED_FOR_MEM = METRICS_SCAN + ".paused.for.memory";
+  String METRICS_SCAN_RETURN_FOR_MEM = METRICS_SCAN + ".return.early.for.memory";
 
   String METRICS_TSERVER_PREFIX = "accumulo.tserver.";
   String METRICS_TSERVER_ENTRIES = METRICS_TSERVER_PREFIX + "entries";
diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java
index dc534d2347..3c48fe14c0 100644
--- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java
+++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/ActiveCompaction.java
@@ -39,6 +39,7 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
   private static final org.apache.thrift.protocol.TField ENTRIES_WRITTEN_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesWritten", org.apache.thrift.protocol.TType.I64, (short)9);
   private static final org.apache.thrift.protocol.TField SSI_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("ssiList", org.apache.thrift.protocol.TType.LIST, (short)10);
   private static final org.apache.thrift.protocol.TField SSIO_FIELD_DESC = new org.apache.thrift.protocol.TField("ssio", org.apache.thrift.protocol.TType.MAP, (short)11);
+  private static final org.apache.thrift.protocol.TField TIMES_PAUSED_FIELD_DESC = new org.apache.thrift.protocol.TField("timesPaused", org.apache.thrift.protocol.TType.I64, (short)12);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ActiveCompactionStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ActiveCompactionTupleSchemeFactory();
@@ -62,6 +63,7 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
   public long entriesWritten; // required
   public @org.apache.thrift.annotation.Nullable java.util.List<org.apache.accumulo.core.dataImpl.thrift.IterInfo> ssiList; // required
   public @org.apache.thrift.annotation.Nullable java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> ssio; // required
+  public long timesPaused; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -83,7 +85,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
     ENTRIES_READ((short)8, "entriesRead"),
     ENTRIES_WRITTEN((short)9, "entriesWritten"),
     SSI_LIST((short)10, "ssiList"),
-    SSIO((short)11, "ssio");
+    SSIO((short)11, "ssio"),
+    TIMES_PAUSED((short)12, "timesPaused");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -121,6 +124,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
           return SSI_LIST;
         case 11: // SSIO
           return SSIO;
+        case 12: // TIMES_PAUSED
+          return TIMES_PAUSED;
         default:
           return null;
       }
@@ -167,6 +172,7 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
   private static final int __AGE_ISSET_ID = 0;
   private static final int __ENTRIESREAD_ISSET_ID = 1;
   private static final int __ENTRIESWRITTEN_ISSET_ID = 2;
+  private static final int __TIMESPAUSED_ISSET_ID = 3;
   private byte __isset_bitfield = 0;
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -199,6 +205,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
             new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, 
                 new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), 
                 new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))));
+    tmpMap.put(_Fields.TIMES_PAUSED, new org.apache.thrift.meta_data.FieldMetaData("timesPaused", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ActiveCompaction.class, metaDataMap);
   }
@@ -217,7 +225,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
     long entriesRead,
     long entriesWritten,
     java.util.List<org.apache.accumulo.core.dataImpl.thrift.IterInfo> ssiList,
-    java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> ssio)
+    java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> ssio,
+    long timesPaused)
   {
     this();
     this.extent = extent;
@@ -234,6 +243,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
     setEntriesWrittenIsSet(true);
     this.ssiList = ssiList;
     this.ssio = ssio;
+    this.timesPaused = timesPaused;
+    setTimesPausedIsSet(true);
   }
 
   /**
@@ -285,6 +296,7 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
       }
       this.ssio = __this__ssio;
     }
+    this.timesPaused = other.timesPaused;
   }
 
   @Override
@@ -308,6 +320,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
     this.entriesWritten = 0;
     this.ssiList = null;
     this.ssio = null;
+    setTimesPausedIsSet(false);
+    this.timesPaused = 0;
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -638,6 +652,29 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
     }
   }
 
+  public long getTimesPaused() {
+    return this.timesPaused;
+  }
+
+  public ActiveCompaction setTimesPaused(long timesPaused) {
+    this.timesPaused = timesPaused;
+    setTimesPausedIsSet(true);
+    return this;
+  }
+
+  public void unsetTimesPaused() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __TIMESPAUSED_ISSET_ID);
+  }
+
+  /** Returns true if field timesPaused is set (has been assigned a value) and false otherwise */
+  public boolean isSetTimesPaused() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __TIMESPAUSED_ISSET_ID);
+  }
+
+  public void setTimesPausedIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __TIMESPAUSED_ISSET_ID, value);
+  }
+
   @Override
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
@@ -729,6 +766,14 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
       }
       break;
 
+    case TIMES_PAUSED:
+      if (value == null) {
+        unsetTimesPaused();
+      } else {
+        setTimesPaused((java.lang.Long)value);
+      }
+      break;
+
     }
   }
 
@@ -769,6 +814,9 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
     case SSIO:
       return getSsio();
 
+    case TIMES_PAUSED:
+      return getTimesPaused();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -803,6 +851,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
       return isSetSsiList();
     case SSIO:
       return isSetSsio();
+    case TIMES_PAUSED:
+      return isSetTimesPaused();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -919,6 +969,15 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
         return false;
     }
 
+    boolean this_present_timesPaused = true;
+    boolean that_present_timesPaused = true;
+    if (this_present_timesPaused || that_present_timesPaused) {
+      if (!(this_present_timesPaused && that_present_timesPaused))
+        return false;
+      if (this.timesPaused != that.timesPaused)
+        return false;
+    }
+
     return true;
   }
 
@@ -964,6 +1023,8 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
     if (isSetSsio())
       hashCode = hashCode * 8191 + ssio.hashCode();
 
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(timesPaused);
+
     return hashCode;
   }
 
@@ -1085,6 +1146,16 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetTimesPaused(), other.isSetTimesPaused());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTimesPaused()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timesPaused, other.timesPaused);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1184,6 +1255,10 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
       sb.append(this.ssio);
     }
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("timesPaused:");
+    sb.append(this.timesPaused);
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -1368,6 +1443,14 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 12: // TIMES_PAUSED
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.timesPaused = iprot.readI64();
+              struct.setTimesPausedIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1463,6 +1546,9 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
         }
         oprot.writeFieldEnd();
       }
+      oprot.writeFieldBegin(TIMES_PAUSED_FIELD_DESC);
+      oprot.writeI64(struct.timesPaused);
+      oprot.writeFieldEnd();
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1515,7 +1601,10 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
       if (struct.isSetSsio()) {
         optionals.set(10);
       }
-      oprot.writeBitSet(optionals, 11);
+      if (struct.isSetTimesPaused()) {
+        optionals.set(11);
+      }
+      oprot.writeBitSet(optionals, 12);
       if (struct.isSetExtent()) {
         struct.extent.write(oprot);
       }
@@ -1575,12 +1664,15 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
           }
         }
       }
+      if (struct.isSetTimesPaused()) {
+        oprot.writeI64(struct.timesPaused);
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, ActiveCompaction struct) throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet incoming = iprot.readBitSet(11);
+      java.util.BitSet incoming = iprot.readBitSet(12);
       if (incoming.get(0)) {
         struct.extent = new org.apache.accumulo.core.dataImpl.thrift.TKeyExtent();
         struct.extent.read(iprot);
@@ -1667,6 +1759,10 @@ public class ActiveCompaction implements org.apache.thrift.TBase<ActiveCompactio
         }
         struct.setSsioIsSet(true);
       }
+      if (incoming.get(11)) {
+        struct.timesPaused = iprot.readI64();
+        struct.setTimesPausedIsSet(true);
+      }
     }
   }
 
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 445211ad61..0098a9ca93 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -83,6 +83,7 @@ struct ActiveCompaction {
   9:i64 entriesWritten
   10:list<data.IterInfo> ssiList
   11:map<string, map<string, string>> ssio
+  12:i64 timesPaused
 }
 
 struct TIteratorSetting {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 9d1d4c4a0d..6b47fd5fea 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.server;
 
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
@@ -27,6 +29,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metrics.MetricsUtil;
 import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.mem.LowMemoryDetector;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +59,11 @@ public abstract class AbstractServer implements AutoCloseable, Runnable {
       // Server-side "client" check to make sure we're logged in as a user we expect to be
       context.enforceKerberosLogin();
     }
+    final LowMemoryDetector lmd = context.getLowMemoryDetector();
+    ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(
+        () -> lmd.logGCInfo(context.getConfiguration()), 0,
+        lmd.getIntervalMillis(context.getConfiguration()), TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   /**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index e2286115c2..90cb171b88 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.conf.store.PropStore;
 import org.apache.accumulo.server.conf.store.impl.ZooPropStore;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.mem.LowMemoryDetector;
 import org.apache.accumulo.server.metadata.ServerAmpleImpl;
 import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
 import org.apache.accumulo.server.rpc.ThriftServerType;
@@ -102,6 +103,7 @@ public class ServerContext extends ClientContext {
   private final Supplier<ScheduledThreadPoolExecutor> sharedScheduledThreadPool;
   private final Supplier<AuditedSecurityOperation> securityOperation;
   private final Supplier<CryptoServiceFactory> cryptoFactorySupplier;
+  private final Supplier<LowMemoryDetector> lowMemoryDetector;
 
   public ServerContext(SiteConfiguration siteConfig) {
     this(new ServerInfo(siteConfig));
@@ -127,6 +129,7 @@ public class ServerContext extends ClientContext {
     securityOperation =
         memoize(() -> new AuditedSecurityOperation(this, SecurityOperation.getAuthorizor(this),
             SecurityOperation.getAuthenticator(this), SecurityOperation.getPermHandler(this)));
+    lowMemoryDetector = memoize(() -> new LowMemoryDetector());
   }
 
   /**
@@ -453,4 +456,8 @@ public class ServerContext extends ClientContext {
     return zkUserPath.get();
   }
 
+  public LowMemoryDetector getLowMemoryDetector() {
+    return lowMemoryDetector.get();
+  }
+
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
index 500653964d..05b8fa2b72 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
@@ -38,12 +38,14 @@ public class CompactionInfo {
   private final String localityGroup;
   private final long entriesRead;
   private final long entriesWritten;
+  private final long timesPaused;
   private final TCompactionReason reason;
 
   CompactionInfo(FileCompactor compactor) {
     this.localityGroup = compactor.getCurrentLocalityGroup();
     this.entriesRead = compactor.getEntriesRead();
     this.entriesWritten = compactor.getEntriesWritten();
+    this.timesPaused = compactor.getTimesPaused();
     this.reason = compactor.getReason();
     this.compactor = compactor;
   }
@@ -64,6 +66,10 @@ public class CompactionInfo {
     return entriesWritten;
   }
 
+  public long getTimesPaused() {
+    return timesPaused;
+  }
+
   public Thread getThread() {
     return compactor.thread;
   }
@@ -100,6 +106,6 @@ public class CompactionInfo {
         .collect(Collectors.toList());
     return new ActiveCompaction(compactor.extent.toThrift(),
         System.currentTimeMillis() - compactor.getStartTime(), files, compactor.getOutputFile(),
-        type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+        type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions, timesPaused);
   }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
index e3ccfdd798..9704726d2f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
@@ -22,10 +22,12 @@ public class CompactionStats {
   private long entriesRead;
   private long entriesWritten;
   private long fileSize;
+  private int timesPaused;
 
-  public CompactionStats(long er, long ew) {
+  public CompactionStats(long er, long ew, int tp) {
     this.setEntriesRead(er);
     this.setEntriesWritten(ew);
+    this.setTimesPaused(tp);
   }
 
   public CompactionStats() {}
@@ -46,9 +48,18 @@ public class CompactionStats {
     return entriesWritten;
   }
 
+  public long getTimesPaused() {
+    return timesPaused;
+  }
+
+  public void setTimesPaused(int timesPaused) {
+    this.timesPaused = timesPaused;
+  }
+
   public void add(CompactionStats mcs) {
     this.entriesRead += mcs.entriesRead;
     this.entriesWritten += mcs.entriesWritten;
+    this.timesPaused += mcs.timesPaused;
   }
 
   public void setFileSize(long fileSize) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 27eef8def0..f8ba0011c9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -67,6 +68,7 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.accumulo.server.mem.LowMemoryDetector.DetectionScope;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
@@ -113,6 +115,7 @@ public class FileCompactor implements Callable<CompactionStats> {
   protected final KeyExtent extent;
   private final List<IteratorSetting> iterators;
   private final CryptoService cryptoService;
+  private final PausedCompactionMetrics metrics;
 
   // things to report
   private String currentLocalityGroup = "";
@@ -120,6 +123,7 @@ public class FileCompactor implements Callable<CompactionStats> {
 
   private final AtomicLong entriesRead = new AtomicLong(0);
   private final AtomicLong entriesWritten = new AtomicLong(0);
+  private final AtomicInteger timesPaused = new AtomicInteger(0);
   private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
 
   // a unique id to identify a compactor
@@ -142,6 +146,7 @@ public class FileCompactor implements Callable<CompactionStats> {
   private void clearStats() {
     entriesRead.set(0);
     entriesWritten.set(0);
+    timesPaused.set(0);
   }
 
   protected static final Set<FileCompactor> runningCompactions =
@@ -162,7 +167,7 @@ public class FileCompactor implements Callable<CompactionStats> {
   public FileCompactor(ServerContext context, KeyExtent extent,
       Map<StoredTabletFile,DataFileValue> files, TabletFile outputFile, boolean propagateDeletes,
       CompactionEnv env, List<IteratorSetting> iterators, AccumuloConfiguration tableConfiguation,
-      CryptoService cs) {
+      CryptoService cs, PausedCompactionMetrics metrics) {
     this.context = context;
     this.extent = extent;
     this.fs = context.getVolumeManager();
@@ -173,6 +178,7 @@ public class FileCompactor implements Callable<CompactionStats> {
     this.env = env;
     this.iterators = iterators;
     this.cryptoService = cs;
+    this.metrics = metrics;
 
     startTime = System.currentTimeMillis();
   }
@@ -268,10 +274,11 @@ public class FileCompactor implements Callable<CompactionStats> {
 
       log.trace(String.format(
           "Compaction %s %,d read | %,d written | %,6d entries/sec"
-              + " | %,6.3f secs | %,12d bytes | %9.3f byte/sec",
+              + " | %,6.3f secs | %,12d bytes | %9.3f byte/sec | %,d paused",
           extent, majCStats.getEntriesRead(), majCStats.getEntriesWritten(),
           (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0,
-          mfwTmp.getLength(), mfwTmp.getLength() / ((t2 - t1) / 1000.0)));
+          mfwTmp.getLength(), mfwTmp.getLength() / ((t2 - t1) / 1000.0),
+          majCStats.getTimesPaused()));
 
       majCStats.setFileSize(mfwTmp.getLength());
       return majCStats;
@@ -406,9 +413,31 @@ public class FileCompactor implements Callable<CompactionStats> {
         mfw.startDefaultLocalityGroup();
       }
 
+      DetectionScope scope =
+          env.getIteratorScope() == IteratorScope.minc ? DetectionScope.MINC : DetectionScope.MAJC;
       Span writeSpan = TraceUtil.startSpan(this.getClass(), "write");
       try (Scope write = writeSpan.makeCurrent()) {
         while (itr.hasTop() && env.isCompactionEnabled()) {
+
+          while (context.getLowMemoryDetector().isRunningLowOnMemory(context, scope, () -> {
+            return !extent.isMeta();
+          }, () -> {
+            log.info("Pausing compaction because low on memory, extent: {}", extent);
+            timesPaused.incrementAndGet();
+            if (scope == DetectionScope.MINC) {
+              metrics.incrementMinCPause();
+            } else {
+              metrics.incrementMajCPause();
+            }
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new IllegalStateException(
+                  "Interrupted while waiting for low memory condition to resolve", e);
+            }
+          })) {}
+
           mfw.append(itr.getTopKey(), itr.getTopValue());
           itr.next();
           entriesCompacted++;
@@ -436,12 +465,12 @@ public class FileCompactor implements Callable<CompactionStats> {
         }
 
       } finally {
-        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
+        CompactionStats lgMajcStats =
+            new CompactionStats(citr.getCount(), entriesCompacted, timesPaused.get());
         majCStats.add(lgMajcStats);
         writeSpan.end();
       }
-
-    } catch (Exception e) {
+    } catch (IOException | CompactionCanceledException e) {
       TraceUtil.setException(compactSpan, e, true);
       throw e;
     } finally {
@@ -477,6 +506,10 @@ public class FileCompactor implements Callable<CompactionStats> {
     return entriesWritten.get();
   }
 
+  long getTimesPaused() {
+    return timesPaused.get();
+  }
+
   long getStartTime() {
     return startTime;
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
similarity index 50%
copy from server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
copy to server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
index e3ccfdd798..b8731cda08 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionStats.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
@@ -18,44 +18,31 @@
  */
 package org.apache.accumulo.server.compaction;
 
-public class CompactionStats {
-  private long entriesRead;
-  private long entriesWritten;
-  private long fileSize;
-
-  public CompactionStats(long er, long ew) {
-    this.setEntriesRead(er);
-    this.setEntriesWritten(ew);
-  }
-
-  public CompactionStats() {}
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.metrics.MetricsUtil;
 
-  private void setEntriesRead(long entriesRead) {
-    this.entriesRead = entriesRead;
-  }
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
 
-  public long getEntriesRead() {
-    return entriesRead;
-  }
+public class PausedCompactionMetrics implements MetricsProducer {
 
-  private void setEntriesWritten(long entriesWritten) {
-    this.entriesWritten = entriesWritten;
-  }
+  private Counter majcPauses;
+  private Counter mincPauses;
 
-  public long getEntriesWritten() {
-    return entriesWritten;
+  public void incrementMinCPause() {
+    mincPauses.increment();
   }
 
-  public void add(CompactionStats mcs) {
-    this.entriesRead += mcs.entriesRead;
-    this.entriesWritten += mcs.entriesWritten;
+  public void incrementMajCPause() {
+    majcPauses.increment();
   }
 
-  public void setFileSize(long fileSize) {
-    this.fileSize = fileSize;
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    majcPauses = Counter.builder(METRICS_MAJC_PAUSED).description("major compaction pause count")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
+    mincPauses = Counter.builder(METRICS_MINC_PAUSED).description("minor compactor pause count")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
   }
 
-  public long getFileSize() {
-    return this.fileSize;
-  }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
index 72daff609c..e13fa93e80 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
@@ -30,4 +30,9 @@ public interface SystemIteratorEnvironment extends IteratorEnvironment {
 
   SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter);
 
+  @Override
+  default boolean isRunningLowOnMemory() {
+    return getServerContext().getLowMemoryDetector().isRunningLowOnMemory();
+  }
+
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
similarity index 58%
rename from server/base/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
rename to server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index 469ec5632d..362f6ef173 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@ -16,35 +16,94 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.server;
+package org.apache.accumulo.server.mem;
 
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GarbageCollectionLogger {
-  private static final Logger log = LoggerFactory.getLogger(GarbageCollectionLogger.class);
+public class LowMemoryDetector {
+
+  @FunctionalInterface
+  public static interface Action {
+    void execute();
+  }
+
+  public enum DetectionScope {
+    MINC, MAJC, SCAN
+  };
+
+  private static final Logger log = LoggerFactory.getLogger(LowMemoryDetector.class);
 
   private final HashMap<String,Long> prevGcTime = new HashMap<>();
   private long lastMemorySize = 0;
   private long gcTimeIncreasedCount = 0;
-  private static long lastMemoryCheckTime = 0;
-  private static final Lock memCheckTimeLock = new ReentrantLock();
+  private long lastMemoryCheckTime = 0;
+  private final Lock memCheckTimeLock = new ReentrantLock();
+  private volatile boolean runningLowOnMemory = false;
+
+  public long getIntervalMillis(AccumuloConfiguration conf) {
+    return conf.getTimeInMillis(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL);
+  }
+
+  public boolean isRunningLowOnMemory() {
+    return runningLowOnMemory;
+  }
+
+  /**
+   * @param context server context
+   * @param scope whether this is being checked in the context of scan or compact code
+   * @param isUserTable boolean as to whether the table being scanned / compacted is a user table.
+   *        No action is taken for system tables.
+   * @param action Action to perform when this method returns true
+   * @return true only if protection is enabled for scope and the server is low on memory
+   */
+  public boolean isRunningLowOnMemory(ServerContext context, DetectionScope scope,
+      Supplier<Boolean> isUserTable, Action action) {
+    if (isUserTable.get()) {
+      Property p = null;
+      switch (scope) {
+        case SCAN:
+          p = Property.GENERAL_LOW_MEM_SCAN_PROTECTION;
+          break;
+        case MINC:
+          p = Property.GENERAL_LOW_MEM_MINC_PROTECTION;
+          break;
+        case MAJC:
+          p = Property.GENERAL_LOW_MEM_MAJC_PROTECTION;
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown scope: " + scope);
+      }
+      boolean isEnabled = context.getConfiguration().getBoolean(p);
+      // Only incur the penalty of accessing the volatile variable when enabled for this scope
+      if (isEnabled && runningLowOnMemory) {
+        action.execute();
+        return true;
+      }
+    }
+    return false;
+  }
 
   public void logGCInfo(AccumuloConfiguration conf) {
 
+    Double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD);
+
     memCheckTimeLock.lock();
     try {
-      final long now = System.currentTimeMillis();
+      final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
       List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
       Runtime rt = Runtime.getRuntime();
@@ -81,9 +140,19 @@ public class GarbageCollectionLogger {
         gcTimeIncreasedCount = 0;
       } else {
         gcTimeIncreasedCount++;
-        if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
+        if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * freeMemoryPercentage) {
+          runningLowOnMemory = true;
           log.warn("Running low on memory");
           gcTimeIncreasedCount = 0;
+        } else {
+          // If we were running low on memory, but are not any longer, then log at warn
+          // so that it shows up in the logs
+          if (runningLowOnMemory) {
+            log.warn("Recovered from low memory condition");
+          } else {
+            log.trace("Not running low on memory");
+          }
+          runningLowOnMemory = false;
         }
       }
 
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 5c73ed4b46..1705701ec1 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -77,7 +77,6 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
@@ -101,7 +100,6 @@ public class CompactionCoordinator extends AbstractServer
     implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
 
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
@@ -122,7 +120,6 @@ public class CompactionCoordinator extends AbstractServer
   /* Map of queue name to last time compactor called to get a compaction job */
   private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
 
-  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
   protected SecurityOperation security;
   protected final AccumuloConfiguration aconf;
   protected CompactionFinalizer compactionFinalizer;
@@ -146,7 +143,6 @@ public class CompactionCoordinator extends AbstractServer
     compactionFinalizer = createCompactionFinalizer(schedExecutor);
     tserverSet = createLiveTServerSet();
     setupSecurity();
-    startGCLogger(schedExecutor);
     printStartupMsg();
     startCompactionCleaner(schedExecutor);
     startRunningCleaner(schedExecutor);
@@ -170,13 +166,6 @@ public class CompactionCoordinator extends AbstractServer
     security = getContext().getSecurityOperation();
   }
 
-  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
-            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
-  }
-
   protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
     ScheduledFuture<?> future =
         schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 7225960a53..746f552a3c 100644
--- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -127,9 +127,6 @@ public class CompactionCoordinatorTest {
     @Override
     protected void setupSecurity() {}
 
-    @Override
-    protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {}
-
     @Override
     protected void printStartupMsg() {}
 
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index a9f0bfbf40..f09484cb9d 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,10 +92,10 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.compaction.CompactionInfo;
 import org.apache.accumulo.server.compaction.CompactionWatcher;
 import org.apache.accumulo.server.compaction.FileCompactor;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.compaction.RetryableThriftCall;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.conf.TableConfiguration;
@@ -123,14 +122,12 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
   private static final SecureRandom random = new SecureRandom();
 
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5);
 
   private static final long TEN_MEGABYTES = 10485760;
 
   protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder();
 
-  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
   private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
   private final String queueName;
@@ -141,6 +138,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
   private SecurityOperation security;
   private ServiceLock compactorLock;
   private ServerAddress compactorAddress = null;
+  private PausedCompactionMetrics pausedMetrics;
 
   // Exposed for tests
   protected volatile boolean shutdown = false;
@@ -159,7 +157,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     watcher = new CompactionWatcher(aconf);
     var schedExecutor =
         ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
-    startGCLogger(schedExecutor);
     startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
     printStartupMsg();
   }
@@ -180,13 +177,6 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     security = getContext().getSecurityOperation();
   }
 
-  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
-    ScheduledFuture<?> future =
-        schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
-            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
-    ThreadPools.watchNonCriticalScheduledTask(future);
-  }
-
   protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
       long timeBetweenChecks) {
     ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
@@ -275,7 +265,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
       public void lostLock(final LockLossReason reason) {
         Halt.halt(1, () -> {
           LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
-          gcLogger.logGCInfo(getConfiguration());
+          getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
         });
       }
 
@@ -526,8 +516,9 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
             .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
 
         ExtCEnv cenv = new ExtCEnv(JOB_HOLDER, queueName);
-        FileCompactor compactor = new FileCompactor(getContext(), extent, files, outputFile,
-            job.isPropagateDeletes(), cenv, iters, aConfig, tConfig.getCryptoService());
+        FileCompactor compactor =
+            new FileCompactor(getContext(), extent, files, outputFile, job.isPropagateDeletes(),
+                cenv, iters, aConfig, tConfig.getCryptoService(), pausedMetrics);
 
         LOG.trace("Starting compactor");
         started.countDown();
@@ -610,7 +601,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
         | SecurityException e1) {
       LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
     }
-    MetricsUtil.initializeProducers(this);
+    pausedMetrics = new PausedCompactionMetrics();
+    MetricsUtil.initializeProducers(this, pausedMetrics);
 
     LOG.info("Compactor started, waiting for work");
     try {
@@ -671,9 +663,9 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
                       Float.toString((info.getEntriesRead() / (float) inputEntries) * 100);
                 }
                 String message = String.format(
-                    "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries",
+                    "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries, paused %d times",
                     info.getEntriesRead(), inputEntries, percentComplete, "%",
-                    info.getEntriesWritten());
+                    info.getEntriesWritten(), info.getTimesPaused());
                 watcher.run();
                 try {
                   LOG.debug("Updating coordinator with compaction progress: {}.", message);
@@ -785,7 +777,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
         LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
       }
 
-      gcLogger.logGCInfo(getConfiguration());
+      getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
       try {
         if (null != compactorLock) {
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 11b68e007a..e61f9f1903 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -28,7 +28,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Supplier;
@@ -51,6 +50,7 @@ import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.mem.LowMemoryDetector;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
@@ -192,9 +192,6 @@ public class CompactorTest {
     @Override
     protected void setupSecurity() {}
 
-    @Override
-    protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {}
-
     @Override
     protected void printStartupMsg() {}
 
@@ -335,6 +332,7 @@ public class CompactorTest {
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
     expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    expect(context.getLowMemoryDetector()).andReturn(new LowMemoryDetector()).anyTimes();
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
@@ -383,6 +381,7 @@ public class CompactorTest {
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
     expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    expect(context.getLowMemoryDetector()).andReturn(new LowMemoryDetector()).anyTimes();
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
@@ -431,6 +430,7 @@ public class CompactorTest {
 
     ServerContext context = PowerMock.createNiceMock(ServerContext.class);
     expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    expect(context.getLowMemoryDetector()).andReturn(new LowMemoryDetector()).anyTimes();
     ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class);
     ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class);
     expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 5a2608b1b1..77047f013b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -87,8 +87,8 @@ import org.apache.accumulo.core.util.ServiceLockData.ThriftService;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.rpc.ServerAddress;
@@ -182,7 +182,6 @@ public class ScanServer extends AbstractServer
   private final SessionManager sessionManager;
   private final TabletServerResourceManager resourceManager;
   HostAndPort clientAddress;
-  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
 
   private volatile boolean serverStopRequested = false;
   private ServiceLock scanServerLock;
@@ -313,7 +312,7 @@ public class ScanServer extends AbstractServer
             if (!serverStopRequested) {
               LOG.error("Lost tablet server lock (reason = {}), exiting.", reason);
             }
-            gcLogger.logGCInfo(getConfiguration());
+            context.getLowMemoryDetector().logGCInfo(getConfiguration());
           });
         }
 
@@ -392,7 +391,7 @@ public class ScanServer extends AbstractServer
         LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
       }
 
-      gcLogger.logGCInfo(getConfiguration());
+      context.getLowMemoryDetector().logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
       try {
         if (null != lock) {
@@ -992,6 +991,12 @@ public class ScanServer extends AbstractServer
     return scanMetrics;
   }
 
+  @Override
+  public PausedCompactionMetrics getPausedCompactionMetrics() {
+    // ScanServer does not perform compactions
+    return null;
+  }
+
   @Override
   public Session getSession(long scanID) {
     return sessionManager.getSession(scanID);
@@ -1012,11 +1017,6 @@ public class ScanServer extends AbstractServer
     return managerLockCache;
   }
 
-  @Override
-  public GarbageCollectionLogger getGcLogger() {
-    return gcLogger;
-  }
-
   @Override
   public BlockCacheConfiguration getBlockCacheConfiguration(AccumuloConfiguration acuConf) {
     return BlockCacheConfiguration.forScanServer(acuConf);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 1f6a649a4c..09aa2169c4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -1072,7 +1072,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface,
       Halt.halt(1, () -> {
         log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting",
             request);
-        server.getGcLogger().logGCInfo(server.getConfiguration());
+        context.getLowMemoryDetector().logGCInfo(server.getConfiguration());
       });
     }
 
@@ -1260,7 +1260,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface,
 
     Halt.halt(0, () -> {
       log.info("Manager requested tablet server halt");
-      server.gcLogger.logGCInfo(server.getConfiguration());
+      context.getLowMemoryDetector().logGCInfo(server.getConfiguration());
       server.requestStop();
       try {
         server.getLock().unlock();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
index 3715eac0c2..bbff2e1377 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java
@@ -24,8 +24,8 @@ import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.scan.ScanServerInfo;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.tserver.session.Session;
@@ -50,6 +50,8 @@ public interface TabletHostingServer {
 
   TabletServerScanMetrics getScanMetrics();
 
+  PausedCompactionMetrics getPausedCompactionMetrics();
+
   Session getSession(long scanID);
 
   TableConfiguration getTableConfiguration(KeyExtent threadPoolExtent);
@@ -58,7 +60,5 @@ public interface TabletHostingServer {
 
   ZooCache getManagerLockCache();
 
-  GarbageCollectionLogger getGcLogger();
-
   BlockCacheManager.Configuration getBlockCacheConfiguration(AccumuloConfiguration acuConf);
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1f614035f1..521335e1fc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -109,11 +109,11 @@ import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.AbstractServer;
-import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.TabletLevel;
 import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.compaction.CompactionWatcher;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -175,10 +175,8 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
 
   private static final SecureRandom random = new SecureRandom();
   private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = TimeUnit.SECONDS.toMillis(5);
   private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = TimeUnit.HOURS.toMillis(1);
 
-  final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
   final ZooCache managerLockCache;
 
   final TabletServerLogger logger;
@@ -188,6 +186,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
   TabletServerScanMetrics scanMetrics;
   TabletServerMinCMetrics mincMetrics;
   CompactionExecutorsMetrics ceMetrics;
+  PausedCompactionMetrics pausedMetrics;
 
   @Override
   public TabletServerScanMetrics getScanMetrics() {
@@ -198,6 +197,11 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
     return mincMetrics;
   }
 
+  @Override
+  public PausedCompactionMetrics getPausedCompactionMetrics() {
+    return pausedMetrics;
+  }
+
   private final LogSorter logSorter;
   final TabletStatsKeeper statsKeeper;
   private final AtomicInteger logIdGenerator = new AtomicInteger();
@@ -612,11 +616,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
     return managerLockCache;
   }
 
-  @Override
-  public GarbageCollectionLogger getGcLogger() {
-    return gcLogger;
-  }
-
   private void announceExistence() {
     ZooReaderWriter zoo = getContext().getZooReaderWriter();
     try {
@@ -644,7 +643,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
             if (!serverStopRequested) {
               log.error("Lost tablet server lock (reason = {}), exiting.", reason);
             }
-            gcLogger.logGCInfo(getConfiguration());
+            context.getLowMemoryDetector().logGCInfo(getConfiguration());
           });
         }
 
@@ -717,7 +716,9 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
     scanMetrics = new TabletServerScanMetrics();
     mincMetrics = new TabletServerMinCMetrics();
     ceMetrics = new CompactionExecutorsMetrics();
-    MetricsUtil.initializeProducers(metrics, updateMetrics, scanMetrics, mincMetrics, ceMetrics);
+    pausedMetrics = new PausedCompactionMetrics();
+    MetricsUtil.initializeProducers(metrics, updateMetrics, scanMetrics, mincMetrics, ceMetrics,
+        pausedMetrics);
 
     this.compactionManager = new CompactionManager(() -> Iterators
         .transform(onlineTablets.snapshot().values().iterator(), Tablet::asCompactable),
@@ -889,7 +890,7 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
       log.warn("Failed to close filesystem : {}", e.getMessage(), e);
     }
 
-    gcLogger.logGCInfo(getConfiguration());
+    context.getLowMemoryDetector().logGCInfo(getConfiguration());
 
     log.info("TServerInfo: stop requested. exiting ... ");
 
@@ -955,12 +956,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
     Threads.createThread("Split/MajC initiator", new MajorCompactor(context)).start();
 
     clientAddress = HostAndPort.fromParts(getHostname(), 0);
-
-    Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration());
-
-    ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask,
-        0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
-    watchNonCriticalScheduledTask(future);
   }
 
   public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index f39df6789f..9957eb1dee 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -370,6 +370,7 @@ public class CompactionManager {
     public CompactionExecutorId ceid;
     public int running;
     public int queued;
+    public int paused;
   }
 
   public Collection<ExtCompMetric> getExternalMetrics() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index eef8c62315..57e33d2334 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@ -41,6 +41,8 @@ public class TabletServerScanMetrics implements MetricsProducer {
   private Counter continueScanCalls;
   private Counter closeScanCalls;
   private Counter busyTimeoutReturned;
+  private Counter pausedForMemory;
+  private Counter earlyReturnForMemory;
 
   private final LongAdder lookupCount = new LongAdder();
   private final LongAdder queryResultCount = new LongAdder();
@@ -119,6 +121,14 @@ public class TabletServerScanMetrics implements MetricsProducer {
     busyTimeoutReturned.increment(value);
   }
 
+  public void incrementScanPausedForLowMemory() {
+    pausedForMemory.increment();
+  }
+
+  public void incrementEarlyReturnForLowMemory() {
+    earlyReturnForMemory.increment();
+  }
+
   @Override
   public void registerMetrics(MeterRegistry registry) {
     Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::get)
@@ -150,6 +160,12 @@ public class TabletServerScanMetrics implements MetricsProducer {
         .description("Query rate (bytes/sec)").register(registry);
     Gauge.builder(METRICS_TSERVER_SCANNED_ENTRIES, this, TabletServerScanMetrics::getScannedCount)
         .description("Scanned rate").register(registry);
+    pausedForMemory = Counter.builder(METRICS_SCAN_PAUSED_FOR_MEM)
+        .description("scan paused due to server being low on memory")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
+    earlyReturnForMemory = Counter.builder(METRICS_SCAN_RETURN_FOR_MEM)
+        .description("scan returned results early due to server being low on memory")
+        .tags(MetricsUtil.getCommonTags()).register(registry);
   }
 
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 195625c32d..bb62af44a0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -430,7 +430,7 @@ public class CompactableUtils {
 
     FileCompactor compactor = new FileCompactor(tablet.getContext(), tablet.getExtent(),
         compactFiles, tmpFileName, cInfo.propagateDeletes, cenv, cInfo.iters, compactionConfig,
-        tableConf.getCryptoService());
+        tableConf.getCryptoService(), tablet.getPausedCompactionMetrics());
 
     return compactor.call();
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index 7a6ee05b6e..0092da7712 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -57,7 +57,7 @@ public class MinorCompactor extends FileCompactor {
       TabletFile outputFile, MinorCompactionReason mincReason, TableConfiguration tableConfig) {
     super(tabletServer.getContext(), tablet.getExtent(), Collections.emptyMap(), outputFile, true,
         new MinCEnv(mincReason, imm.compactionIterator()), Collections.emptyList(), tableConfig,
-        tableConfig.getCryptoService());
+        tableConfig.getCryptoService(), tabletServer.getPausedCompactionMetrics());
     this.tabletServer = tabletServer;
     this.mincReason = mincReason;
   }
@@ -146,7 +146,7 @@ public class MinorCompactor extends FileCompactor {
         }
 
         if (isTableDeleting()) {
-          return new CompactionStats(0, 0);
+          return new CompactionStats(0, 0, 0);
         }
 
       } while (true);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 2ec3ad4707..12266f5995 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -91,6 +91,7 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionStats;
+import org.apache.accumulo.server.compaction.PausedCompactionMetrics;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
@@ -1955,6 +1956,10 @@ public class Tablet extends TabletBase {
     return getTabletServer().getScanMetrics();
   }
 
+  public PausedCompactionMetrics getPausedCompactionMetrics() {
+    return getTabletServer().getPausedCompactionMetrics();
+  }
+
   DatafileManager getDatafileManager() {
     return datafileManager;
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
index ce8cddc348..1f54e66eac 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.util.ShutdownUtil;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.TooManyFilesException;
+import org.apache.accumulo.server.mem.LowMemoryDetector.DetectionScope;
 import org.apache.accumulo.tserver.InMemoryMap;
 import org.apache.accumulo.tserver.TabletHostingServer;
 import org.apache.accumulo.tserver.TabletServerResourceManager;
@@ -87,10 +88,13 @@ public abstract class TabletBase {
 
   protected final TableConfiguration tableConfiguration;
 
+  private final boolean isUserTable;
+
   public TabletBase(TabletHostingServer server, KeyExtent extent) {
     this.context = server.getContext();
     this.server = server;
     this.extent = extent;
+    this.isUserTable = !extent.isMeta();
 
     TableConfiguration tblConf = context.getTableConfiguration(extent.tableId());
     if (tblConf == null) {
@@ -229,7 +233,19 @@ public abstract class TabletBase {
   Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams)
       throws IOException {
 
-    // log.info("In nextBatch..");
+    while (context.getLowMemoryDetector().isRunningLowOnMemory(context, DetectionScope.SCAN, () -> {
+      return isUserTable;
+    }, () -> {
+      log.info("Not starting next batch because low on memory, extent: {}", extent);
+      server.getScanMetrics().incrementScanPausedForLowMemory();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for low memory condition to resolve", e);
+      }
+    })) {}
 
     long batchTimeOut = scanParams.getBatchTimeOut();
 
@@ -279,7 +295,15 @@ public abstract class TabletBase {
 
       boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun;
 
-      if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
+      boolean runningLowOnMemory =
+          context.getLowMemoryDetector().isRunningLowOnMemory(context, DetectionScope.SCAN, () -> {
+            return isUserTable;
+          }, () -> {
+            log.info("Not continuing next batch because low on memory, extent: {}", extent);
+            server.getScanMetrics().incrementEarlyReturnForLowMemory();
+          });
+      if (runningLowOnMemory || resultSize >= maxResultsSize
+          || results.size() >= scanParams.getMaxEntries() || timesUp) {
         continueKey = new Key(key);
         skipContinueKey = true;
         break;
@@ -318,6 +342,20 @@ public abstract class TabletBase {
   private Tablet.LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,
       List<KVEntry> results, ScanParameters scanParams, long maxResultsSize) throws IOException {
 
+    while (context.getLowMemoryDetector().isRunningLowOnMemory(context, DetectionScope.SCAN, () -> {
+      return isUserTable;
+    }, () -> {
+      log.info("Not starting lookup because low on memory, extent: {}", extent);
+      server.getScanMetrics().incrementScanPausedForLowMemory();
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for low memory condition to resolve", e);
+      }
+    })) {}
+
     Tablet.LookupResult lookupResult = new Tablet.LookupResult();
 
     boolean exceededMemoryUsage = false;
@@ -346,7 +384,14 @@ public abstract class TabletBase {
 
       boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
 
-      if (exceededMemoryUsage || tabletClosed || timesUp || yielded) {
+      boolean runningLowOnMemory =
+          context.getLowMemoryDetector().isRunningLowOnMemory(context, DetectionScope.SCAN, () -> {
+            return isUserTable;
+          }, () -> {
+            log.info("Not continuing lookup because low on memory, extent: {}", extent);
+            server.getScanMetrics().incrementEarlyReturnForLowMemory();
+          });
+      if (runningLowOnMemory || exceededMemoryUsage || tabletClosed || timesUp || yielded) {
         lookupResult.unfinishedRanges.add(range);
         continue;
       }
@@ -377,7 +422,14 @@ public abstract class TabletBase {
 
           timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun;
 
-          if (exceededMemoryUsage || timesUp) {
+          runningLowOnMemory = context.getLowMemoryDetector().isRunningLowOnMemory(context,
+              DetectionScope.SCAN, () -> {
+                return isUserTable;
+              }, () -> {
+                log.info("Not continuing lookup because low on memory, extent: {}", extent);
+                server.getScanMetrics().incrementEarlyReturnForLowMemory();
+              });
+          if (runningLowOnMemory || exceededMemoryUsage || timesUp) {
             addUnfinishedRange(lookupResult, range, key);
             break;
           }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
new file mode 100644
index 0000000000..3a7f5f653e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryConsumingIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class MemoryConsumingIterator extends WrappingIterator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryConsumingIterator.class);
+
+  private static final List<byte[]> BUFFERS = new ArrayList<>();
+
+  public static void freeBuffers() {
+    BUFFERS.clear();
+  }
+
+  /** Returns how many bytes to allocate to push free heap below the low-memory threshold. */
+  @SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test")
+  private int getAmountToConsume() {
+    System.gc();
+    Runtime runtime = Runtime.getRuntime();
+    long maxConfiguredMemory = runtime.maxMemory();
+    long allocatedMemory = runtime.totalMemory();
+    long allocatedFreeMemory = runtime.freeMemory();
+    long freeMemory = maxConfiguredMemory - (allocatedMemory - allocatedFreeMemory);
+    long minimumFreeMemoryThreshold =
+        (long) (maxConfiguredMemory * MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD);
+    // Stay in long: narrowing to int before the range check below would make it unreachable.
+    long amountToConsume = 0;
+    if (freeMemory > minimumFreeMemoryThreshold) {
+      amountToConsume = freeMemory - (minimumFreeMemoryThreshold - 10485760);
+    }
+    if (amountToConsume > Integer.MAX_VALUE) {
+      throw new IllegalStateException(
+          "Unsupported memory size for tablet server when using this iterator");
+    }
+    // amountToConsume is 0 or > 10485760 here, so no clamping to non-negative is needed.
+    LOG.info("max: {}, free: {}, minFree: {}, amountToConsume: {}", maxConfiguredMemory, freeMemory,
+        minimumFreeMemoryThreshold, amountToConsume);
+    return (int) amountToConsume;
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+    LOG.info("seek called");
+    while (!this.isRunningLowOnMemory()) {
+      int amountToConsume = getAmountToConsume();
+      if (amountToConsume > 0) {
+        LOG.info("allocating memory: " + amountToConsume);
+        BUFFERS.add(new byte[amountToConsume]);
+        LOG.info("memory allocated");
+      } else {
+        LOG.info("Waiting for LowMemoryDetector to recognize low on memory condition.");
+      }
+      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+    LOG.info("Running low on memory == true");
+    super.seek(range, columnFamilies, inclusive);
+  }
+
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
similarity index 58%
copy from server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
copy to test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
index 72daff609c..dde5e1c14b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironment.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
@@ -16,18 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.server.iterators;
+package org.apache.accumulo.test.functional;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.core.iterators.WrappingIterator;
 
-public interface SystemIteratorEnvironment extends IteratorEnvironment {
+import com.google.common.util.concurrent.Uninterruptibles;
 
-  ServerContext getServerContext();
+public class MemoryFreeingIterator extends WrappingIterator {
 
-  SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter);
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    MemoryConsumingIterator.freeBuffers();
+    while (this.isRunningLowOnMemory()) {
+      // wait for LowMemoryDetector to recognize the memory is free.
+      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+  }
 
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
new file mode 100644
index 0000000000..da4c98a7a5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.DoubleAdder;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
+
+  public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+      cfg.setNumTservers(1);
+      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
+      // Configure the LowMemoryDetector in the TabletServer
+      // check on 5s intervals and set low mem condition if more than 80% of
+      // the heap is used.
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
+          Double.toString(MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD));
+      cfg.setProperty(Property.GENERAL_LOW_MEM_MAJC_PROTECTION, "true");
+      // Tell the server processes to use a StatsDMeterRegistry that will be configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+    }
+  }
+
+  private static final DoubleAdder MAJC_PAUSED = new DoubleAdder();
+  private static TestStatsDSink sink;
+  private static Thread metricConsumer;
+
+  @BeforeAll
+  public static void start() throws Exception {
+    sink = new TestStatsDSink();
+    metricConsumer = new Thread(() -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String line : statsDMetrics) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          if (line.startsWith("accumulo")) {
+            Metric metric = TestStatsDSink.parseStatsDMetric(line);
+            if (MetricsProducer.METRICS_MAJC_PAUSED.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              MAJC_PAUSED.add(val);
+            }
+          }
+        }
+      }
+    });
+    metricConsumer.start();
+
+    SharedMiniClusterBase.startMiniClusterWithConfig(new MemoryStarvedITConfiguration());
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    sink.close();
+    metricConsumer.interrupt();
+    metricConsumer.join();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    // Reset the client side counters
+    MAJC_PAUSED.reset();
+  }
+
+  @Test
+  public void testMajCPauses() throws Exception {
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      // Add a small amount of data so that the MemoryConsumingIterator
+      // returns true when trying to consume all of the memory.
+      ReadWriteIT.ingest(client, 1, 1, 1, 0, table);
+
+      AtomicReference<Throwable> error = new AtomicReference<>();
+      Thread compactionThread = new Thread(() -> {
+        try {
+          to.compact(table, new CompactionConfig().setWait(false));
+        } catch (Exception e) {
+          error.set(e);
+        }
+      });
+
+      try (Scanner scanner = client.createScanner(table)) {
+
+        MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+
+        Double paused = MAJC_PAUSED.doubleValue();
+        assertEquals(0, paused);
+
+        ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
+        compactionThread.start();
+
+        while (paused == 0) {
+          Thread.sleep(1000);
+          paused = MAJC_PAUSED.doubleValue();
+        }
+        assertTrue(paused > 0);
+
+        MemoryStarvedScanIT.freeServerMemory(client, table);
+        compactionThread.interrupt();
+        compactionThread.join();
+        assertNull(error.get());
+        assertTrue(client.instanceOperations().getActiveCompactions().stream()
+            .filter(ac -> ac.getPausedCount() > 0).findAny().isPresent());
+      }
+    }
+
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
new file mode 100644
index 0000000000..4c91c17635
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.DoubleAdder;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
+
+  public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+      cfg.setNumTservers(1);
+      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
+      // Configure the LowMemoryDetector in the TabletServer to
+      // check on 5s intervals and set the low memory condition if more than
+      // 80% of the heap is used (i.e. less than 20% is free).
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
+          Double.toString(MemoryStarvedScanIT.FREE_MEMORY_THRESHOLD));
+      cfg.setProperty(Property.GENERAL_LOW_MEM_MINC_PROTECTION, "true");
+      // Tell the server processes to use a StatsDMeterRegistry that will be configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+    }
+  }
+
+  private static final DoubleAdder MINC_PAUSED = new DoubleAdder();
+  private static TestStatsDSink sink;
+  private static Thread metricConsumer;
+
+  @BeforeAll
+  public static void start() throws Exception {
+    sink = new TestStatsDSink();
+    metricConsumer = new Thread(() -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String line : statsDMetrics) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          if (line.startsWith("accumulo")) {
+            Metric metric = TestStatsDSink.parseStatsDMetric(line);
+            if (MetricsProducer.METRICS_MINC_PAUSED.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              MINC_PAUSED.add(val);
+            }
+          }
+        }
+      }
+    });
+    metricConsumer.start();
+
+    SharedMiniClusterBase.startMiniClusterWithConfig(new MemoryStarvedITConfiguration());
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    sink.close();
+    metricConsumer.interrupt();
+    metricConsumer.join();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    // Reset the client side counters
+    MINC_PAUSED.reset();
+  }
+
+  @Test
+  public void testMinCPauses() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      // Add a small amount of data so that the MemoryConsumingIterator
+      // returns true when trying to consume all of the memory.
+      ReadWriteIT.ingest(client, 1, 1, 1, 0, table);
+
+      AtomicReference<Throwable> error = new AtomicReference<>();
+      Thread ingestThread = new Thread(() -> {
+        try {
+          ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
+          to.flush(table);
+        } catch (Exception e) {
+          error.set(e);
+        }
+      });
+
+      try (Scanner scanner = client.createScanner(table)) {
+
+        MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+
+        Double paused = MINC_PAUSED.doubleValue();
+        assertEquals(0, paused);
+
+        ingestThread.start();
+
+        while (paused == 0) {
+          Thread.sleep(1000);
+          paused = MINC_PAUSED.doubleValue();
+        }
+        assertTrue(paused > 0);
+
+        MemoryStarvedScanIT.freeServerMemory(client, table);
+        ingestThread.interrupt();
+        ingestThread.join();
+        assertNull(error.get());
+        assertTrue(client.instanceOperations().getActiveCompactions().stream()
+            .filter(ac -> ac.getPausedCount() > 0).collect(Collectors.toList()).size() > 0);
+      }
+    }
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
new file mode 100644
index 0000000000..ea67ef3734
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.DoubleAdder;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class MemoryStarvedScanIT extends SharedMiniClusterBase {
+
+  public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+      cfg.setNumTservers(1);
+      cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE);
+      // Configure the LowMemoryDetector in the TabletServer to
+      // check on 5s intervals and set the low memory condition if more than
+      // 80% of the heap is used (i.e. less than 20% is free).
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_INTERVAL, "5s");
+      cfg.setProperty(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD,
+          Double.toString(FREE_MEMORY_THRESHOLD));
+      cfg.setProperty(Property.GENERAL_LOW_MEM_SCAN_PROTECTION, "true");
+      // Tell the server processes to use a StatsDMeterRegistry that will be configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+    }
+  }
+
+  public static final Double FREE_MEMORY_THRESHOLD = 0.20D;
+
+  private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
+  private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
+  private static TestStatsDSink sink;
+  private static Thread metricConsumer;
+
+  @BeforeAll
+  public static void start() throws Exception {
+    sink = new TestStatsDSink();
+    metricConsumer = new Thread(() -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String line : statsDMetrics) {
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
+          if (line.startsWith("accumulo")) {
+            Metric metric = TestStatsDSink.parseStatsDMetric(line);
+            if (MetricsProducer.METRICS_SCAN_PAUSED_FOR_MEM.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              SCAN_START_DELAYED.add(val);
+            } else if (MetricsProducer.METRICS_SCAN_RETURN_FOR_MEM.equals(metric.getName())) {
+              Double val = Double.parseDouble(metric.getValue());
+              SCAN_RETURNED_EARLY.add(val);
+            }
+          }
+        }
+      }
+    });
+    metricConsumer.start();
+
+    SharedMiniClusterBase.startMiniClusterWithConfig(new MemoryStarvedITConfiguration());
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    sink.close();
+    metricConsumer.interrupt();
+    metricConsumer.join();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    // Reset both client-side metric counters so that each test starts from zero
+    SCAN_START_DELAYED.reset();
+    SCAN_RETURNED_EARLY.reset();
+  }
+
+  static void consumeServerMemory(Scanner scanner, String table) throws Exception {
+    // This iterator will attempt to consume all free memory in the TabletServer
+    scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
+    scanner.setBatchSize(1);
+    // Set the ReadaheadThreshold to a large number so that another background thread
+    // that performs read-ahead of KV pairs is not started.
+    scanner.setReadaheadThreshold(Long.MAX_VALUE);
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    // This should block until the LowMemoryDetector runs and notices that the
+    // VM is low on memory.
+    assertTrue(iter.hasNext());
+  }
+
+  private void consumeServerMemory(BatchScanner scanner, String table) throws Exception {
+    // This iterator will attempt to consume all free memory in the TabletServer
+    scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
+    scanner.setRanges(Collections.singletonList(new Range()));
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    // This should block until the LowMemoryDetector runs and notices that the
+    // VM is low on memory.
+    assertTrue(iter.hasNext());
+  }
+
+  static void freeServerMemory(AccumuloClient client, String table) throws Exception {
+    try (Scanner scanner = client.createScanner(table)) {
+      scanner.addScanIterator(new IteratorSetting(11, MemoryFreeingIterator.class, Map.of()));
+      @SuppressWarnings("unused")
+      Iterator<Entry<Key,Value>> iter = scanner.iterator(); // init'ing the iterator should be
+                                                            // enough to free the memory
+    }
+  }
+
+  @Test
+  public void testScanReturnsEarlyDueToLowMemory() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
+
+      try (Scanner scanner = client.createScanner(table)) {
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+
+        consumeServerMemory(scanner, table);
+
+        // Wait for longer than the memory check interval
+        Thread.sleep(6000);
+
+        // The metric that indicates a scan was returned early due to low memory should
+        // have been incremented.
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        freeServerMemory(client, table);
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+  @Test
+  public void testScanPauses() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+
+      try (Scanner dataConsumingScanner = client.createScanner(table);
+          Scanner memoryConsumingScanner = client.createScanner(table)) {
+
+        dataConsumingScanner.addScanIterator(
+            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "500")));
+        dataConsumingScanner.setBatchSize(1);
+        dataConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
+        AtomicInteger fetched = new AtomicInteger(0);
+        Thread t = new Thread(() -> {
+          int i = 0;
+          while (iter.hasNext()) {
+            iter.next();
+            fetched.set(++i);
+          }
+        });
+
+        memoryConsumingScanner
+            .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
+        memoryConsumingScanner.setBatchSize(1);
+        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+
+        t.start();
+
+        // Wait until the dataConsumingScanner has started fetching data
+        int currentCount = fetched.get();
+        while (currentCount == 0) {
+          Thread.sleep(500);
+          currentCount = fetched.get();
+        }
+
+        // This should block until the LowMemoryDetector runs and notices that the
+        // VM is low on memory.
+        Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
+        assertTrue(consumingIter.hasNext());
+
+        // Confirm that some data was fetched by the dataConsumingScanner
+        currentCount = fetched.get();
+        assertTrue(currentCount > 0 && currentCount < 100);
+
+        // Grab the current metric counts, wait
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+        Thread.sleep(1500);
+        // One of two conditions could exist here:
+        // The number of fetched rows equals the current count before the wait above
+        // and the SCAN_START_DELAYED has been incremented OR the number of fetched
+        // rows is one more than the current count and the SCAN_RETURNED_EARLY has
+        // been incremented.
+        assertTrue((currentCount == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
+            || (currentCount + 1 == fetched.get() && SCAN_RETURNED_EARLY.doubleValue() > returned));
+        currentCount = fetched.get();
+
+        // Perform the check again
+        paused = SCAN_START_DELAYED.doubleValue();
+        returned = SCAN_RETURNED_EARLY.doubleValue();
+        Thread.sleep(1500);
+        assertEquals(currentCount, fetched.get());
+
+        // Free the memory which will allow the pausing scanner to continue
+        freeServerMemory(client, table);
+
+        t.join();
+        assertEquals(30, fetched.get());
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+  @Test
+  public void testBatchScanReturnsEarlyDueToLowMemory() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
+
+      try (BatchScanner scanner = client.createBatchScanner(table,
+          client.securityOperations().getUserAuthorizations(client.whoami()), 1)) {
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+
+        consumeServerMemory(scanner, table);
+
+        // Wait for longer than the memory check interval
+        Thread.sleep(6000);
+
+        // The metric that indicates a scan was returned early due to low memory should
+        // have been incremented.
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        freeServerMemory(client, table);
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+  @Test
+  public void testBatchScanPauses() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      TableOperations to = client.tableOperations();
+      to.create(table);
+
+      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+
+      try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
+          Scanner memoryConsumingScanner = client.createScanner(table)) {
+
+        dataConsumingScanner.addScanIterator(
+            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "500")));
+        dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
+        Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
+        AtomicInteger fetched = new AtomicInteger(0);
+        Thread t = new Thread(() -> {
+          int i = 0;
+          while (iter.hasNext()) {
+            iter.next();
+            fetched.set(++i);
+          }
+        });
+
+        memoryConsumingScanner
+            .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
+        memoryConsumingScanner.setBatchSize(1);
+        memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+
+        t.start();
+
+        // Wait until the dataConsumingScanner has started fetching data
+        int currentCount = fetched.get();
+        while (currentCount == 0) {
+          Thread.sleep(500);
+          currentCount = fetched.get();
+        }
+
+        // This should block until the LowMemoryDetector runs and notices that the
+        // VM is low on memory.
+        Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
+        assertTrue(consumingIter.hasNext());
+
+        // Confirm that some data was fetched by the dataConsumingScanner
+        currentCount = fetched.get();
+        assertTrue(currentCount > 0 && currentCount < 100);
+
+        // Grab the current metric counts, wait 1.5 seconds and then confirm that
+        // the number of entries fetched by the dataConsumingScanner has not increased
+        // and that neither scan metric counter has decreased.
+        Double returned = SCAN_RETURNED_EARLY.doubleValue();
+        Double paused = SCAN_START_DELAYED.doubleValue();
+        Thread.sleep(1500);
+        assertEquals(currentCount, fetched.get());
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
+
+        // Perform the check again
+        paused = SCAN_START_DELAYED.doubleValue();
+        returned = SCAN_RETURNED_EARLY.doubleValue();
+        Thread.sleep(1500);
+        assertEquals(currentCount, fetched.get());
+        assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        assertTrue(SCAN_RETURNED_EARLY.doubleValue() == returned);
+
+        // Free the memory which will allow the pausing scanner to continue
+        freeServerMemory(client, table);
+
+        t.join();
+        assertEquals(30, fetched.get());
+      } finally {
+        to.delete(table);
+      }
+    }
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 48a21f63cf..6aae998a11 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -90,7 +90,8 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer {
     cluster.stop();
 
     Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS, METRICS_UPDATE_ERRORS,
-        METRICS_COMPACTOR_MAJC_STUCK, METRICS_SCAN_BUSY_TIMEOUT);
+        METRICS_COMPACTOR_MAJC_STUCK, METRICS_SCAN_BUSY_TIMEOUT, METRICS_SCAN_PAUSED_FOR_MEM,
+        METRICS_SCAN_RETURN_FOR_MEM, METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED);
     Set<String> flakyMetrics = Set.of(METRICS_GC_WAL_ERRORS, METRICS_FATE_TYPE_IN_PROGRESS,
         METRICS_PROPSTORE_EVICTION_COUNT, METRICS_PROPSTORE_REFRESH_COUNT,
         METRICS_PROPSTORE_REFRESH_LOAD_COUNT, METRICS_PROPSTORE_ZK_ERROR_COUNT);