You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2023/06/23 20:28:59 UTC
[accumulo] branch main updated: fix low memory detector and test updates (#3508)
This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new e1320df709 fix low memory detector and test updates (#3508)
e1320df709 is described below
commit e1320df709630be844bddd9fd0d8c6045bc5111e
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Fri Jun 23 20:28:53 2023 +0000
fix low memory detector and test updates (#3508)
* fix low memory detector and test updates
Co-authored-by: Dave Marion <dl...@apache.org>
---
.../accumulo/server/mem/LowMemoryDetector.java | 79 ++++-----
.../test/functional/MemoryFreeingIterator.java | 30 ++--
.../test/functional/MemoryStarvedMajCIT.java | 8 +-
.../test/functional/MemoryStarvedMinCIT.java | 2 +-
.../test/functional/MemoryStarvedScanIT.java | 192 ++++++++++++++++-----
5 files changed, 203 insertions(+), 108 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index e08eed772b..fb18c7c682 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@ -36,20 +36,21 @@ import org.slf4j.LoggerFactory;
public class LowMemoryDetector {
+ private static final Logger LOG = LoggerFactory.getLogger(LowMemoryDetector.class);
+
@FunctionalInterface
- public static interface Action {
+ public interface Action {
void execute();
}
public enum DetectionScope {
MINC, MAJC, SCAN
- };
-
- private static final Logger log = LoggerFactory.getLogger(LowMemoryDetector.class);
+ }
private final HashMap<String,Long> prevGcTime = new HashMap<>();
+
private long lastMemorySize = 0;
- private long gcTimeIncreasedCount = 0;
+ private int lowMemCount = 0;
private long lastMemoryCheckTime = 0;
private final Lock memCheckTimeLock = new ReentrantLock();
private volatile boolean runningLowOnMemory = false;
@@ -65,15 +66,15 @@ public class LowMemoryDetector {
/**
* @param context server context
* @param scope whether this is being checked in the context of scan or compact code
- * @param isUserTable boolean as to whether the table being scanned / compacted is a user table.
- * No action is taken for system tables.
+ * @param isUserTable boolean set true if the table being scanned / compacted is a user table. No
+ * action is taken for system tables.
* @param action Action to perform when this method returns true
* @return true if server running low on memory
*/
public boolean isRunningLowOnMemory(ServerContext context, DetectionScope scope,
Supplier<Boolean> isUserTable, Action action) {
if (isUserTable.get()) {
- Property p = null;
+ Property p;
switch (scope) {
case SCAN:
p = Property.GENERAL_LOW_MEM_SCAN_PROTECTION;
@@ -99,21 +100,19 @@ public class LowMemoryDetector {
public void logGCInfo(AccumuloConfiguration conf) {
- Double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD);
+ double freeMemoryPercentage = conf.getFraction(Property.GENERAL_LOW_MEM_DETECTOR_THRESHOLD);
memCheckTimeLock.lock();
try {
final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
- Runtime rt = Runtime.getRuntime();
StringBuilder sb = new StringBuilder("gc");
boolean sawChange = false;
long maxIncreaseInCollectionTime = 0;
-
for (GarbageCollectorMXBean gcBean : gcmBeans) {
Long prevTime = prevGcTime.get(gcBean.getName());
long pt = 0;
@@ -135,48 +134,50 @@ public class LowMemoryDetector {
prevGcTime.put(gcBean.getName(), time);
}
- long mem = rt.freeMemory();
- if (maxIncreaseInCollectionTime == 0) {
- gcTimeIncreasedCount = 0;
- } else {
- gcTimeIncreasedCount++;
- if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * freeMemoryPercentage) {
+ Runtime rt = Runtime.getRuntime();
+ final long maxConfiguredMemory = rt.maxMemory();
+ final long allocatedMemory = rt.totalMemory();
+ final long allocatedFreeMemory = rt.freeMemory();
+ final long freeMemory = maxConfiguredMemory - (allocatedMemory - allocatedFreeMemory);
+ final long lowMemoryThreshold = (long) (maxConfiguredMemory * freeMemoryPercentage);
+ LOG.trace("Memory info: max={}, allocated={}, free={}, free threshold={}",
+ maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold);
+
+ if (freeMemory < lowMemoryThreshold) {
+ lowMemCount++;
+ if (lowMemCount > 3 && !runningLowOnMemory) {
runningLowOnMemory = true;
- log.warn("Running low on memory");
- gcTimeIncreasedCount = 0;
+ LOG.warn("Running low on memory: max={}, allocated={}, free={}, free threshold={}",
+ maxConfiguredMemory, allocatedMemory, freeMemory, lowMemoryThreshold);
+ }
+ } else {
+ // If we were running low on memory, but are not any longer, then log at warn
+ // so that it shows up in the logs
+ if (runningLowOnMemory) {
+ LOG.warn("Recovered from low memory condition");
} else {
- // If we were running low on memory, but are not any longer, than log at warn
- // so that it shows up in the logs
- if (runningLowOnMemory) {
- log.warn("Recovered from low memory condition");
- } else {
- log.trace("Not running low on memory");
- }
- runningLowOnMemory = false;
+ LOG.trace("Not running low on memory");
}
+ runningLowOnMemory = false;
+ lowMemCount = 0;
}
- if (mem != lastMemorySize) {
+ if (freeMemory != lastMemorySize) {
sawChange = true;
}
- String sign = "+";
- if (mem - lastMemorySize <= 0) {
- sign = "";
- }
-
- sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize),
- rt.totalMemory()));
+ sb.append(String.format(" freemem=%,d(%+,d) totalmem=%,d", freeMemory,
+ (freeMemory - lastMemorySize), rt.totalMemory()));
if (sawChange) {
- log.debug(sb.toString());
+ LOG.debug(sb.toString());
}
final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
final long diff = now - lastMemoryCheckTime;
if (diff > keepAliveTimeout + 1000) {
- log.warn(String.format(
+ LOG.warn(String.format(
"GC pause checker not called in a timely"
+ " fashion. Expected every %.1f seconds but was %.1f seconds since last check",
keepAliveTimeout / 1000., diff / 1000.));
@@ -186,10 +187,10 @@ public class LowMemoryDetector {
}
if (maxIncreaseInCollectionTime > keepAliveTimeout) {
- Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
+ Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
}
- lastMemorySize = mem;
+ lastMemorySize = freeMemory;
lastMemoryCheckTime = now;
} finally {
memCheckTimeLock.unlock();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
index 2aa86e9ddd..95e7e2e7a4 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryFreeingIterator.java
@@ -20,31 +20,25 @@ package org.apache.accumulo.test.functional;
import static java.util.concurrent.TimeUnit.SECONDS;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class MemoryFreeingIterator extends WrappingIterator {
- @Override
- public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
- IteratorEnvironment env) throws IOException {
- super.init(source, options, env);
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryFreeingIterator.class);
+
+ @SuppressFBWarnings(value = "DM_GC", justification = "gc is okay for test")
+ public MemoryFreeingIterator() throws InterruptedException {
+ LOG.info("Try to free consumed memory - will block until isRunningLowOnMemory returns false.");
MemoryConsumingIterator.freeBuffers();
while (this.isRunningLowOnMemory()) {
+ System.gc();
// wait for LowMemoryDetector to recognize the memory is free.
- try {
- Thread.sleep(SECONDS.toMillis(1));
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new IOException("wait for low memory detector interrupted", ex);
- }
+ Thread.sleep(SECONDS.toMillis(1));
}
+ LOG.info("isRunningLowOnMemory returned false - memory available");
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index e4ded303e6..8c9d0f7db3 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.functional;
+import static org.apache.accumulo.test.util.Wait.waitFor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -148,12 +149,9 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
compactionThread.start();
- while (paused <= 0) {
- Thread.sleep(1000);
- paused = MAJC_PAUSED.intValue();
- }
+ assertTrue(waitFor(() -> MAJC_PAUSED.intValue() > 0));
- MemoryStarvedScanIT.freeServerMemory(client, table);
+ MemoryStarvedScanIT.freeServerMemory(client);
compactionThread.interrupt();
compactionThread.join();
assertNull(error.get());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
index 6418d98ef4..0a001047cc 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
@@ -153,7 +153,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
paused = MINC_PAUSED.intValue();
}
- MemoryStarvedScanIT.freeServerMemory(client, table);
+ MemoryStarvedScanIT.freeServerMemory(client);
ingestThread.interrupt();
ingestThread.join();
assertNull(error.get());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index c0036d358d..607a59ece1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test.functional;
import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_LOW_MEMORY;
+import static org.apache.accumulo.test.util.Wait.waitFor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -40,6 +41,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -54,6 +56,8 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MemoryStarvedScanIT extends SharedMiniClusterBase {
@@ -83,6 +87,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
public static final double FREE_MEMORY_THRESHOLD = 0.20D;
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryStarvedScanIT.class);
private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger(0);
@@ -132,11 +137,16 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
}
@BeforeEach
- public void beforeEach() {
+ public void beforeEach() throws Exception {
+ // Free the server side memory
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+ freeServerMemory(client);
+ }
+ // allow metric collection to cycle and metric value to reset to zero
+ waitFor(() -> 0 == LOW_MEM_DETECTED.get());
// Reset the client side counters
SCAN_START_DELAYED.reset();
SCAN_START_DELAYED.reset();
- LOW_MEM_DETECTED.set(0);
}
static void consumeServerMemory(Scanner scanner) {
@@ -162,13 +172,11 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
assertTrue(iter.hasNext());
}
- static void freeServerMemory(AccumuloClient client, String table) throws Exception {
- try (Scanner scanner = client.createScanner(table)) {
- scanner.addScanIterator(new IteratorSetting(11, MemoryFreeingIterator.class, Map.of()));
- @SuppressWarnings("unused")
- Iterator<Entry<Key,Value>> iter = scanner.iterator(); // init'ing the iterator should be
- // enough to free the memory
- }
+ static void freeServerMemory(AccumuloClient client) throws Exception {
+ // Instantiating this class on the TabletServer will free the memory as it
+ // frees the buffers created by the MemoryConsumingIterator in its constructor.
+ client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(),
+ WrappingIterator.class.getName());
}
@Test
@@ -183,19 +191,16 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
try (Scanner scanner = client.createScanner(table)) {
- double returned = SCAN_RETURNED_EARLY.doubleValue();
- double paused = SCAN_START_DELAYED.doubleValue();
+ final double returned = SCAN_RETURNED_EARLY.doubleValue();
+ final double paused = SCAN_START_DELAYED.doubleValue();
consumeServerMemory(scanner);
- // Wait for longer than the memory check interval
- Thread.sleep(6_000);
+ // Wait for the metric that indicates a scan was returned early due to low memory
+ waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
+ && SCAN_START_DELAYED.doubleValue() >= paused);
- // The metric that indicates a scan was returned early due to low memory should
- // have been incremented.
- assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
- assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
- freeServerMemory(client, table);
+ freeServerMemory(client);
} finally {
to.delete(table);
}
@@ -236,6 +241,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
t.start();
+ LOG.info("Waiting for memory to be consumed");
// Wait until the dataConsumingScanner has started fetching data
int currentCount = fetched.get();
@@ -252,30 +258,32 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
// Confirm that some data was fetched by the memoryConsumingScanner
currentCount = fetched.get();
assertTrue(currentCount > 0 && currentCount < 100);
+ LOG.info("Memory consumed");
// Grab the current metric counts, wait
- double returned = SCAN_RETURNED_EARLY.doubleValue();
- double paused = SCAN_START_DELAYED.doubleValue();
+ final double returned = SCAN_RETURNED_EARLY.doubleValue();
+ final double paused = SCAN_START_DELAYED.doubleValue();
Thread.sleep(1500);
// One of two conditions could exist here:
// The number of fetched rows equals the current count before the wait above
// and the SCAN_START_DELAYED has been incremented OR the number of fetched
// rows is one more than the current count and the SCAN_RETURNED_EARLY has
// been incremented.
- assertTrue((currentCount == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
- || (currentCount + 1 == fetched.get() && SCAN_RETURNED_EARLY.doubleValue() > returned));
+ final int currentCountCopy = currentCount;
+ waitFor(
+ () -> (currentCountCopy == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
+ || (currentCountCopy + 1 == fetched.get()
+ && SCAN_RETURNED_EARLY.doubleValue() > returned));
currentCount = fetched.get();
// Perform the check again
- paused = SCAN_START_DELAYED.doubleValue();
- returned = SCAN_RETURNED_EARLY.doubleValue();
- Thread.sleep(1500);
+ assertTrue(waitFor(() -> 1 == LOW_MEM_DETECTED.get()));
assertEquals(currentCount, fetched.get());
- assertEquals(1, LOW_MEM_DETECTED.get());
-
// Free the memory which will allow the pausing scanner to continue
- freeServerMemory(client, table);
+ LOG.info("Freeing memory");
+ freeServerMemory(client);
+ LOG.info("Memory freed");
t.join();
assertEquals(30, fetched.get());
@@ -301,20 +309,18 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
try (BatchScanner scanner = client.createBatchScanner(table,
client.securityOperations().getUserAuthorizations(client.whoami()), 1)) {
- double returned = SCAN_RETURNED_EARLY.doubleValue();
- double paused = SCAN_START_DELAYED.doubleValue();
+
+ final double returned = SCAN_RETURNED_EARLY.doubleValue();
+ final double paused = SCAN_START_DELAYED.doubleValue();
consumeServerMemory(scanner);
- // Wait for longer than the memory check interval
- Thread.sleep(6000);
+ // Wait for metric that indicates a scan was returned early due to low memory
+ assertTrue(waitFor(() -> SCAN_RETURNED_EARLY.doubleValue() > returned
+ && SCAN_START_DELAYED.doubleValue() >= paused));
+ assertTrue(waitFor(() -> 1 == LOW_MEM_DETECTED.get()));
- // The metric that indicates a scan was returned early due to low memory should
- // have been incremented.
- assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
- assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
- assertEquals(1, LOW_MEM_DETECTED.get());
- freeServerMemory(client, table);
+ freeServerMemory(client);
} finally {
to.delete(table);
}
@@ -324,6 +330,96 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
@Test
public void testBatchScanPauses() throws Exception {
+ String table = getUniqueNames(1)[0];
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+ TableOperations to = client.tableOperations();
+ to.create(table);
+
+ // check memory okay before starting
+ assertEquals(0, LOW_MEM_DETECTED.get());
+
+ ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+
+ try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
+ Scanner memoryConsumingScanner = client.createScanner(table)) {
+
+ dataConsumingScanner.addScanIterator(
+ new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", "500")));
+ dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
+ Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
+ AtomicInteger fetched = new AtomicInteger(0);
+ Thread t = new Thread(() -> {
+ int i = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ fetched.set(++i);
+ }
+ });
+
+ memoryConsumingScanner
+ .addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
+ memoryConsumingScanner.setBatchSize(1);
+ memoryConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
+
+ t.start();
+
+ // Wait until the dataConsumingScanner has started fetching data
+ int currentCount = fetched.get();
+ while (currentCount == 0) {
+ Thread.sleep(500);
+ currentCount = fetched.get();
+ }
+
+ // This should block until the GarbageCollectionLogger runs and notices that the
+ // VM is low on memory.
+ Iterator<Entry<Key,Value>> consumingIter = memoryConsumingScanner.iterator();
+ assertTrue(consumingIter.hasNext());
+
+ // Confirm that some data was fetched by the dataConsumingScanner
+ currentCount = fetched.get();
+ assertTrue(currentCount > 0 && currentCount < 100);
+
+ // Grab the current metric counts, then wait until either the number of rows
+ // fetched by the dataConsumingScanner has not increased and the scan delay
+ // counter has increased, or one more row was fetched and the scan returned
+ // early counter has increased.
+ final double returned = SCAN_RETURNED_EARLY.doubleValue();
+ final double paused = SCAN_START_DELAYED.doubleValue();
+
+ final int currentCountCopy = currentCount;
+ waitFor(
+ () -> (currentCountCopy == fetched.get() && SCAN_START_DELAYED.doubleValue() > paused)
+ || (currentCountCopy + 1 == fetched.get()
+ && SCAN_RETURNED_EARLY.doubleValue() > returned));
+ waitFor(() -> 1 == LOW_MEM_DETECTED.get());
+
+ // Perform the check again
+ final double paused2 = SCAN_START_DELAYED.doubleValue();
+ final double returned2 = SCAN_RETURNED_EARLY.doubleValue();
+ Thread.sleep(1500);
+ assertEquals(currentCount, fetched.get());
+ assertTrue(SCAN_START_DELAYED.doubleValue() >= paused2);
+ assertEquals(returned2, SCAN_RETURNED_EARLY.doubleValue());
+ waitFor(() -> 1 == LOW_MEM_DETECTED.get());
+
+ // Free the memory which will allow the pausing scanner to continue
+ freeServerMemory(client);
+
+ t.join();
+ assertEquals(30, fetched.get());
+
+ } finally {
+ to.delete(table);
+ }
+ }
+ }
+
+ /**
+ * Check that the low memory condition is set and remains set until free memory is available.
+ */
+ @Test
+ public void testLowMemoryFlapping() throws Exception {
+
String table = getUniqueNames(1)[0];
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
@@ -383,7 +479,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
assertEquals(currentCount, fetched.get());
assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
- assertEquals(1, LOW_MEM_DETECTED.get());
+ assertTrue(waitFor(() -> LOW_MEM_DETECTED.get() == 1));
// Perform the check again
paused = SCAN_START_DELAYED.doubleValue();
@@ -392,21 +488,27 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
assertEquals(currentCount, fetched.get());
assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue());
- assertEquals(1, LOW_MEM_DETECTED.get());
+ // check across multiple low memory checks and metric updates that low memory detected
+ // remains set
+ int checkCount = 0;
+ while (checkCount++ < 5) {
+ Thread.sleep(5_000);
+ LOG.debug("Check low memory still set. Low Memory Flag: {}, Check count: {}",
+ LOW_MEM_DETECTED.get(), checkCount);
+ assertEquals(1, LOW_MEM_DETECTED.get());
+ }
// Free the memory which will allow the pausing scanner to continue
- freeServerMemory(client, table);
+ freeServerMemory(client);
t.join();
assertEquals(30, fetched.get());
- // allow metic collection to cycle.
- Thread.sleep(6_000);
- assertEquals(0, LOW_MEM_DETECTED.get());
+ // allow metric collection to cycle.
+ assertTrue(waitFor(() -> LOW_MEM_DETECTED.get() == 0));
} finally {
to.delete(table);
}
}
}
-
}