You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/01 14:34:46 UTC

[lucene-solr] 01/02: @665 Executors, a place for everything and everything in its place.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 9ee3b9a9a8af504e8cca36035fba59e1f35fff39
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Sep 1 09:12:53 2020 -0500

    @665 Executors, a place for everything and everything in its place.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  2 +-
 .../org/apache/solr/search/TestRealTimeGet.java    |  4 +-
 .../src/java/org/apache/solr/common/ParWork.java   |  8 +--
 .../org/apache/solr/common/ParWorkExecutor.java    |  2 +-
 .../solr/common/cloud/ConnectionManager.java       |  3 +-
 .../org/apache/solr/common/util/ExecutorUtil.java  |  6 ++
 .../solr/common/util/SolrQueuedThreadPool.java     | 76 ++++++++++++++--------
 .../solrj/impl/CloudHttp2SolrClientTest.java       |  3 +-
 .../src/java/org/apache/solr/SolrTestCase.java     | 14 ++--
 .../solr/cloud/AbstractFullDistribZkTestBase.java  | 14 ----
 .../org/apache/solr/cloud/SolrCloudTestCase.java   |  5 --
 .../java/org/apache/solr/cloud/ZkTestServer.java   | 14 +---
 12 files changed, 75 insertions(+), 76 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index e1d6a56..22c176a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -555,7 +555,7 @@ public class Overseer implements SolrCloseable {
       try {
         super.run();
       } finally {
-        ParWork.closeExecutor();
+        ParWork.closeMyPerThreadExecutor();
       }
     }
 
diff --git a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
index 527f027..bf8d43f 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
@@ -685,7 +685,7 @@ public class TestRealTimeGet extends TestRTGBase {
             operations.set(-1L);
             throw new RuntimeException(e);
           } finally {
-            ParWork.closeExecutor();
+            ParWork.closeMyPerThreadExecutor();
           }
         }
       };
@@ -764,7 +764,7 @@ public class TestRealTimeGet extends TestRTGBase {
             operations.set(-1L);
             throw new RuntimeException(e);
           } finally {
-            ParWork.closeExecutor();
+            ParWork.closeMyPerThreadExecutor();
           }
         }
       };
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 7b99b45..e16bf5f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -85,7 +85,7 @@ public class ParWork implements Closeable {
   }
 
 
-  public static void shutdownExec() {
+  public static void shutdownRootSharedExec() {
     synchronized (ParWork.class) {
       if (EXEC != null) {
         ((ParWorkExecutor)EXEC).closeLock(false);
@@ -105,11 +105,11 @@ public class ParWork implements Closeable {
     return sysStats;
   }
 
-  public static void closeExecutor() {
-    closeExecutor(true);
+  public static void closeMyPerThreadExecutor() {
+    closeMyPerThreadExecutor(true);
   }
 
-  public static void closeExecutor(boolean unlockClose) {
+  public static void closeMyPerThreadExecutor(boolean unlockClose) {
     PerThreadExecService exec = (PerThreadExecService) THREAD_LOCAL_EXECUTOR.get();
     if (exec != null) {
       if (unlockClose) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index df6ad7f..22d5d17 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -95,7 +95,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
           try {
             r.run();
           } finally {
-            ParWork.closeExecutor();
+            ParWork.closeMyPerThreadExecutor();
           }
         }
       };
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index d1a54bb..94da4d3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import static org.apache.zookeeper.Watcher.Event.KeeperState.AuthFailed;
 import static org.apache.zookeeper.Watcher.Event.KeeperState.Disconnected;
 import static org.apache.zookeeper.Watcher.Event.KeeperState.Expired;
-import static org.apache.zookeeper.Watcher.Event.KeeperState.fromInt;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -331,7 +330,7 @@ public class ConnectionManager implements Watcher, Closeable {
       } while (!isClosed() || Thread.currentThread().isInterrupted());
     } finally {
       ParWork
-          .closeExecutor(); // we are using the root exec directly, let's just make sure it's closed here to avoid a slight delay leak
+          .closeMyPerThreadExecutor(); // we are using the root exec directly, let's just make sure it's closed here to avoid a slight delay leak
     }
     log.info("zkClient Connected: {}", connected);
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index 369d4b4..0eeb9e7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -33,6 +33,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 
@@ -81,8 +82,13 @@ public class ExecutorUtil {
     boolean shutdown = false;
     // if interrupted, we still wait a short time for thread stoppage, but then quickly bail
     TimeOut interruptTimeout = new TimeOut(3000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    TimeOut shutdownTimeout = new TimeOut(30000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
     boolean interrupted = false;
     do {
+      if (shutdownTimeout.hasTimedOut()) {
+        throw new RuntimeException("Timeout waiting for executor to shutdown");
+      }
+
       try {
         // Wait a while for existing tasks to terminate
         shutdown = pool.awaitTermination(30, TimeUnit.SECONDS);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 934fd5c..6bf7538 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -713,16 +713,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
                 s.getThreadGroup() :
                 Thread.currentThread().getThreadGroup();
         }
-        Thread thread = new Thread(group, "") {
-            @Override
-            public void run() {
-                try {
-                    runnable.run();
-                } finally {
-                    ParWork.closeExecutor();
-                }
-            }
-        };
+        Thread thread = new MyThread(group, runnable);
         thread.setDaemon(isDaemon());
         thread.setPriority(getThreadsPriority());
         thread.setName(_name + "-" + thread.getId());
@@ -772,23 +763,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
 
             if (isDetailedDump())
             {
-                threads.add(new Dumpable()
-                {
-                    @Override
-                    public void dump(Appendable out, String indent) throws IOException
-                    {
-                        if (StringUtil.isBlank(known))
-                            Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[])trace);
-                        else
-                            Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority()));
-                    }
-
-                    @Override
-                    public String dump()
-                    {
-                        return null;
-                    }
-                });
+                threads.add(new MyDumpable(known, thread, trace));
             }
             else
             {
@@ -845,7 +820,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
             log.error("Error in Jetty thread pool thread", error);
             this.error = error;
         } finally {
-            ParWork.closeExecutor();
+            ParWork.closeMyPerThreadExecutor();
         }
 
         synchronized (notify) {
@@ -912,6 +887,51 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         return null;
     }
 
+    private static class MyThread extends Thread {
+        private final Runnable runnable;
+
+        public MyThread(ThreadGroup group, Runnable runnable) {
+            super(group, "");
+            this.runnable = runnable;
+        }
+
+        @Override
+        public void run() {
+            try {
+                runnable.run();
+            } finally {
+                ParWork.closeMyPerThreadExecutor();
+            }
+        }
+    }
+
+    private static class MyDumpable implements Dumpable {
+        private final String known;
+        private final Thread thread;
+        private final StackTraceElement[] trace;
+
+        public MyDumpable(String known, Thread thread, StackTraceElement[] trace) {
+            this.known = known;
+            this.thread = thread;
+            this.trace = trace;
+        }
+
+        @Override
+        public void dump(Appendable out, String indent) throws IOException
+        {
+            if (StringUtil.isBlank(known))
+                Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[]) trace);
+            else
+                Dumpable.dumpObjects(out, indent, String.format(Locale.ROOT,"%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority()));
+        }
+
+        @Override
+        public String dump()
+        {
+            return null;
+        }
+    }
+
     private class Runner implements Runnable
     {
         CountDownLatch latch = new CountDownLatch(1);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index 51eba5f0..e43c08e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -137,13 +137,12 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
         zkBasedCloudSolrClient = null;
       }
     }
-    super.tearDown();
-
     // clear the shared collection before next test run
     final CloudHttp2SolrClient solrClient = cluster.getSolrClient();
     if (CollectionAdminRequest.listCollections(solrClient).contains(COLLECTION)) {
       solrClient.deleteByQuery(COLLECTION, "*:*");
     }
+    super.tearDown();
   }
 
   private void createTestCollectionIfNeeded() throws IOException, SolrServerException {
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 925f138..cf89ea1 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -122,6 +122,8 @@ public class SolrTestCase extends LuceneTestCase {
 
   protected volatile static ExecutorService testExecutor;
 
+  protected static volatile SolrQueuedThreadPool qtp;
+
   @Rule
   public TestRule solrTestRules =
           RuleChain.outerRule(new SystemPropertiesRestoreRule()).around(new SolrTestWatcher());
@@ -434,13 +436,17 @@ public class SolrTestCase extends LuceneTestCase {
     log.info("@After Class ------------------------------------------------------");
     try {
 
-      ParWork.closeExecutor(true);
-
-      ParWork.shutdownExec();
-
+      SolrQueuedThreadPool fqtp = qtp;
+      if (fqtp != null) {
+        fqtp.close();
+        qtp = null;
+      }
 
       SysStats.getSysStats().stopMonitor();
 
+      ParWork.closeMyPerThreadExecutor(true);
+      ParWork.shutdownRootSharedExec();
+
       if (!failed && suiteFailureMarker.wasSuccessful() ) {
         String object = null;
         // if the tests passed, make sure everything was closed / released
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 59841c3..73cacf0 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -119,8 +119,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static Path confDir;
 
-  private static SolrQueuedThreadPool qtp;
-
   @BeforeClass
   public static void beforeFullSolrCloudTest() throws IOException {
     qtp = getQtp();
@@ -135,12 +133,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
   @Before
   public void setup() throws IOException {
 
-
-
   }
 
-
-
   private static void copyConfigFileToTmpConf(Path confDir, String file) throws IOException {
     Files.copy(Paths.get(SolrTestCaseJ4.TEST_HOME(), "collection1", "conf", file),
             Paths.get(confDir.toString(), file), StandardCopyOption.REPLACE_EXISTING);
@@ -329,14 +323,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
 
   @AfterClass
   public static void afterClass() throws Exception {
-    if (qtp != null) {
-
-      qtp.close();
-      qtp = null;
-    }
-
-    System.clearProperty("solrcloud.update.delay");
-    System.clearProperty("genericCoreNodeNames");
 
   }
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index df8b7eb..8c6e4be 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -90,7 +90,6 @@ public class SolrCloudTestCase extends SolrTestCase {
 
   public static final int DEFAULT_TIMEOUT = 15; // this is SECONDS, not MILLIS
   public static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS;
-  private static volatile SolrQueuedThreadPool qtp;
 
   private static class Config {
     final String name;
@@ -303,10 +302,6 @@ public class SolrCloudTestCase extends SolrTestCase {
         cluster = null;
       }
     }
-    if (qtp != null) {
-      qtp.stop();
-      qtp = null;
-    }
   }
 
   @Before
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index dc7393a..78bf83a 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -30,21 +30,17 @@ import java.net.UnknownHostException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.management.JMException;
 
@@ -53,21 +49,13 @@ import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkMaintenanceUtils;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.util.CloseTracker;
 import org.apache.solr.common.util.ObjectReleaseTracker;
-import org.apache.solr.common.util.TimeOut;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeperExposed;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.jmx.ManagedUtil;
 import org.apache.zookeeper.server.NIOServerCnxn;
@@ -588,7 +576,7 @@ public class ZkTestServer implements Closeable {
               log.error("zkServer error", t);
             }
           } finally {
-            ParWork.closeExecutor();
+            ParWork.closeMyPerThreadExecutor();
           }
         }
       };