You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2023/05/01 18:31:27 UTC
[accumulo] branch main updated: add low memory metric to abstract server (#3288)
This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new c0586be591 add low memory metric to abstract server (#3288)
c0586be591 is described below
commit c0586be591d1fd8492aaa4577194967c2403abb7
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Mon May 1 14:31:22 2023 -0400
add low memory metric to abstract server (#3288)
Add low memory metric to abstract server
---
.../accumulo/core/metrics/MetricsProducer.java | 10 +++-
.../org/apache/accumulo/server/AbstractServer.java | 23 +++++++--
.../accumulo/server/metrics/ProcessMetrics.java | 44 ++++++++++++++++
.../apache/accumulo/server/rpc/TimedProcessor.java | 2 +-
.../coordinator/CompactionCoordinator.java | 1 +
.../org/apache/accumulo/compactor/Compactor.java | 5 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../java/org/apache/accumulo/manager/Manager.java | 3 +-
.../accumulo/manager/metrics/ManagerMetrics.java | 22 ++++----
.../org/apache/accumulo/tserver/ScanServer.java | 4 +-
.../org/apache/accumulo/tserver/TabletServer.java | 19 +++----
.../test/functional/MemoryStarvedMajCIT.java | 15 +++---
.../test/functional/MemoryStarvedMinCIT.java | 17 +++---
.../test/functional/MemoryStarvedScanIT.java | 60 +++++++++++++++-------
14 files changed, 162 insertions(+), 65 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index e0b1bd65a0..86e110d057 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -35,7 +35,7 @@ import io.micrometer.core.instrument.MeterRegistry;
* <a href="https://micrometer.io/">Micrometer</a>. Micrometer suggests using a particular
* <a href="https://micrometer.io/docs/concepts#_naming_meters">naming convention</a> for the
* metrics. The table below contains a mapping of the old to new metric names.
- *
+ * <p>
* <table border="1">
* <caption>Summary of Metric Changes</caption> <!-- fate -->
* <tr>
@@ -48,6 +48,13 @@ import io.micrometer.core.instrument.MeterRegistry;
* <tr>
* <td>N/A</td>
* <td>N/A</td>
+ * <td>{@link #METRICS_LOW_MEMORY}</td>
+ * <td>Gauge</td>
+ * <td>reports 1 when process memory usage is above threshold, 0 when memory is okay</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
* <td>{@link #METRICS_COMPACTOR_MAJC_STUCK}</td>
* <td>LongTaskTimer</td>
* <td></td>
@@ -594,6 +601,7 @@ public interface MetricsProducer {
Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
+ String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
String METRICS_COMPACTOR_PREFIX = "accumulo.compactor";
String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck";
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 6b47fd5fea..e7f8efba26 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -27,29 +27,34 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.mem.LowMemoryDetector;
+import org.apache.accumulo.server.metrics.ProcessMetrics;
import org.apache.accumulo.server.security.SecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractServer implements AutoCloseable, Runnable {
+import io.micrometer.core.instrument.MeterRegistry;
+
+public abstract class AbstractServer implements AutoCloseable, MetricsProducer, Runnable {
private final ServerContext context;
protected final String applicationName;
private final String hostname;
- private final Logger log;
+
+ private final ProcessMetrics processMetrics;
protected AbstractServer(String appName, ConfigOpts opts, String[] args) {
- this.log = LoggerFactory.getLogger(getClass().getName());
this.applicationName = appName;
opts.parseArgs(appName, args);
var siteConfig = opts.getSiteConfiguration();
this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS);
SecurityUtil.serverLogin(siteConfig);
context = new ServerContext(siteConfig);
+ Logger log = LoggerFactory.getLogger(getClass());
log.info("Version " + Constants.VERSION);
log.info("Instance " + context.getInstanceID());
context.init(appName);
@@ -64,6 +69,7 @@ public abstract class AbstractServer implements AutoCloseable, Runnable {
() -> lmd.logGCInfo(context.getConfiguration()), 0,
lmd.getIntervalMillis(context.getConfiguration()), TimeUnit.MILLISECONDS);
ThreadPools.watchNonCriticalScheduledTask(future);
+ processMetrics = new ProcessMetrics(context);
}
/**
@@ -87,6 +93,17 @@ public abstract class AbstractServer implements AutoCloseable, Runnable {
}
}
+ /**
+ * Registers the common process metrics (low memory); overrides should call super.
+ */
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+ // makes mocking subclasses easier
+ if (processMetrics != null) {
+ processMetrics.registerMetrics(registry);
+ }
+ }
+
public String getHostname() {
return hostname;
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
new file mode 100644
index 0000000000..8b44525c53
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ProcessMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.util.List;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.server.ServerContext;
+
+import io.micrometer.core.instrument.MeterRegistry;
+
+public class ProcessMetrics implements MetricsProducer {
+
+ private final ServerContext context;
+
+ public ProcessMetrics(final ServerContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+ registry.gauge(METRICS_LOW_MEMORY, List.of(), this, this::lowMemDetected);
+ }
+
+ private int lowMemDetected(ProcessMetrics processMetrics) {
+ return context.getLowMemoryDetector().isRunningLowOnMemory() ? 1 : 0;
+ }
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index 0307c42c07..165479a71f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -33,7 +33,7 @@ public class TimedProcessor implements TProcessor {
private final TProcessor other;
private final ThriftMetrics thriftMetrics;
- private long idleStart = 0;
+ private long idleStart;
public TimedProcessor(TProcessor next) {
this.other = next;
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 2edbc368c5..baabd5aa76 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -267,6 +267,7 @@ public class CompactionCoordinator extends AbstractServer
try {
MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
clientAddress);
+ MetricsUtil.initializeProducers(this);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | NoSuchMethodException
| SecurityException e1) {
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 5b2aa4579c..b33b92431a 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -168,6 +168,7 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
@Override
public void registerMetrics(MeterRegistry registry) {
+ super.registerMetrics(registry);
LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
.description("Number and duration of stuck major compactions").register(registry);
CompactionWatcher.setTimer(timer);
@@ -597,13 +598,13 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
try {
MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
clientAddress);
+ pausedMetrics = new PausedCompactionMetrics();
+ MetricsUtil.initializeProducers(this, pausedMetrics);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | NoSuchMethodException
| SecurityException e1) {
LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
}
- pausedMetrics = new PausedCompactionMetrics();
- MetricsUtil.initializeProducers(this, pausedMetrics);
LOG.info("Compactor started, waiting for work");
try {
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index d307f86d96..bc641daf34 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -167,7 +167,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
try {
MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, address);
- MetricsUtil.initializeProducers(new GcMetrics(this));
+ MetricsUtil.initializeProducers(this, new GcMetrics(this));
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | NoSuchMethodException
| SecurityException e1) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 90dbaba0cb..10baa3b15b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1107,7 +1107,8 @@ public class Manager extends AbstractServer
try {
MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
sa.getAddress());
- ManagerMetrics.init(getConfiguration(), this);
+ ManagerMetrics mm = new ManagerMetrics(getConfiguration(), this);
+ MetricsUtil.initializeProducers(this, mm);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | NoSuchMethodException
| SecurityException e1) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
index b5a3a54edf..285df23a69 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
@@ -22,21 +22,25 @@ import static java.util.Objects.requireNonNull;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.metrics.fate.FateMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class ManagerMetrics {
+import io.micrometer.core.instrument.MeterRegistry;
- private final static Logger log = LoggerFactory.getLogger(ManagerMetrics.class);
+public class ManagerMetrics implements MetricsProducer {
- public static void init(AccumuloConfiguration conf, Manager m) {
+ private final FateMetrics fateMetrics;
+
+ public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) {
requireNonNull(conf, "AccumuloConfiguration must not be null");
- MetricsUtil.initializeProducers(new FateMetrics(m.getContext(),
- conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
- log.info("Registered FATE metrics module");
+ requireNonNull(manager, "Manager must not be null");
+ fateMetrics = new FateMetrics(manager.getContext(),
+ conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL));
}
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+ fateMetrics.registerMetrics(registry);
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 12ea616739..fa863394af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -357,13 +357,13 @@ public class ScanServer extends AbstractServer
try {
MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
clientAddress);
+ scanMetrics = new TabletServerScanMetrics();
+ MetricsUtil.initializeProducers(this, scanMetrics);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | NoSuchMethodException
| SecurityException e1) {
LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
}
- scanMetrics = new TabletServerScanMetrics();
- MetricsUtil.initializeProducers(scanMetrics);
// We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7ab576fe43..ebc19875eb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -710,21 +710,22 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
try {
MetricsUtil.initializeMetrics(context.getConfiguration(), this.applicationName,
clientAddress);
+
+ metrics = new TabletServerMetrics(this);
+ updateMetrics = new TabletServerUpdateMetrics();
+ scanMetrics = new TabletServerScanMetrics();
+ mincMetrics = new TabletServerMinCMetrics();
+ ceMetrics = new CompactionExecutorsMetrics();
+ pausedMetrics = new PausedCompactionMetrics();
+ MetricsUtil.initializeProducers(this, metrics, updateMetrics, scanMetrics, mincMetrics,
+ ceMetrics, pausedMetrics);
+
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | NoSuchMethodException
| SecurityException e1) {
log.error("Error initializing metrics, metrics will not be emitted.", e1);
}
- metrics = new TabletServerMetrics(this);
- updateMetrics = new TabletServerUpdateMetrics();
- scanMetrics = new TabletServerScanMetrics();
- mincMetrics = new TabletServerMinCMetrics();
- ceMetrics = new CompactionExecutorsMetrics();
- pausedMetrics = new PausedCompactionMetrics();
- MetricsUtil.initializeProducers(metrics, updateMetrics, scanMetrics, mincMetrics, ceMetrics,
- pausedMetrics);
-
this.compactionManager = new CompactionManager(() -> Iterators
.transform(onlineTablets.snapshot().values().iterator(), Tablet::asCompactable),
getContext(), ceMetrics);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index da4c98a7a5..e4ded303e6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@ -91,7 +91,7 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
if (line.startsWith("accumulo")) {
Metric metric = TestStatsDSink.parseStatsDMetric(line);
if (MetricsProducer.METRICS_MAJC_PAUSED.equals(metric.getName())) {
- Double val = Double.parseDouble(metric.getValue());
+ double val = Double.parseDouble(metric.getValue());
MAJC_PAUSED.add(val);
}
}
@@ -112,7 +112,7 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
}
@BeforeEach
- public void beforeEach() throws Exception {
+ public void beforeEach() {
// Reset the client side counters
MAJC_PAUSED.reset();
}
@@ -140,26 +140,25 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase {
try (Scanner scanner = client.createScanner(table)) {
- MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+ MemoryStarvedScanIT.consumeServerMemory(scanner);
- Double paused = MAJC_PAUSED.doubleValue();
+ int paused = MAJC_PAUSED.intValue();
assertEquals(0, paused);
ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
compactionThread.start();
- while (paused == 0) {
+ while (paused <= 0) {
Thread.sleep(1000);
- paused = MAJC_PAUSED.doubleValue();
+ paused = MAJC_PAUSED.intValue();
}
- assertTrue(paused > 0);
MemoryStarvedScanIT.freeServerMemory(client, table);
compactionThread.interrupt();
compactionThread.join();
assertNull(error.get());
assertTrue(client.instanceOperations().getActiveCompactions().stream()
- .filter(ac -> ac.getPausedCount() > 0).findAny().isPresent());
+ .anyMatch(ac -> ac.getPausedCount() > 0));
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
index 4c91c17635..6418d98ef4 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.DoubleAdder;
-import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -91,7 +90,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
if (line.startsWith("accumulo")) {
Metric metric = TestStatsDSink.parseStatsDMetric(line);
if (MetricsProducer.METRICS_MINC_PAUSED.equals(metric.getName())) {
- Double val = Double.parseDouble(metric.getValue());
+ double val = Double.parseDouble(metric.getValue());
MINC_PAUSED.add(val);
}
}
@@ -112,7 +111,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
}
@BeforeEach
- public void beforeEach() throws Exception {
+ public void beforeEach() {
// Reset the client side counters
MINC_PAUSED.reset();
}
@@ -142,27 +141,25 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase {
try (Scanner scanner = client.createScanner(table)) {
- MemoryStarvedScanIT.consumeServerMemory(scanner, table);
+ MemoryStarvedScanIT.consumeServerMemory(scanner);
- Double paused = MINC_PAUSED.doubleValue();
+ int paused = MINC_PAUSED.intValue();
assertEquals(0, paused);
ingestThread.start();
- while (paused == 0) {
+ while (paused <= 0) {
Thread.sleep(1000);
- paused = MINC_PAUSED.doubleValue();
+ paused = MINC_PAUSED.intValue();
}
- assertTrue(paused > 0);
MemoryStarvedScanIT.freeServerMemory(client, table);
ingestThread.interrupt();
ingestThread.join();
assertNull(error.get());
assertTrue(client.instanceOperations().getActiveCompactions().stream()
- .filter(ac -> ac.getPausedCount() > 0).collect(Collectors.toList()).size() > 0);
+ .anyMatch(ac -> ac.getPausedCount() > 0));
}
}
}
-
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index ea67ef3734..c0036d358d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.functional;
+import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_LOW_MEMORY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -80,10 +81,11 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
}
}
- public static final Double FREE_MEMORY_THRESHOLD = 0.20D;
+ public static final double FREE_MEMORY_THRESHOLD = 0.20D;
private static final DoubleAdder SCAN_START_DELAYED = new DoubleAdder();
private static final DoubleAdder SCAN_RETURNED_EARLY = new DoubleAdder();
+ private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger(0);
private static TestStatsDSink sink;
private static Thread metricConsumer;
@@ -100,11 +102,17 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
if (line.startsWith("accumulo")) {
Metric metric = TestStatsDSink.parseStatsDMetric(line);
if (MetricsProducer.METRICS_SCAN_PAUSED_FOR_MEM.equals(metric.getName())) {
- Double val = Double.parseDouble(metric.getValue());
+ double val = Double.parseDouble(metric.getValue());
SCAN_START_DELAYED.add(val);
} else if (MetricsProducer.METRICS_SCAN_RETURN_FOR_MEM.equals(metric.getName())) {
- Double val = Double.parseDouble(metric.getValue());
+ double val = Double.parseDouble(metric.getValue());
SCAN_RETURNED_EARLY.add(val);
+ } else if (metric.getName().equals(METRICS_LOW_MEMORY)) {
+ String process = metric.getTags().get("process.name");
+ if (process != null && process.contains("tserver")) {
+ int val = Integer.parseInt(metric.getValue());
+ LOW_MEM_DETECTED.set(val);
+ }
}
}
}
@@ -124,13 +132,14 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
}
@BeforeEach
- public void beforeEach() throws Exception {
+ public void beforeEach() {
// Reset the client side counters
SCAN_START_DELAYED.reset();
- SCAN_START_DELAYED.reset();
+ SCAN_RETURNED_EARLY.reset();
+ LOW_MEM_DETECTED.set(0);
}
- static void consumeServerMemory(Scanner scanner, String table) throws Exception {
+ static void consumeServerMemory(Scanner scanner) {
// This iterator will attempt to consume all free memory in the TabletServer
scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
scanner.setBatchSize(1);
@@ -143,7 +152,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
assertTrue(iter.hasNext());
}
- private void consumeServerMemory(BatchScanner scanner, String table) throws Exception {
+ private void consumeServerMemory(BatchScanner scanner) {
// This iterator will attempt to consume all free memory in the TabletServer
scanner.addScanIterator(new IteratorSetting(11, MemoryConsumingIterator.class, Map.of()));
scanner.setRanges(Collections.singletonList(new Range()));
@@ -174,13 +183,13 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
try (Scanner scanner = client.createScanner(table)) {
- Double returned = SCAN_RETURNED_EARLY.doubleValue();
- Double paused = SCAN_START_DELAYED.doubleValue();
+ double returned = SCAN_RETURNED_EARLY.doubleValue();
+ double paused = SCAN_START_DELAYED.doubleValue();
- consumeServerMemory(scanner, table);
+ consumeServerMemory(scanner);
// Wait for longer than the memory check interval
- Thread.sleep(6000);
+ Thread.sleep(6_000);
// The metric that indicates a scan was returned early due to low memory should
// have been incremented.
@@ -245,8 +254,8 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
assertTrue(currentCount > 0 && currentCount < 100);
// Grab the current metric counts, wait
- Double returned = SCAN_RETURNED_EARLY.doubleValue();
- Double paused = SCAN_START_DELAYED.doubleValue();
+ double returned = SCAN_RETURNED_EARLY.doubleValue();
+ double paused = SCAN_START_DELAYED.doubleValue();
Thread.sleep(1500);
// One of two conditions could exist here:
// The number of fetched rows equals the current count before the wait above
@@ -263,6 +272,8 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
Thread.sleep(1500);
assertEquals(currentCount, fetched.get());
+ assertEquals(1, LOW_MEM_DETECTED.get());
+
// Free the memory which will allow the pausing scanner to continue
freeServerMemory(client, table);
@@ -283,14 +294,17 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
TableOperations to = client.tableOperations();
to.create(table);
+ // check memory okay before starting
+ assertEquals(0, LOW_MEM_DETECTED.get());
+
ReadWriteIT.ingest(client, 10, 10, 10, 0, table);
try (BatchScanner scanner = client.createBatchScanner(table,
client.securityOperations().getUserAuthorizations(client.whoami()), 1)) {
- Double returned = SCAN_RETURNED_EARLY.doubleValue();
- Double paused = SCAN_START_DELAYED.doubleValue();
+ double returned = SCAN_RETURNED_EARLY.doubleValue();
+ double paused = SCAN_START_DELAYED.doubleValue();
- consumeServerMemory(scanner, table);
+ consumeServerMemory(scanner);
// Wait for longer than the memory check interval
Thread.sleep(6000);
@@ -299,6 +313,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
// have been incremented.
assertTrue(SCAN_RETURNED_EARLY.doubleValue() > returned);
assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
+ assertEquals(1, LOW_MEM_DETECTED.get());
freeServerMemory(client, table);
} finally {
to.delete(table);
@@ -315,6 +330,9 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
TableOperations to = client.tableOperations();
to.create(table);
+ // check memory okay before starting
+ assertEquals(0, LOW_MEM_DETECTED.get());
+
ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
try (BatchScanner dataConsumingScanner = client.createBatchScanner(table);
@@ -359,12 +377,13 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
// Grab the current paused count, wait two seconds and then confirm that
// the number of rows fetched by the memoryConsumingScanner has not increased
// and that the scan delay counter has increased.
- Double returned = SCAN_RETURNED_EARLY.doubleValue();
- Double paused = SCAN_START_DELAYED.doubleValue();
+ double returned = SCAN_RETURNED_EARLY.doubleValue();
+ double paused = SCAN_START_DELAYED.doubleValue();
Thread.sleep(1500);
assertEquals(currentCount, fetched.get());
assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
assertTrue(SCAN_RETURNED_EARLY.doubleValue() >= returned);
+ assertEquals(1, LOW_MEM_DETECTED.get());
// Perform the check again
paused = SCAN_START_DELAYED.doubleValue();
@@ -372,13 +391,18 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase {
Thread.sleep(1500);
assertEquals(currentCount, fetched.get());
assertTrue(SCAN_START_DELAYED.doubleValue() >= paused);
- assertTrue(SCAN_RETURNED_EARLY.doubleValue() == returned);
+ assertEquals(returned, SCAN_RETURNED_EARLY.doubleValue());
+ assertEquals(1, LOW_MEM_DETECTED.get());
// Free the memory which will allow the pausing scanner to continue
freeServerMemory(client, table);
t.join();
assertEquals(30, fetched.get());
+ // allow metric collection to cycle.
+ Thread.sleep(6_000);
+ assertEquals(0, LOW_MEM_DETECTED.get());
+
} finally {
to.delete(table);
}