You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2023/05/01 18:31:27 UTC

[accumulo] branch main updated: add low memory metric to abstract server (#3288)

This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new c0586be591 add low memory metric to abstract server (#3288)
c0586be591 is described below

commit c0586be591d1fd8492aaa4577194967c2403abb7
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Mon May 1 14:31:22 2023 -0400

    add low memory metric to abstract server (#3288)
    
    Add low memory metric to abstract server
---
 .../accumulo/core/metrics/MetricsProducer.java     | 10 +++-
 .../org/apache/accumulo/server/AbstractServer.java | 23 +++++++--
 .../accumulo/server/metrics/ProcessMetrics.java    | 44 ++++++++++++++++
 .../apache/accumulo/server/rpc/TimedProcessor.java |  2 +-
 .../coordinator/CompactionCoordinator.java         |  1 +
 .../org/apache/accumulo/compactor/Compactor.java   |  5 +-
 .../apache/accumulo/gc/SimpleGarbageCollector.java |  2 +-
 .../java/org/apache/accumulo/manager/Manager.java  |  3 +-
 .../accumulo/manager/metrics/ManagerMetrics.java   | 22 ++++----
 .../org/apache/accumulo/tserver/ScanServer.java    |  4 +-
 .../org/apache/accumulo/tserver/TabletServer.java  | 19 +++----
 .../test/functional/MemoryStarvedMajCIT.java       | 15 +++---
 .../test/functional/MemoryStarvedMinCIT.java       | 17 +++---
 .../test/functional/MemoryStarvedScanIT.java       | 60 +++++++++++++++-------
 14 files changed, 162 insertions(+), 65 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index e0b1bd65a0..86e110d057 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -35,7 +35,7 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <a href="https://micrometer.io/">Micrometer</a>. Micrometer suggests using a particular
  * <a href="https://micrometer.io/docs/concepts#_naming_meters">naming convention</a> for the
  * metrics. The table below contains a mapping of the old to new metric names.
- *
+ * <p>
  * <table border="1">
  * <caption>Summary of Metric Changes</caption> <!-- fate -->
  * <tr>
@@ -48,6 +48,13 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <tr>
  * <td>N/A</td>
  * <td>N/A</td>
+ * <td>{@link #METRICS_LOW_MEMORY}</td>
+ * <td>Guage</td>
+ * <td>reports 1 when process memory usage is above threshold, 0 when memory is okay</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
  * <td>{@link #METRICS_COMPACTOR_MAJC_STUCK}</td>
  * <td>LongTaskTimer</td>
  * <td></td>
@@ -594,6 +601,7 @@ public interface MetricsProducer {
 
   Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
 
+  String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
   String METRICS_COMPACTOR_PREFIX = "accumulo.compactor";
   String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck";
 
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 6b47fd5fea..e7f8efba26 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -27,29 +27,34 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 import org.apache.accumulo.core.cli.ConfigOpts;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.metrics.MetricsUtil;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.mem.LowMemoryDetector;
+import org.apache.accumulo.server.metrics.ProcessMetrics;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractServer implements AutoCloseable, Runnable {
+import io.micrometer.core.instrument.MeterRegistry;
+
+public abstract class AbstractServer implements AutoCloseable, MetricsProducer, Runnable {
 
   private final ServerContext context;
   protected final String applicationName;
   private final String hostname;
-  private final Logger log;
+
+  private final ProcessMetrics processMetrics;
 
   protected AbstractServer(String appName, ConfigOpts opts, String[] args) {
-    this.log = LoggerFactory.getLogger(getClass().getName());
     this.applicationName = appName;
     opts.parseArgs(appName, args);
     var siteConfig = opts.getSiteConfiguration();
     this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS);
     SecurityUtil.serverLogin(siteConfig);
     context = new ServerContext(siteConfig);
+    Logger log = LoggerFactory.getLogger(getClass());
     log.info("Version " + Constants.VERSION);
     log.info("Instance " + context.getInstanceID());
     context.init(appName);
@@ -64,6 +69,7 @@ public abstract class AbstractServer implements AutoCloseable, Runnable {
         () -> lmd.logGCInfo(context.getConfiguration()), 0,
         lmd.getIntervalMillis(context.getConfiguration()), TimeUnit.MILLISECONDS);
     ThreadPools.watchNonCriticalScheduledTask(future);
+    processMetrics = new ProcessMetrics(context);
   }
 
   /**
@@ -87,6 +93,17 @@ public abstract class AbstractServer implements AutoCloseable, Runnable {
     }
   }
 
+  /**
+   * Called
+   */
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    // makes mocking subclasses easier
+    if (processMetrics != null) {
+      processMetrics.registerMetrics(registry);
+    }
+  }
+
   public String getHostname() {
     return hostname;
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
new file mode 100644
index 0000000000..8b44525c53
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.util.List;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.server.ServerContext;
+
+import io.micrometer.core.instrument.MeterRegistry;
+
+public class ProcessMetrics implements MetricsProducer {
+
+  private final ServerContext context;
+
+  public ProcessMetrics(final ServerContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    registry.gauge(METRICS_LOW_MEMORY, List.of(), this, this::lowMemDetected);
+  }
+
+  private int lowMemDetected(ProcessMetrics processMetrics) {
+    return context.getLowMemoryDetector().isRunningLowOnMemory() ? 1 : 0;
+  }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index 0307c42c07..165479a71f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -33,7 +33,7 @@ public class TimedProcessor implements TProcessor {
 
   private final TProcessor other;
   private final ThriftMetrics thriftMetrics;
-  private long idleStart = 0;
+  private long idleStart;
 
   public TimedProcessor(TProcessor next) {
     this.other = next;
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 2edbc368c5..baabd5aa76 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -267,6 +267,7 @@ public class CompactionCoordinator extends AbstractServer
     try {
       MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
           clientAddress);
+      MetricsUtil.initializeProducers(this);
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException | NoSuchMethodException
         | SecurityException e1) {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 5b2aa4579c..b33b92431a 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -168,6 +168,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
 
   @Override
   public void registerMetrics(MeterRegistry registry) {
+    super.registerMetrics(registry);
     LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
         .description("Number and duration of stuck major compactions").register(registry);
     CompactionWatcher.setTimer(timer);
@@ -597,13 +598,13 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     try {
       MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
           clientAddress);
+      pausedMetrics = new PausedCompactionMetrics();
+      MetricsUtil.initializeProducers(this, pausedMetrics);
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException | NoSuchMethodException
         | SecurityException e1) {
       LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
     }
-    pausedMetrics = new PausedCompactionMetrics();
-    MetricsUtil.initializeProducers(this, pausedMetrics);
 
     LOG.info("Compactor started, waiting for work");
     try {
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index d307f86d96..bc641daf34 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -167,7 +167,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
 
     try {
       MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, address);
-      MetricsUtil.initializeProducers(new GcMetrics(this));
+      MetricsUtil.initializeProducers(this, new GcMetrics(this));
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException | NoSuchMethodException
         | SecurityException e1) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 90dbaba0cb..10baa3b15b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1107,7 +1107,8 @@ public class Manager extends AbstractServer
     try {
       MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
           sa.getAddress());
-      ManagerMetrics.init(getConfiguration(), this);
+      ManagerMetrics mm = new ManagerMetrics(getConfiguration(), this);
+      MetricsUtil.initializeProducers(this, mm);
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException | NoSuchMethodException
         | SecurityException e1) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
index b5a3a54edf..285df23a69 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
@@ -22,21 +22,25 @@ import static java.util.Objects.requireNonNull;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.metrics.fate.FateMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ManagerMetrics {
+import io.micrometer.core.instrument.MeterRegistry;
 
-  private final static Logger log = LoggerFactory.getLogger(ManagerMetrics.class);
+public class ManagerMetrics implements MetricsProducer {
 
-  public static void init(AccumuloConfiguration conf, Manager m) {
+  private final FateMetrics fateMetrics;
+
+  public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) {
     requireNonNull(conf, "AccumuloConfiguration must not be null");
-    MetricsUtil.initializeProducers(new FateMetrics(m.getContext(),
-        conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
-    log.info("Registered FATE metrics module");
+    requireNonNull(conf, "Manager must not be null");
+    fateMetrics = new FateMetrics(manager.getContext(),
+        conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL));
   }
 
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    fateMetrics.registerMetrics(registry);
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 12ea616739..fa863394af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -357,13 +357,13 @@ public class ScanServer extends AbstractServer
     try {
       MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
           clientAddress);
+      scanMetrics = new TabletServerScanMetrics();
+      MetricsUtil.initializeProducers(this, scanMetrics);
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException | NoSuchMethodException
         | SecurityException e1) {
       LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
     }
-    scanMetrics = new TabletServerScanMetrics();
-    MetricsUtil.initializeProducers(scanMetrics);
 
     // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7ab576fe43..ebc19875eb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -710,21 +710,22 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
     try {
       MetricsUtil.initializeMetrics(context.getConfiguration(), this.applicationName,
           clientAddress);
+
+      metrics = new TabletServerMetrics(this);
+      updateMetrics = new TabletServerUpdateMetrics();
+      scanMetrics = new TabletServerScanMetrics();
+      mincMetrics = new TabletServerMinCMetrics();
+      ceMetrics = new CompactionExecutorsMetrics();
+      pausedMetrics = new PausedCompactionMetrics();
+      MetricsUtil.initializeProducers(this, metrics, updateMetrics, scanMetrics, mincMetrics,
+          ceMetrics, pausedMetrics);
+
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
         | IllegalArgumentException | InvocationTargetException | NoSuchMethodException
         | SecurityException e1) {
       log.error("Error initializing metrics, metrics will not be emitted.", e1);
     }
 
-    metrics = new TabletServerMetrics(this);
-    updateMetrics = new TabletServerUpdateMetrics();
-    scanMetrics = new TabletServerScanMetrics();
-    mincMetrics = new TabletServerMinCMetrics();
-    ceMetrics = new CompactionExecutorsMetrics();
-    pausedMetrics = new PausedCompactionMetrics();
-    MetricsUtil.initializeProducers(metrics, updateMetrics, scanMetrics, mincMetrics, ceMetrics,
-        pausedMetrics);
-
     this.compactionManager = new CompactionManager(() -> Iterators
         .transform(onlineTablets.snapshot().values().iterator(), Tablet::asCompactable),
         getContext(), ceMetrics);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index da4c98a7a5..e4ded303e6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -91,7 +91,7 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
           if (line.startsWith("accumulo")) {
             Metric metric = TestStatsDSink.parseStatsDMetric(line);
             if (MetricsProducer.METRICS_MAJC_PAUSED.equals(metric.getName())) {
-              Double val = Double.parseDouble(metric.getValue());
+              double val = Double.parseDouble(metric.getValue());
               MAJC_PAUSED.add(val);
             }
           }
@@ -112,7 +112,7 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
   }
 
   @BeforeEach
-  public void beforeEach() throws Exception {
+  public void beforeEach() {
     // Reset the client side counters
     MAJC_PAUSED.reset();
   }
@@ -140,26 +140,25 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
 
       try (Scanner scanner = client.createScanner(table)) {
 
-        MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+        MemoryStarvedScanIT.consumeServerMemory(scanner);
 
-        Double paused = MAJC_PAUSED.doubleValue();
+        int paused = MAJC_PAUSED.intValue();
         assertEquals(0, paused);
 
         ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
         compactionThread.start();
 
-        while (paused == 0) {
+        while (paused <= 0) {
           Thread.sleep(1000);
-          paused = MAJC_PAUSED.doubleValue();
+          paused = MAJC_PAUSED.intValue();
         }
-        assertTrue(paused > 0);
 
         MemoryStarvedScanIT.freeServerMemory(client, table);
         compactionThread.interrupt();
         compactionThread.join();
         assertNull(error.get());
         assertTrue(client.instanceOperations().getActiveCompactions().stream()
-            .filter(ac -> ac.getPausedCount() > 0).findAny().isPresent());
+            .anyMatch(ac -> ac.getPausedCount() > 0));
       }
     }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
index 4c91c17635..6418d98ef4 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.DoubleAdder;
-import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -91,7 +90,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
           if (line.startsWith("accumulo")) {
             Metric metric = TestStatsDSink.parseStatsDMetric(line);
             if (MetricsProducer.METRICS_MINC_PAUSED.equals(metric.getName())) {
-              Double val = Double.parseDouble(metric.getValue());
+              double val = Double.parseDouble(metric.getValue());
               MINC_PAUSED.add(val);
             }
           }
@@ -112,7 +111,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
   }
 
   @BeforeEach
-  public void beforeEach() throws Exception {
+  public void beforeEach() {
     // Reset the client side counters
     MINC_PAUSED.reset();
   }
@@ -142,27 +141,25 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
 
       try (Scanner scanner = client.createScanner(table)) {
 
-        MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+        MemoryStarvedScanIT.consumeServerMemory(scanner);
 
-        Double paused = MINC_PAUSED.doubleValue();
+        int paused = MINC_PAUSED.intValue();
         assertEquals(0, paused);
 
         ingestThread.start();
 
-        while (paused == 0) {
+        while (paused <= 0) {
           Thread.sleep(1000);
-          paused = MINC_PAUSED.doubleValue();
+          paused = MINC_PAUSED.intValue();
         }
-        assertTrue(paused > 0);
 
         MemoryStarvedScanIT.freeServerMemory(client, table);
         ingestThread.interrupt();
         ingestThread.join();
         assertNull(error.get());
         assertTrue(client.instanceOperations().getActiveCompactions().stream()
-            .filter(ac -> ac.getPausedCount() > 0).collect(Collectors.toList()).size() > 0);
+            .anyMatch(ac -> ac.getPausedCount() > 0));
       }
     }
   }
-
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index ea67ef3734..c0036d358d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_LOW_MEMORY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -80,10 +81,11 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
     }
   }
 
-  public static final Double FREE_MEMORY_THRESHOLD = 0.20D;
+  public static final double FREE_MEMORY_THRESHOLD = 0.20D;
 
   private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
   private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
+  private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger(0);
   private static TestStatsDSink sink;
   private static Thread metricConsumer;
 
@@ -100,11 +102,17 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
           if (line.startsWith("accumulo")) {
             Metric metric = TestStatsDSink.parseStatsDMetric(line);
             if (MetricsProducer.METRICS_SCAN_PAUSED_FOR_MEM.equals(metric.getName())) {
-              Double val = Double.parseDouble(metric.getValue());
+              double val = Double.parseDouble(metric.getValue());
               SCAN_START_DELAYED.add(val);
             } else if (MetricsProducer.METRICS_SCAN_RETURN_FOR_MEM.equals(metric.getName())) {
-              Double val = Double.parseDouble(metric.getValue());
+              double val = Double.parseDouble(metric.getValue());
               SCAN_RETURNED_EARLY.add(val);
+            } else if (metric.getName().equals(METRICS_LOW_MEMORY)) {
+              String process = metric.getTags().get("process.name");
+              if (process != null && process.contains("tserver")) {
+                int val = Integer.parseInt(metric.getValue());
+                LOW_MEM_DETECTED.set(val);
+              }
             }
           }
         }
@@ -124,13 +132,14 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
   }
 
   @BeforeEach
-  public void beforeEach() throws Exception {
+  public void beforeEach() {
     // Reset the client side counters
     SCAN_START_DELAYED.reset();
     SCAN_START_DELAYED.reset();
+    LOW_MEM_DETECTED.set(0);
   }
 
-  static void consumeServerMemory(Scanner scanner, String table) throws Exception {
+  static void consumeServerMemory(Scanner scanner) {
     // This iterator will attempt to consume all free memory in the TabletServer
     scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
     scanner.setBatchSize(1);
@@ -143,7 +152,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
     assertTrue(iter.hasNext());
   }
 
-  private void consumeServerMemory(BatchScanner scanner, String table) throws Exception {
+  private void consumeServerMemory(BatchScanner scanner) {
     // This iterator will attempt to consume all free memory in the TabletServer
     scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
     scanner.setRanges(Collections.singletonList(new Range()));
@@ -174,13 +183,13 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
       ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 
       try (Scanner scanner = client.createScanner(table)) {
-        Double returned = SCAN_RETURNED_EARLY.doubleValue();
-        Double paused = SCAN_START_DELAYED.doubleValue();
+        double returned = SCAN_RETURNED_EARLY.doubleValue();
+        double paused = SCAN_START_DELAYED.doubleValue();
 
-        consumeServerMemory(scanner, table);
+        consumeServerMemory(scanner);
 
         // Wait for longer than the memory check interval
-        Thread.sleep(6000);
+        Thread.sleep(6_000);
 
         // The metric that indicates a scan was returned early due to low memory should
         // have been incremented.
@@ -245,8 +254,8 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         assertTrue(currentCount > 0 && currentCount < 100);
 
         // Grab the current metric counts, wait
-        Double returned = SCAN_RETURNED_EARLY.doubleValue();
-        Double paused = SCAN_START_DELAYED.doubleValue();
+        double returned = SCAN_RETURNED_EARLY.doubleValue();
+        double paused = SCAN_START_DELAYED.doubleValue();
         Thread.sleep(1500);
         // One of two conditions could exist here:
         // The number of fetched rows equals the current count before the wait above
@@ -263,6 +272,8 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         Thread.sleep(1500);
         assertEquals(currentCount, fetched.get());
 
+        assertEquals(1, LOW_MEM_DETECTED.get());
+
         // Free the memory which will allow the pausing scanner to continue
         freeServerMemory(client, table);
 
@@ -283,14 +294,17 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
       TableOperations to = client.tableOperations();
       to.create(table);
 
+      // check memory okay before starting
+      assertEquals(0, LOW_MEM_DETECTED.get());
+
       ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
 
       try (BatchScanner scanner = client.createBatchScanner(table,
           client.securityOperations().getUserAuthorizations(client.whoami()), 1)) {
-        Double returned = SCAN_RETURNED_EARLY.doubleValue();
-        Double paused = SCAN_START_DELAYED.doubleValue();
+        double returned = SCAN_RETURNED_EARLY.doubleValue();
+        double paused = SCAN_START_DELAYED.doubleValue();
 
-        consumeServerMemory(scanner, table);
+        consumeServerMemory(scanner);
 
         // Wait for longer than the memory check interval
         Thread.sleep(6000);
@@ -299,6 +313,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         // have been incremented.
         assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
         assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+        assertEquals(1, LOW_MEM_DETECTED.get());
         freeServerMemory(client, table);
       } finally {
         to.delete(table);
@@ -315,6 +330,9 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
       TableOperations to = client.tableOperations();
       to.create(table);
 
+      // check memory okay before starting
+      assertEquals(0, LOW_MEM_DETECTED.get());
+
       ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
 
       try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
@@ -359,12 +377,13 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         // Grab the current paused count, wait two seconds and then confirm that
         // the number of rows fetched by the memoryConsumingScanner has not increased
         // and that the scan delay counter has increased.
-        Double returned = SCAN_RETURNED_EARLY.doubleValue();
-        Double paused = SCAN_START_DELAYED.doubleValue();
+        double returned = SCAN_RETURNED_EARLY.doubleValue();
+        double paused = SCAN_START_DELAYED.doubleValue();
         Thread.sleep(1500);
         assertEquals(currentCount, fetched.get());
         assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
         assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
+        assertEquals(1, LOW_MEM_DETECTED.get());
 
         // Perform the check again
         paused = SCAN_START_DELAYED.doubleValue();
@@ -372,13 +391,18 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
         Thread.sleep(1500);
         assertEquals(currentCount, fetched.get());
         assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
-        assertTrue(SCAN_RETURNED_EARLY.doubleValue() == returned);
+        assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue());
+        assertEquals(1, LOW_MEM_DETECTED.get());
 
         // Free the memory which will allow the pausing scanner to continue
         freeServerMemory(client, table);
 
         t.join();
         assertEquals(30, fetched.get());
+        // allow metic collection to cycle.
+        Thread.sleep(6_000);
+        assertEquals(0, LOW_MEM_DETECTED.get());
+
       } finally {
         to.delete(table);
       }