You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/29 17:44:32 UTC

[lucene-solr] 02/02: @454 Working on good shutdown.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit de28d2f8e15c903a97b7cf351589c193429e15e2
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jul 29 12:42:10 2020 -0500

    @454 Working on good shutdown.
---
 .../client/solrj/embedded/JettySolrRunner.java     | 33 ++++++++++++++++++----
 .../java/org/apache/solr/core/CoreContainer.java   | 11 ++++----
 .../src/java/org/apache/solr/core/SolrCore.java    |  3 +-
 .../apache/solr/AnalysisAfterCoreReloadTest.java   |  2 ++
 .../solr/client/solrj/io/SolrClientCache.java      | 14 ++++-----
 .../src/java/org/apache/solr/common/ParWork.java   | 22 ++++++++++-----
 .../solr/common/util/SolrQueuedThreadPool.java     |  6 ----
 .../apache/solr/common/util/ValidatingJsonMap.java |  2 +-
 .../src/java/org/apache/solr/SolrTestCase.java     |  2 ++
 .../org/apache/solr/cloud/SolrCloudTestCase.java   | 18 +++++++++++-
 10 files changed, 78 insertions(+), 35 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index e21c8c3..f9d153b 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -155,6 +155,7 @@ public class JettySolrRunner implements Closeable {
 
 
   private static Scheduler scheduler = new SolrHttpClientScheduler("JettySolrRunnerScheduler", true, null, new ThreadGroup("JettySolrRunnerScheduler"), 1);
+  private volatile SolrQueuedThreadPool qtp;
 
   public String getContext() {
     return config.context;
@@ -277,7 +278,7 @@ public class JettySolrRunner implements Closeable {
    * @param enableProxy       enables proxy feature to disable connections
    */
   public JettySolrRunner(String solrHome, Properties nodeProperties, JettyConfig config, boolean enableProxy) {
-    ObjectReleaseTracker.track(this);
+    assert ObjectReleaseTracker.track(this);
     this.enableProxy = enableProxy;
     this.solrHome = solrHome;
     this.config = config;
@@ -297,8 +298,7 @@ public class JettySolrRunner implements Closeable {
   }
 
   private void init(int port) {
-
-    SolrQueuedThreadPool qtp;
+    
     if (config.qtp != null) {
       qtp = config.qtp;
     } else {
@@ -745,7 +745,26 @@ public class JettySolrRunner implements Closeable {
     Map<String,String> prevContext = MDC.getCopyOfContextMap();
     MDC.clear();
     try {
-      server.stop();
+      try (ParWork closer = new ParWork(this)) {
+        closer.collect(() -> {
+          try {
+            server.stop();
+          } catch (Exception e) {
+            log.error("Error stopping jetty server", e);
+          }
+        });
+        closer.collect(() -> {
+          try {
+            if (config.qtp == null) {
+              qtp.waitForStopping();
+            }
+          } catch (InterruptedException e) {
+            ParWork.propegateInterrupt(e);
+          }
+        });
+        closer.addCollect("stopServer");
+      }
+
 
       try {
 
@@ -755,6 +774,10 @@ public class JettySolrRunner implements Closeable {
         throw new RuntimeException(e);
       }
 
+      if (config.qtp == null) {
+      //  qtp.stop();
+      }
+
     } catch (Exception e) {
       SolrZkClient.checkInterrupted(e);
       log.error("", e);
@@ -794,7 +817,7 @@ public class JettySolrRunner implements Closeable {
 //          }
 //        }
 //      }
-      ObjectReleaseTracker.release(this);
+      assert ObjectReleaseTracker.release(this);
       if (prevContext != null) {
         MDC.setContextMap(prevContext);
       } else {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index b170673..ce08166 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -346,7 +346,7 @@ public class CoreContainer implements Closeable {
     this.asyncSolrCoreLoad = asyncSolrCoreLoad;
 
     this.replayUpdatesExecutor = new OrderedExecutor( cfg.getReplayUpdatesThreads(),
-            ParWork.getExecutorService( cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(), 1000));
+            ParWork.getExecutorService( cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(), 250));
 
     metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
     String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
@@ -1047,9 +1047,9 @@ public class CoreContainer implements Closeable {
 
   @Override
   public void close() throws IOException {
-    if (this.isShutDown) {
-      return;
-    }
+//    if (this.isShutDown) {
+//      return;
+//    }
     log.info("Closing CoreContainer");
     // must do before isShutDown=true
     if (isZooKeeperAware()) {
@@ -1159,10 +1159,9 @@ public class CoreContainer implements Closeable {
 
       closer.add("zkSys", zkSys);
 
-      closer.add("shardHandlers", shardHandlerFactory, updateShardHandler);
       closer.add("loader", loader);
 
-
+      closer.add("shardHandlers", shardHandlerFactory, updateShardHandler);
      }
 
     assert ObjectReleaseTracker.release(this);
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index d9ab2ba..0fb8bf2 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1873,8 +1873,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
   private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<>();
   private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<>();
 
-  final ExecutorService searcherExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
-      new SolrNamedThreadFactory("searcherExecutor"));
+  final ExecutorService searcherExecutor = ParWork.getExecutorService(0, 2, 250);
   private AtomicInteger onDeckSearchers = new AtomicInteger();  // number of searchers preparing
   // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
   private final Object searcherLock = new Object();  // the sync object for the searcher
diff --git a/solr/core/src/test/org/apache/solr/AnalysisAfterCoreReloadTest.java b/solr/core/src/test/org/apache/solr/AnalysisAfterCoreReloadTest.java
index 4941bd1..92f0f67 100644
--- a/solr/core/src/test/org/apache/solr/AnalysisAfterCoreReloadTest.java
+++ b/solr/core/src/test/org/apache/solr/AnalysisAfterCoreReloadTest.java
@@ -27,10 +27,12 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.core.SolrCore;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 
 import java.io.File;
 import java.io.IOException;
 
+@Ignore
 public class AnalysisAfterCoreReloadTest extends SolrTestCaseJ4 {
   
   private static String tmpSolrHome;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 7817021..7308eab 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -30,6 +30,7 @@ import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.common.ParWork;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,7 @@ public class SolrClientCache implements Serializable, Closeable {
 
   public SolrClientCache(HttpClient httpClient) {
     this.httpClient = httpClient;
-    ObjectReleaseTracker.track(this);
+    assert ObjectReleaseTracker.track(this);
   }
 
   public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
@@ -90,14 +91,13 @@ public class SolrClientCache implements Serializable, Closeable {
   }
 
   public synchronized void close() {
-    for(Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
-      try {
-        entry.getValue().close();
-      } catch (IOException e) {
-        log.error("Error closing SolrClient for {}", entry.getKey(), e);
+    try (ParWork closer = new ParWork(this, true)) {
+      for (Map.Entry<String, SolrClient> entry : solrClients.entrySet()) {
+        closer.collect(entry.getValue());
       }
+      closer.addCollect("solrClients");
     }
     solrClients.clear();
-    ObjectReleaseTracker.release(this);
+    assert ObjectReleaseTracker.release(this);
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 7cd9b92..06cb172 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -67,7 +67,6 @@ public class ParWork implements Closeable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
-  public static final int MIN_THREADS = 3;
 
   private Set<Object> collectSet = null;
 
@@ -529,7 +528,7 @@ public class ParWork implements Closeable {
 //                List<Future<Object>> results = executor.invokeAll(closeCalls, 8, TimeUnit.SECONDS);
 
                 for (Future<Object> future : results) {
-                  future.get();
+                  future.get(10000, TimeUnit.MILLISECONDS); // nocommit
                   if (!future.isDone() || future.isCancelled()) {
                     log.warn("A task did not finish isDone={} isCanceled={}", future.isDone(), future.isCancelled());
                   //  throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "A task did nor finish" +future.isDone()  + " " + future.isCancelled());
@@ -539,6 +538,7 @@ public class ParWork implements Closeable {
               } catch (InterruptedException e1) {
                 log.warn(WORK_WAS_INTERRUPTED);
                 Thread.currentThread().interrupt();
+                return;
               }
             }
           }
@@ -580,6 +580,10 @@ public class ParWork implements Closeable {
   public static void sizePoolByLoad() {
     Integer maxPoolsSize = getMaxPoolSize();
 
+    Integer minThreads;
+
+    minThreads = Integer.getInteger("solr.per_thread_exec.min_threads", 3);
+
     ThreadPoolExecutor executor = (ThreadPoolExecutor) getExecutor();
     double load =  ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
     if (load < 0) {
@@ -591,14 +595,14 @@ public class ParWork implements Closeable {
     if (ourLoad > 1) {
       int cMax = executor.getMaximumPoolSize();
       if (cMax > 2) {
-        executor.setMaximumPoolSize(Math.max(MIN_THREADS, (int) ((double)cMax * 0.60D)));
+        executor.setMaximumPoolSize(Math.max(minThreads, (int) ((double)cMax * 0.60D)));
       }
     } else {
       double sLoad = load / (double) PROC_COUNT;
       if (sLoad > 1.0D) {
         int cMax =  executor.getMaximumPoolSize();
         if (cMax > 2) {
-          executor.setMaximumPoolSize(Math.max(MIN_THREADS, (int) ((double) cMax * 0.60D)));
+          executor.setMaximumPoolSize(Math.max(minThreads, (int) ((double) cMax * 0.60D)));
         }
       } else if (sLoad < 0.9D && maxPoolsSize != executor.getMaximumPoolSize()) {
         executor.setMaximumPoolSize(maxPoolsSize);
@@ -616,8 +620,12 @@ public class ParWork implements Closeable {
         log.debug("Starting a new executor");
       }
 
-      // figure out thread usage - maybe try to adjust based on current thread count
-      exec = getExecutorService(0, Math.max(4, Runtime.getRuntime().availableProcessors() / 3), 1);
+      Integer minThreads;
+      Integer maxThreads;
+      minThreads = Integer.getInteger("solr.per_thread_exec.min_threads", 3);
+      maxThreads = Integer.getInteger("solr.per_thread_exec.max_threads",  Runtime.getRuntime().availableProcessors() / 3);
+      exec = getExecutorService(0, Math.max(minThreads, maxThreads), 1); // keep alive directly affects how long a worker might
+      // be stuck in poll without an enqueue on shutdown
       THREAD_LOCAL_EXECUTOR.set(exec);
     }
 
@@ -633,7 +641,7 @@ public class ParWork implements Closeable {
 
   private static Integer getMaxPoolSize() {
     return Integer.getInteger("solr.maxThreadExecPoolSize",
-            (int) Math.max(MIN_THREADS, Math.round(Runtime.getRuntime().availableProcessors() / 3)));
+            (int) Math.max(4, Math.round(Runtime.getRuntime().availableProcessors() / 3)));
   }
 
   private void handleObject(String label, AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, Object object) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 4dc89dd..2842759 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -191,12 +191,6 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         ensureThreads();
     }
 
-
-    public void prepareToStop() throws Exception {
-        super.doStop();
-    }
-
-
     @Override
     protected void doStop() throws Exception
     {
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java b/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
index a2f60b8..3272688 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ValidatingJsonMap.java
@@ -322,7 +322,7 @@ public class ValidatingJsonMap implements Map<String, Object>, NavigableObject {
     return new ObjectBuilder(jp) {
       @Override
       public Object newObject() throws IOException {
-        return new ValidatingJsonMap();
+        return new ValidatingJsonMap(64);
       }
     };
   }
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index b63615e..165b645 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -214,6 +214,8 @@ public class SolrTestCase extends LuceneTestCase {
       System.setProperty("solr.tests.infostream", "false");
       System.setProperty("numVersionBuckets", "8192");
 
+    //  System.setProperty("solr.per_thread_exec.max_threads", "2");
+   //   System.setProperty("solr.per_thread_exec.min_threads", "1");
 
       System.setProperty("zookeeper.nio.numSelectorThreads", "1");
       System.setProperty("zookeeper.nio.numWorkerThreads", "3");
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index 90032e4..e6f05b6 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -307,8 +307,24 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
       }
     }
     if (qtp != null) {
+      try (ParWork closer = new ParWork("qtp")) {
+        closer.collect(() -> {
+          try {
+            qtp.stop();
+          } catch (Exception e) {
+            log.error("Error stopping qtp", e);
+          }
+        });
+        closer.collect(() -> {
+          try {
+            qtp.waitForStopping();
+          } catch (InterruptedException e) {
+            ParWork.propegateInterrupt(e);
+          }
+        });
+        closer.addCollect("qtp_close");
+      }
 
-      qtp.stop();
       qtp = null;
     }
   }