You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/14 16:38:27 UTC
[lucene-solr] branch reference_impl_dev updated: @537 Straighten out overload handling a bit and re-add to ParExecutorService.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 9c74feb @537 Straighten out overload handling a bit and re-add to ParExecutorService.
9c74feb is described below
commit 9c74febd53fa570fdd7551238404ba838d5e3ee1
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Aug 14 11:37:24 2020 -0500
@537 Straighten out overload handling a bit and re-add to ParExecutorService.
---
.../org/apache/solr/servlet/SolrQoSFilter.java | 11 ++--
.../org/apache/solr/common/ParWorkExecService.java | 37 +++++++----
.../java/org/apache/solr/common/util/SysStats.java | 72 ++++++++++------------
3 files changed, 65 insertions(+), 55 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index f7e4a41..46feec1 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -27,6 +27,7 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
+import org.apache.solr.common.ParWork;
import org.apache.solr.common.params.QoSParams;
import org.apache.solr.common.util.SysStats;
import org.eclipse.jetty.servlets.QoSFilter;
@@ -40,11 +41,11 @@ public class SolrQoSFilter extends QoSFilter {
static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
static final String SUSPEND_INIT_PARAM = "suspendMs";
static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
- public static final int OUR_LOAD_HIGH = 99;
+
protected int _origMaxRequests;
- private static SysStats sysStats = SysStats.getSysStats();
+ private static SysStats sysStats = ParWork.getSysStats();
@Override
public void init(FilterConfig filterConfig) {
@@ -62,9 +63,9 @@ public class SolrQoSFilter extends QoSFilter {
String source = req.getHeader(QoSParams.REQUEST_SOURCE);
boolean imagePath = req.getPathInfo() != null && req.getPathInfo().startsWith("/img/");
if (!imagePath && (source == null || !source.equals(QoSParams.INTERNAL))) {
- double ourLoad = sysStats.getAvarageUsagePerCPU();
- if (ourLoad > OUR_LOAD_HIGH) {
- log.info("Our individual load is {}", ourLoad);
+ double ourLoad = sysStats.getTotalUsage();
+ log.info("Our individual load is {}", ourLoad);
+ if (ourLoad > SysStats.OUR_LOAD_HIGH) {
int cMax = getMaxRequests();
if (cMax > 2) {
int max = Math.max(2, (int) ((double)cMax * 0.60D));
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index ea08e9c..bd33ac2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -2,6 +2,7 @@ package org.apache.solr.common;
import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.SysStats;
import org.apache.solr.common.util.TimeOut;
import org.apache.solr.common.util.TimeSource;
import org.eclipse.jetty.util.BlockingArrayQueue;
@@ -51,6 +52,8 @@ public class ParWorkExecService extends AbstractExecutorService {
private volatile Worker worker;
private volatile Future<?> workerFuture;
+ private SysStats sysStats = ParWork.getSysStats();
+
private class Worker implements Runnable {
Worker() {
@@ -209,7 +212,7 @@ public class ParWorkExecService extends AbstractExecutorService {
//zaa System.out.println("WAIT : " + workQueue.size() + " " + available.getQueueLength() + " " + workQueue.toString());
synchronized (awaitTerminate) {
- awaitTerminate.wait(250);
+ awaitTerminate.wait(500);
}
}
// workQueue.clear();
@@ -260,7 +263,25 @@ public class ParWorkExecService extends AbstractExecutorService {
}
return;
}
+ if (!checkLoad()) {
+ try {
+ runnable.run();
+ } finally {
+ try {
+ if (runnable instanceof ParWork.SolrFutureTask) {
+ } else {
+ available.release();
+ }
+ } finally {
+ ParWork.closeExecutor();
+ running.decrementAndGet();
+ synchronized (awaitTerminate) {
+ awaitTerminate.notifyAll();
+ }
+ }
+ }
+ }
}
Runnable finalRunnable = runnable;
@@ -318,23 +339,15 @@ public class ParWorkExecService extends AbstractExecutorService {
}
public boolean checkLoad() {
- double load = ManagementFactory.getOperatingSystemMXBean()
- .getSystemLoadAverage();
- if (load < 0) {
- log.warn("SystemLoadAverage not supported on this JVM");
- }
- double ourLoad = ParWork.getSysStats().getAvarageUsagePerCPU();
-
- if (ourLoad > 99.0D) {
+ double ourLoad = ParWork.getSysStats().getTotalUsage();
+ if (ourLoad > SysStats.OUR_LOAD_HIGH) {
return false;
} else {
- double sLoad = load / (double) ParWork.PROC_COUNT;
+ double sLoad = sysStats.getSystemLoad();
if (sLoad > ParWork.PROC_COUNT) {
return false;
}
- if (log.isDebugEnabled()) log.debug("ParWork, load:" + sLoad);
-
}
return true;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java b/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
index 9f55f0e..b1d0e33 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
@@ -19,8 +19,11 @@ import java.util.concurrent.TimeUnit;
public class SysStats extends Thread {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
- public static final int REFRESH_INTERVAL = 5000;
+
+ public static final int OUR_LOAD_HIGH = 1;
+ public static final long REFRESH_INTERVAL = TimeUnit.NANOSECONDS.convert(5000, TimeUnit.MILLISECONDS);
static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
+ private final long refreshIntervalMs;
private long refreshInterval;
private volatile boolean stopped;
@@ -31,7 +34,6 @@ public class SysStats extends Thread {
private static volatile SysStats sysStats;
private volatile double totalUsage;
- private volatile double usagePerCPU;
private volatile double sysLoad;
public static SysStats getSysStats() {
@@ -47,6 +49,7 @@ public class SysStats extends Thread {
public SysStats(long refreshInterval) {
this.refreshInterval = refreshInterval;
+ this.refreshIntervalMs = TimeUnit.MILLISECONDS.convert(refreshInterval, TimeUnit.NANOSECONDS);
setName("CPUMonitoringThread");
setDaemon(true);
start();
@@ -67,6 +70,7 @@ public class SysStats extends Thread {
@Override
public void run() {
boolean gatherThreadStats = true;
+ Collection<ThreadTime> values = null;
while(!stopped) {
if (gatherThreadStats) {
Set<Long> mappedIds = new HashSet<Long>(threadTimeMap.keySet());
@@ -76,56 +80,49 @@ public class SysStats extends Thread {
removeDeadThreads(mappedIds, allThreadIds);
mapNewThreads(allThreadIds);
-
- Collection<ThreadTime> values = new HashSet<ThreadTime>(
- threadTimeMap.values());
+ values = new HashSet<>(threadTimeMap.values());
for (ThreadTime threadTime : values) {
threadTime.setCurrent(threadBean.getThreadCpuTime(threadTime.getId()));
}
- try {
- Thread.sleep(refreshInterval / 2);
- if (stopped) {
- return;
- }
- } catch (InterruptedException e) {
- ParWork.propegateInterrupt(e, true);
- return;
- }
-
- for (ThreadTime threadTime : values) {
- threadTime.setLast(threadTime.getCurrent());
- }
-
- Collection<ThreadTime> vals;
- vals = new HashSet<ThreadTime>(threadTimeMap.values());
-
- double usage = 0D;
- for (ThreadTime threadTime : vals) {
- usage += (threadTime.getCurrent() - threadTime.getLast()) / (
- refreshInterval * REFRESH_INTERVAL);
- }
- totalUsage = usage;
- usagePerCPU = getTotalUsage() / ParWork.PROC_COUNT;
-
double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
if (load < 0) {
log.warn("SystemLoadAverage not supported on this JVM");
- load = 0;
+ } else {
+ sysLoad = load / (double) PROC_COUNT;
}
- sysLoad = load / (double) PROC_COUNT;
- gatherThreadStats = false;
} else {
double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
if (load < 0) {
log.warn("SystemLoadAverage not supported on this JVM");
- load = 0;
+ } else {
+ sysLoad = load / (double) PROC_COUNT;
}
- sysLoad = load / (double) PROC_COUNT;
gatherThreadStats = true;
}
+
+
+ try {
+ Thread.sleep(refreshIntervalMs);
+// if (stopped) {
+// return;
+// }
+ } catch (InterruptedException e) {
+// ParWork.propegateInterrupt(e, true);
+// return;
+ }
+
+ if (gatherThreadStats) {
+ double usage = 0D;
+ for (ThreadTime threadTime : values) {
+ threadTime.setLast(threadBean.getThreadCpuTime(threadTime.getId()));
+ usage += ( threadTime.getLast() - threadTime.getCurrent()) / (refreshInterval * 2.0f);
+ }
+ totalUsage = usage;
+ gatherThreadStats = false;
+ }
}
}
@@ -161,12 +158,11 @@ public class SysStats extends Thread {
}
public double getTotalUsage() {
-
return totalUsage;
}
public double getAvarageUsagePerCPU() {
- return usagePerCPU;
+ return getTotalUsage() / ParWork.PROC_COUNT;
}
public double getUsageByThread(Thread t) {
@@ -177,7 +173,7 @@ public class SysStats extends Thread {
double usage = 0D;
if(info != null) {
synchronized (info) {
- usage = (info.getCurrent() - info.getLast()) / (TimeUnit.MILLISECONDS.toNanos(refreshInterval / 2));
+ usage = (info.getCurrent() - info.getLast()) / (TimeUnit.MILLISECONDS.toNanos(refreshInterval * 2));
}
}
return usage;