You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/20 13:55:19 UTC

[lucene-solr] branch reference_impl updated: @248 - Another overseer thread is benched, expensive test sidelined to nightly, fix up this executor.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new 3765bb9  @248 - Another overseer thread is benched, expensive test sidelined to nightly, fix up this executor.
3765bb9 is described below

commit 3765bb92bb7970e074bafe08fda0c21e9859cf8a
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Jul 20 08:54:50 2020 -0500

    @248 - Another overseer thread is benched, expensive test sidelined to nightly, fix up this executor.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  3 +-
 .../apache/solr/cloud/OverseerTaskProcessor.java   | 22 ++++---
 .../java/org/apache/solr/core/CoreContainer.java   | 18 +++---
 .../org/apache/solr/search/TestRangeQuery.java     |  2 +
 solr/reference_branch/prod/Dockerfile              |  8 +--
 .../src/java/org/apache/solr/common/ParWork.java   | 51 +++------------
 .../org/apache/solr/common/ParWorkExecutor.java    | 72 ++++++++++++++++++++++
 .../org/apache/solr/common/cloud/SolrZkClient.java | 33 +---------
 .../src/java/org/apache/solr/SolrTestCaseJ4.java   | 37 +----------
 9 files changed, 114 insertions(+), 132 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index fcf09ed..a925a27 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -641,7 +641,8 @@ public class Overseer implements SolrCloseable {
 
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
-    OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getUpdateOnlyHttpClient());
+    // nocommit - I don't know about this guy..
+    OverseerNodePrioritizer overseerPrioritizer = null; // new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getUpdateOnlyHttpClient());
     overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
     ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
     ccThread.setDaemon(true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 1dd7f2c..9a7a0d0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -176,17 +176,19 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     else
       log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
 
-    try {
-      prioritizer.prioritizeOverseerNodes(myId);
-    } catch (Exception e) {
-      ParWork.propegateInterrupt(e);
-      if (e instanceof KeeperException.SessionExpiredException) {
-        return;
-      }
-      if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
-        return;
+    if (prioritizer != null) {
+      try {
+        prioritizer.prioritizeOverseerNodes(myId);
+      } catch (Exception e) {
+        ParWork.propegateInterrupt(e);
+        if (e instanceof KeeperException.SessionExpiredException) {
+          return;
+        }
+        if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
+          return;
+        }
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
 
     try {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 7fed407..f8e9a55 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -780,12 +780,6 @@ public class CoreContainer implements Closeable {
         metricsHandler.initializeMetrics(solrMetricsContext, METRICS_PATH);
       });
 
-      if (!Boolean.getBoolean("solr.disableMetricsHistoryHandler")) {
-        work.collect(() -> {
-          createMetricsHistoryHandler();
-        });
-      }
-
       work.collect(() -> {
         autoscalingHistoryHandler = createHandler(AUTOSCALING_HISTORY_PATH, AutoscalingHistoryHandler.class.getName(), AutoscalingHistoryHandler.class);
         metricsCollectorHandler = createHandler(MetricsCollectorHandler.HANDLER_PATH, MetricsCollectorHandler.class.getName(), MetricsCollectorHandler.class);
@@ -793,8 +787,6 @@ public class CoreContainer implements Closeable {
         metricsCollectorHandler.init(null);
       });
 
-      work.addCollect("ccload");
-
       work.collect(() -> {
         containerHandlers.put(AUTHZ_PATH, securityConfHandler);
         securityConfHandler.initializeMetrics(solrMetricsContext, AUTHZ_PATH);
@@ -808,7 +800,15 @@ public class CoreContainer implements Closeable {
         metricManager.loadReporters(metricReporters, loader, this, null, null, SolrInfoBean.Group.jetty);
       });
 
-      work.addCollect("ccload2");
+      work.addCollect("ccload");
+
+      if (!Boolean.getBoolean("solr.disableMetricsHistoryHandler")) {
+        work.collect(() -> {
+          createMetricsHistoryHandler();
+        });
+      }
+
+      work.addCollect("metricsHistoryHandlers");
     }
 
       // initialize gauges for reporting the number of cores and disk total/free
diff --git a/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java b/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java
index 8c5de75..f6e0c85 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java
@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.Map;
 
 import com.carrotsearch.hppc.IntHashSet;
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrInputDocument;
@@ -41,6 +42,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+@LuceneTestCase.Nightly // nocommit slow
 public class TestRangeQuery extends SolrTestCaseJ4 {
   
   private final static long DATE_START_TIME_RANDOM_TEST = 1499797224224L;
diff --git a/solr/reference_branch/prod/Dockerfile b/solr/reference_branch/prod/Dockerfile
index 9925d5f..a32cf82 100644
--- a/solr/reference_branch/prod/Dockerfile
+++ b/solr/reference_branch/prod/Dockerfile
@@ -12,14 +12,12 @@ RUN chmod +x /start-solr.sh
 RUN apt-get -y update; apt-get -y upgrade; apt-get -y install ant
 
 RUN git clone https://github.com/apache/lucene-solr.git --branch reference_impl --single-branch reference_impl; \
- cd reference_impl;ant ivy-bootstrap;cd solr;ant package -Dversion=9.0.0-miller_ref_impl; \
- cp package/*miller_ref_impl/ ${INSTALL_DIR}; \
- chmod +x ${INSTALL_DIR}/reference_impl/solr/bin/solr
+ cd reference_impl;ant ivy-bootstrap;cd solr;ant package -Dversion=9.0.0-miller_ref_impl;
 
 WORKDIR ${BUILD_DIR}
 
 ENV PATH=$PATH:/opt/solr/miller_ref_impl/bin
 
 
-#ENTRYPOINT "/bin/bash"
-ENTRYPOINT "/start-solr.sh"
\ No newline at end of file
+ENTRYPOINT "/bin/bash"
+#ENTRYPOINT "/start-solr.sh"
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 542630f..543303e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -66,12 +66,6 @@ public class ParWork implements Closeable {
 
   protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
 
-  public static volatile int MAXIMUM_POOL_SIZE;
-  public static final long KEEP_ALIVE_TIME = 10;
-
-  public static volatile int CAPACITY = 30;
-  private static final int GROWBY = 30;
-
   private Set<Object> collectSet = null;
 
   private static SysStats sysStats = SysStats.getSysStats();
@@ -557,6 +551,8 @@ public class ParWork implements Closeable {
   }
 
   public static void sizePoolByLoad() {
+    Integer maxPoolsSize = getMaxPoolSize();
+
     ThreadPoolExecutor executor = (ThreadPoolExecutor) getExecutor();
     double load =  ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
     if (load < 0) {
@@ -577,8 +573,8 @@ public class ParWork implements Closeable {
         if (cMax > 2) {
           executor.setMaximumPoolSize(Math.max(2, (int) ((double) cMax * 0.60D)));
         }
-      } else if (sLoad < 0.9D && MAXIMUM_POOL_SIZE != executor.getMaximumPoolSize()) {
-        executor.setMaximumPoolSize(MAXIMUM_POOL_SIZE);
+      } else if (sLoad < 0.9D && maxPoolsSize != executor.getMaximumPoolSize()) {
+        executor.setMaximumPoolSize(maxPoolsSize);
       }
       if (log.isDebugEnabled()) log.debug("ParWork, load:" + sLoad); //nocommit: remove when testing is done
 
@@ -602,43 +598,16 @@ public class ParWork implements Closeable {
   }
 
   public static ExecutorService getExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
-    MAXIMUM_POOL_SIZE = Integer.getInteger("solr.maxThreadExecPoolSize",
-            (int) Math.max(2, Math.round(Runtime.getRuntime().availableProcessors() / 2.0d)));
-    CAPACITY = Integer.getInteger("solr.threadExecQueueSize", 80);
     ThreadPoolExecutor exec;
-    exec = new ThreadPoolExecutor(0, MAXIMUM_POOL_SIZE,
-            KEEP_ALIVE_TIME, TimeUnit.SECONDS,
-             new BlockingArrayQueue<>(CAPACITY, GROWBY), // size?
-             new ThreadFactory() {
-               AtomicInteger threadNumber = new AtomicInteger(1);
-               ThreadGroup group;
-
-               {
-                 SecurityManager s = System.getSecurityManager();
-                 group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
-               }
-
-               @Override
-               public Thread newThread(Runnable r) {
-                 Thread t = new Thread(group, r, "ParWork" + threadNumber.getAndIncrement(), 0);
-                 t.setDaemon(false);
-                 // t.setPriority(priority);
-                 return t;
-               }
-             }, new RejectedExecutionHandler() {
-
-       @Override
-       public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-         log.warn("Task was rejected, running in caller thread");
-         if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
-           throw new AlreadyClosedException();
-         }
-         executor.execute(r);
-       }
-     });
+    exec = new ParWorkExecutor("ParWork", getMaxPoolSize());
     return exec;
   }
 
+  private static Integer getMaxPoolSize() {
+    return Integer.getInteger("solr.maxThreadExecPoolSize",
+            (int) Math.max(6, Math.round(Runtime.getRuntime().availableProcessors())));
+  }
+
   private void handleObject(String label, AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, Object object) {
     if (log.isDebugEnabled()) {
       log.debug(
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
new file mode 100644
index 0000000..6ab259d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common;
+
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link ThreadPoolExecutor} with named, non-daemon worker threads. When a task is
+ * rejected while the executor is still running, the submitting thread waits (up to 10
+ * seconds, or until any task completes) and then resubmits it; once the executor is
+ * shutting down or shut down, a rejected task fails with {@link AlreadyClosedException}.
+ * <p>
+ * NOTE(review): the pool has zero core threads and a {@link BlockingArrayQueue} created
+ * with a growth increment. ThreadPoolExecutor only starts threads beyond the first when
+ * the queue refuses a task, so if that queue keeps growing, {@code maxPoolsSize} may never
+ * be reached - confirm the queue's bound before relying on the maximum.
+ */
+public class ParWorkExecutor extends ThreadPoolExecutor {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    /** Seconds an idle worker thread is kept before it terminates. */
+    public static final long KEEP_ALIVE_TIME = 10;
+
+    /** Growth increment handed to the {@link BlockingArrayQueue} work queue. */
+    private static final int GROW_BY = 30;
+
+    /** Longest time, in milliseconds, a rejected submitter waits before resubmitting. */
+    private static final long REJECTED_WAIT_MS = 10000;
+
+    /** Monitor rejected submitters wait on; notified each time a task finishes. */
+    private final Object lock = new Object();
+
+    /**
+     * Creates an executor with zero core threads and at most {@code maxPoolsSize} workers.
+     *
+     * @param name prefix for worker thread names; each executor numbers its threads from 1
+     * @param maxPoolsSize maximum number of worker threads
+     */
+    public ParWorkExecutor(String name, int maxPoolsSize) {
+        super(0, maxPoolsSize, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+            new BlockingArrayQueue<>(Integer.getInteger("solr.threadExecQueueSize", 80), GROW_BY),
+            newThreadFactory(name));
+
+        setRejectedExecutionHandler(new RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                log.warn("Task was rejected, waiting for a running task to finish before resubmitting");
+                if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
+                    throw new AlreadyClosedException();
+                }
+                synchronized (lock) {
+                    try {
+                        lock.wait(REJECTED_WAIT_MS);
+                    } catch (InterruptedException e) {
+                        ParWork.propegateInterrupt(e, true);
+                        Thread.currentThread().interrupt();
+                        // Give up instead of resubmitting: with the interrupt flag set, a second
+                        // rejection would make wait() throw at once and recurse through this
+                        // handler without ever blocking.
+                        throw new RejectedExecutionException(
+                            "Interrupted while waiting to resubmit a rejected task", e);
+                    }
+                }
+                executor.execute(r);
+            }
+        });
+    }
+
+    /** Returns a factory for named non-daemon threads, numbered per executor starting at 1. */
+    private static ThreadFactory newThreadFactory(String name) {
+        AtomicInteger threadNumber = new AtomicInteger(1);
+        SecurityManager s = System.getSecurityManager();
+        ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        return r -> {
+            Thread t = new Thread(group, r, name + threadNumber.getAndIncrement(), 0);
+            t.setDaemon(false);
+            return t;
+        };
+    }
+
+    /** Wakes submitters blocked in the rejection handler whenever a task completes. */
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        synchronized (lock) {
+            lock.notifyAll();
+        }
+    }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index c853283..922d0f8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -57,6 +57,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecutor;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.StringUtils;
 import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
@@ -101,37 +102,7 @@ public class SolrZkClient implements Closeable {
 
   private volatile SolrZooKeeper keeper;
 
-  private final ExecutorService zkCallbackExecutor =
-          new ThreadPoolExecutor(1, 1,
-                  30L, TimeUnit.SECONDS,
-                  new ArrayBlockingQueue<>(120), // size?
-                  new ThreadFactory() {
-                    AtomicInteger threadNumber = new AtomicInteger(1);
-                    ThreadGroup group;
-
-                    {
-                      SecurityManager s = System.getSecurityManager();
-                      group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
-                    }
-
-                    @Override
-                    public Thread newThread(Runnable r) {
-                      Thread t = new Thread(group, r, "ZkCallback" + threadNumber.getAndIncrement(), 0);
-                      t.setDaemon(false);
-                      // t.setPriority(priority);
-                      return t;
-                    }
-                  }, new RejectedExecutionHandler() {
-
-            @Override
-            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-              log.warn("Task was rejected, running in caller thread");
-              if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
-                throw new AlreadyClosedException();
-              }
-              executor.execute(r);
-            }
-          });
+  private final ExecutorService zkCallbackExecutor = new ParWorkExecutor("ZkCallback", 1);
 
   private final ExecutorService zkConnManagerCallbackExecutor =
       ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("zkConnectionManagerCallback"));
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 66a22ca..570fbbb 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -95,6 +95,7 @@ import org.apache.solr.cloud.IpTables;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecutor;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
@@ -237,42 +238,8 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
     initialRootLogLevel = StartupLoggingUtils.getLogLevelString();
     initClassLogLevels();
     resetExceptionIgnores();
-    
-    testExecutor = new ThreadPoolExecutor(0, Math.max(1, Runtime.getRuntime().availableProcessors()),
-            3000, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(30), // size?
-            new ThreadFactory() {
-              AtomicInteger threadNumber = new AtomicInteger(1);
-              ThreadGroup group;
-
-              {
-                SecurityManager s = System.getSecurityManager();
-                group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
-              }
-
-              @Override
-              public Thread newThread(Runnable r) {
-                Thread t = new Thread(group, r, "testExecutor" + threadNumber.getAndIncrement(), 0);
-                t.setDaemon(false);
-                return t;
-              }
-            }, new RejectedExecutionHandler() {
 
-      @Override
-      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-        log.warn("Task was rejected, running in caller thread");
-        if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
-          throw new AlreadyClosedException();
-        }
-//          try {
-//            Thread.sleep(1000);
-//          } catch (InterruptedException e) {
-//            Thread.currentThread().interrupt();
-//          }
-//          executor.execute(r);
-        r.run();
-      }
-    });
+    testExecutor = new ParWorkExecutor("testExecutor",  Math.max(1, Runtime.getRuntime().availableProcessors()));
 
     // set solr.install.dir needed by some test configs outside of the test sandbox (!)
     System.setProperty("solr.install.dir", ExternalPaths.SOURCE_HOME);