You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/14 16:38:27 UTC

[lucene-solr] branch reference_impl_dev updated: @537 Straighten out overload handling a bit and re-add to ParExecutorService.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 9c74feb  @537 Straighten out overload handling a bit and re-add to ParExecutorService.
9c74feb is described below

commit 9c74febd53fa570fdd7551238404ba838d5e3ee1
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Aug 14 11:37:24 2020 -0500

    @537 Straighten out overload handling a bit and re-add to ParExecutorService.
---
 .../org/apache/solr/servlet/SolrQoSFilter.java     | 11 ++--
 .../org/apache/solr/common/ParWorkExecService.java | 37 +++++++----
 .../java/org/apache/solr/common/util/SysStats.java | 72 ++++++++++------------
 3 files changed, 65 insertions(+), 55 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index f7e4a41..46feec1 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -27,6 +27,7 @@ import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.solr.common.ParWork;
 import org.apache.solr.common.params.QoSParams;
 import org.apache.solr.common.util.SysStats;
 import org.eclipse.jetty.servlets.QoSFilter;
@@ -40,11 +41,11 @@ public class SolrQoSFilter extends QoSFilter {
   static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
   static final String SUSPEND_INIT_PARAM = "suspendMs";
   static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
-  public static final int OUR_LOAD_HIGH = 99;
+
   protected int _origMaxRequests;
 
 
-  private static SysStats sysStats = SysStats.getSysStats();
+  private static SysStats sysStats = ParWork.getSysStats();
 
   @Override
   public void init(FilterConfig filterConfig) {
@@ -62,9 +63,9 @@ public class SolrQoSFilter extends QoSFilter {
     String source = req.getHeader(QoSParams.REQUEST_SOURCE);
     boolean imagePath = req.getPathInfo() != null && req.getPathInfo().startsWith("/img/");
     if (!imagePath && (source == null || !source.equals(QoSParams.INTERNAL))) {
-      double ourLoad = sysStats.getAvarageUsagePerCPU();
-      if (ourLoad > OUR_LOAD_HIGH) {
-        log.info("Our individual load is {}", ourLoad);
+      double ourLoad = sysStats.getTotalUsage();
+      log.info("Our individual load is {}", ourLoad);
+      if (ourLoad > SysStats.OUR_LOAD_HIGH) {
         int cMax = getMaxRequests();
         if (cMax > 2) {
           int max = Math.max(2, (int) ((double)cMax * 0.60D));
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index ea08e9c..bd33ac2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -2,6 +2,7 @@ package org.apache.solr.common;
 
 import org.apache.solr.common.util.CloseTracker;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.SysStats;
 import org.apache.solr.common.util.TimeOut;
 import org.apache.solr.common.util.TimeSource;
 import org.eclipse.jetty.util.BlockingArrayQueue;
@@ -51,6 +52,8 @@ public class ParWorkExecService extends AbstractExecutorService {
   private volatile Worker worker;
   private volatile Future<?> workerFuture;
 
+  private SysStats sysStats = ParWork.getSysStats();
+
   private class Worker implements Runnable {
 
     Worker() {
@@ -209,7 +212,7 @@ public class ParWorkExecService extends AbstractExecutorService {
 
      //zaa System.out.println("WAIT : " + workQueue.size() + " " + available.getQueueLength() + " " + workQueue.toString());
       synchronized (awaitTerminate) {
-        awaitTerminate.wait(250);
+        awaitTerminate.wait(500);
       }
     }
 //    workQueue.clear();
@@ -260,7 +263,25 @@ public class ParWorkExecService extends AbstractExecutorService {
         }
         return;
       }
+      if (!checkLoad()) {
+        try {
+          runnable.run();
+        } finally {
+          try {
+            if (runnable instanceof ParWork.SolrFutureTask) {
 
+            } else {
+              available.release();
+            }
+          } finally {
+            ParWork.closeExecutor();
+            running.decrementAndGet();
+            synchronized (awaitTerminate) {
+              awaitTerminate.notifyAll();
+            }
+          }
+        }
+      }
     }
 
     Runnable finalRunnable = runnable;
@@ -318,23 +339,15 @@ public class ParWorkExecService extends AbstractExecutorService {
   }
 
   public boolean checkLoad() {
-    double load = ManagementFactory.getOperatingSystemMXBean()
-        .getSystemLoadAverage();
-    if (load < 0) {
-      log.warn("SystemLoadAverage not supported on this JVM");
-    }
 
-    double ourLoad = ParWork.getSysStats().getAvarageUsagePerCPU();
-
-    if (ourLoad > 99.0D) {
+    double ourLoad = ParWork.getSysStats().getTotalUsage();
+    if (ourLoad > SysStats.OUR_LOAD_HIGH) {
       return false;
     } else {
-      double sLoad = load / (double) ParWork.PROC_COUNT;
+      double sLoad = sysStats.getSystemLoad();
       if (sLoad > ParWork.PROC_COUNT) {
         return false;
       }
-      if (log.isDebugEnabled()) log.debug("ParWork, load:" + sLoad);
-
     }
     return true;
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java b/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
index 9f55f0e..b1d0e33 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SysStats.java
@@ -19,8 +19,11 @@ import java.util.concurrent.TimeUnit;
 public class SysStats extends Thread {
     private static final Logger log = LoggerFactory
         .getLogger(MethodHandles.lookup().lookupClass());
-    public static final int REFRESH_INTERVAL = 5000;
+
+    public static final int OUR_LOAD_HIGH = 1;
+    public static final long REFRESH_INTERVAL = TimeUnit.NANOSECONDS.convert(5000, TimeUnit.MILLISECONDS);
     static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
+    private final long refreshIntervalMs;
 
     private long refreshInterval;
     private  volatile boolean stopped;
@@ -31,7 +34,6 @@ public class SysStats extends Thread {
 
     private static volatile SysStats sysStats;
     private volatile double totalUsage;
-    private volatile double usagePerCPU;
     private volatile double sysLoad;
 
     public static SysStats getSysStats() {
@@ -47,6 +49,7 @@ public class SysStats extends Thread {
 
     public SysStats(long refreshInterval) {
         this.refreshInterval = refreshInterval;
+        this.refreshIntervalMs = TimeUnit.MILLISECONDS.convert(refreshInterval, TimeUnit.NANOSECONDS);
         setName("CPUMonitoringThread");
         setDaemon(true);
         start();
@@ -67,6 +70,7 @@ public class SysStats extends Thread {
     @Override
     public void run() {
         boolean gatherThreadStats = true;
+        Collection<ThreadTime> values = null;
         while(!stopped) {
             if (gatherThreadStats) {
                 Set<Long> mappedIds = new HashSet<Long>(threadTimeMap.keySet());
@@ -76,56 +80,49 @@ public class SysStats extends Thread {
                 removeDeadThreads(mappedIds, allThreadIds);
 
                 mapNewThreads(allThreadIds);
-
-                Collection<ThreadTime> values = new HashSet<ThreadTime>(
-                    threadTimeMap.values());
+                values = new HashSet<>(threadTimeMap.values());
 
                 for (ThreadTime threadTime : values) {
                     threadTime.setCurrent(threadBean.getThreadCpuTime(threadTime.getId()));
                 }
 
-                try {
-                    Thread.sleep(refreshInterval / 2);
-                    if (stopped) {
-                        return;
-                    }
-                } catch (InterruptedException e) {
-                    ParWork.propegateInterrupt(e, true);
-                    return;
-                }
-
-                for (ThreadTime threadTime : values) {
-                    threadTime.setLast(threadTime.getCurrent());
-                }
-
-                Collection<ThreadTime> vals;
-                vals = new HashSet<ThreadTime>(threadTimeMap.values());
-
-                double usage = 0D;
-                for (ThreadTime threadTime : vals) {
-                    usage += (threadTime.getCurrent() - threadTime.getLast()) / (
-                        refreshInterval * REFRESH_INTERVAL);
-                }
-                totalUsage = usage;
-                usagePerCPU = getTotalUsage() / ParWork.PROC_COUNT;
-
                 double load =  ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
                 if (load < 0) {
                     log.warn("SystemLoadAverage not supported on this JVM");
-                    load = 0;
+                } else {
+                    sysLoad = load / (double) PROC_COUNT;
                 }
-                sysLoad = load / (double) PROC_COUNT;
 
-                gatherThreadStats = false;
             } else {
                 double load =  ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
                 if (load < 0) {
                     log.warn("SystemLoadAverage not supported on this JVM");
-                    load = 0;
+                } else {
+                    sysLoad = load / (double) PROC_COUNT;
                 }
-                sysLoad = load / (double) PROC_COUNT;
                 gatherThreadStats = true;
             }
+
+
+            try {
+                Thread.sleep(refreshIntervalMs);
+//                if (stopped) {
+//                    return;
+//                }
+            } catch (InterruptedException e) {
+//                ParWork.propegateInterrupt(e, true);
+//                return;
+            }
+
+            if (gatherThreadStats) {
+                double usage = 0D;
+                for (ThreadTime threadTime : values) {
+                    threadTime.setLast(threadBean.getThreadCpuTime(threadTime.getId()));
+                    usage += ( threadTime.getLast() - threadTime.getCurrent()) / (refreshInterval * 2.0f);
+                }
+                totalUsage = usage;
+                gatherThreadStats = false;
+            }
         }
     }
 
@@ -161,12 +158,11 @@ public class SysStats extends Thread {
     }
 
     public double getTotalUsage() {
-
         return totalUsage;
     }
 
     public double getAvarageUsagePerCPU() {
-        return usagePerCPU;
+        return getTotalUsage() / ParWork.PROC_COUNT;
     }
 
     public double getUsageByThread(Thread t) {
@@ -177,7 +173,7 @@ public class SysStats extends Thread {
         double usage = 0D;
         if(info != null) {
             synchronized (info) {
-                usage = (info.getCurrent() - info.getLast()) / (TimeUnit.MILLISECONDS.toNanos(refreshInterval / 2));
+                usage = (info.getCurrent() - info.getLast()) / (TimeUnit.MILLISECONDS.toNanos(refreshInterval * 2));
             }
         }
         return usage;