You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/21 16:46:31 UTC

[lucene-solr] branch reference_impl_dev updated: @867 Thread management.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 697b601  @867 Thread management.
697b601 is described below

commit 697b6012f247fcd0b89395de618960c246e702f6
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Sep 21 11:46:13 2020 -0500

    @867 Thread management.
---
 .../solr/cloud/api/collections/AddReplicaCmd.java  |  2 +-
 .../api/collections/MaintainRoutedAliasCmd.java    |  2 +-
 .../OverseerCollectionMessageHandler.java          | 13 +++++
 .../apache/solr/servlet/SolrDispatchFilter.java    | 13 ++++-
 .../apache/solr/servlet/SolrShutdownHandler.java   | 60 +++++++++++-----------
 .../apache/solr/common/cloud/ZkStateReader.java    |  4 +-
 6 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index bd734db..21e8de7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -288,7 +288,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
       if (onComplete != null) onComplete.run();
     } else {
-      ParWork.getRootSharedExecutor().execute(runnable);
+      ocmh.tpe.submit(runnable);
     }
 
     return createReplicas.stream()
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
index c6a3f8e..c9c12ed 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MaintainRoutedAliasCmd.java
@@ -125,7 +125,7 @@ public class MaintainRoutedAliasCmd extends AliasCmd {
       switch (action.actionType) {
         case ENSURE_REMOVED:
           if (exists) {
-            ParWork.getRootSharedExecutor().submit(
+            ocmh.tpe.submit(
              () -> {
                 try {
                   deleteTargetCollection(clusterState, results, aliasName, aliasesManager, action);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 0e62b25..6cf5445 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -55,6 +57,7 @@ import org.apache.solr.cloud.Stats;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.PerThreadExecService;
 import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -73,9 +76,11 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.TimeSource;
@@ -170,6 +175,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   // This is used for handling mutual exclusion of the tasks.
 
   final private LockTree lockTree = new LockTree();
+  ExecutorService tpe = new PerThreadExecService(ParWork.getRootSharedExecutor(), 15, true, true);
 
   public static final Random RANDOM;
   static {
@@ -924,7 +930,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       cloudManager.close();
     } catch (NullPointerException e) {
       // okay
+    } finally {
+      if (tpe != null) {
+        if (!tpe.isShutdown()) {
+          ExecutorUtil.shutdownAndAwaitTermination(tpe);
+        }
+      }
     }
+
     assert ObjectReleaseTracker.release(this);
   }
 
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
index e15091b..5710ce2 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
@@ -125,6 +125,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
   private String registryName;
   private volatile boolean closeOnDestroy = true;
   private volatile SolrZkClient zkClient;
+  private boolean shutdownRootExec = true;
 
   /**
    * Enum to define action that needs to be processed.
@@ -390,9 +391,19 @@ public class SolrDispatchFilter extends BaseSolrFilter {
         ParWork.close(zkClient);
       }
       GlobalTracer.get().close();
+
+      assert disableRootExecShutdownForTests();
+      if (shutdownRootExec) {
+        ParWork.shutdownRootSharedExec(true);
+      }
     }
   }
-  
+
+  private boolean disableRootExecShutdownForTests() {
+    shutdownRootExec = false;
+    return true;
+  }
+
   @Override
   public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
     doFilter(request, response, chain, false);
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java b/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java
index 2804014..cf459ac 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrShutdownHandler.java
@@ -14,35 +14,35 @@ public class SolrShutdownHandler extends ShutdownHandler {
         super("solrrocks");
     }
 
-//    protected void doShutdown(Request baseRequest, HttpServletResponse response) throws IOException {
-//        for (Connector connector : getServer().getConnectors()) {
-//            connector.shutdown();
-//        }
-//
-//        baseRequest.setHandled(true);
-//        response.setStatus(200);
-//        response.flushBuffer();
-//
-//        final Server server = getServer();
-//        new Thread() {
-//            @Override
-//            public void run() {
-//                try {
-//                    shutdownServer(server);
-//                } catch (InterruptedException e) {
-//
-//                } catch (Exception e) {
-//                    throw new RuntimeException("Shutting down server", e);
-//                }
-//            }
-//        }.start();
-//    }
-//
-//    private void shutdownServer(Server server) throws Exception
-//    {
-//        server.stop();
-//        ParWork.shutdownRootSharedExec();
-//        System.exit(0);
-//    }
+    protected void doShutdown(Request baseRequest, HttpServletResponse response) throws IOException {
+        for (Connector connector : getServer().getConnectors()) {
+            connector.shutdown();
+        }
+
+        baseRequest.setHandled(true);
+        response.setStatus(200);
+        response.flushBuffer();
+
+        final Server server = getServer();
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    shutdownServer(server);
+                } catch (InterruptedException e) {
+
+                } catch (Exception e) {
+                    throw new RuntimeException("Shutting down server", e);
+                }
+            }
+        }.start();
+    }
+
+    private void shutdownServer(Server server) throws Exception
+    {
+        server.stop();
+        ParWork.shutdownRootSharedExec();
+        System.exit(0);
+    }
 
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index b9967a5..b4131a3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -57,6 +57,7 @@ import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.CloseTracker;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.Pair;
@@ -213,7 +214,7 @@ public class ZkStateReader implements SolrCloseable {
 
   private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
 
-  private final ExecutorService notifications = ParWork.getRootSharedExecutor();
+  private final ExecutorService notifications = ParWork.getExecutorService(10, true, true);
 
   private final Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
 
@@ -892,6 +893,7 @@ public class ZkStateReader implements SolrCloseable {
     log.info("Closing ZkStateReader");
     assert closeTracker.close();
     this.closed = true;
+    ExecutorUtil.shutdownAndAwaitTermination(notifications);
     try {
       if (closeClient) {
         IOUtils.closeQuietly(zkClient);