You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/28 16:58:18 UTC

[lucene-solr] 01/03: @868 Thread management 2.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 28f6f7d597a30e37e31aa4f11f1585864becbe04
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Sep 27 15:36:21 2020 -0500

    @868 Thread management 2.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |   9 +-
 .../apache/solr/cloud/OverseerElectionContext.java |   6 +-
 .../org/apache/solr/metrics/SolrMetricManager.java |  11 +-
 .../org/apache/solr/search/TestRealTimeGet.java    |   4 -
 .../src/java/org/apache/solr/common/ParWork.java   |  94 ++++++++-----
 .../org/apache/solr/common/ParWorkExecutor.java    |  12 +-
 .../apache/solr/common/PerThreadExecService.java   | 152 ++-------------------
 .../java/org/apache/solr/common/SolrThread.java    |  51 +++++++
 .../org/apache/solr/common/util/JavaBinCodec.java  |   4 +-
 .../solr/common/util/SolrQueuedThreadPool.java     |  13 +-
 .../src/java/org/apache/solr/SolrTestCase.java     |   8 +-
 11 files changed, 157 insertions(+), 207 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index ca3fe73..e44fba3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -53,6 +53,7 @@ import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrThread;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ConnectionManager;
 import org.apache.solr.common.cloud.DocCollection;
@@ -601,7 +602,7 @@ public class Overseer implements SolrCloseable {
     }
   }
 
-  public static class OverseerThread extends Thread implements Closeable {
+  public static class OverseerThread extends SolrThread implements Closeable {
 
     protected volatile boolean isClosed;
     private final Closeable thread;
@@ -613,11 +614,7 @@ public class Overseer implements SolrCloseable {
 
     @Override
     public void run() {
-      try {
-        super.run();
-      } finally {
-        //ParWork.closeMyPerThreadExecutor(true);
-      }
+      super.run();
     }
 
     @Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 1ec1e23..f36e203 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -49,7 +49,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
   @Override
   void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
           InterruptedException, IOException {
-    if (isClosed() || !zkClient.isConnected() || overseer.isDone()) {
+    if (isClosed() || overseer.isDone()) {
       return;
     }
 
@@ -77,7 +77,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
         log.info("Bailing on becoming leader, we are closed");
         return;
       }
-      if (!isClosed() && !overseer.getZkController().getCoreContainer().isShutDown() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
+      if (!isClosed() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
         try {
           overseer.start(id, context);
         } finally {
@@ -165,7 +165,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
 
   @Override
   public boolean isClosed() {
-    return isClosed || overseer.getCoreContainer().isShutDown() || zkClient.isClosed() || overseer.getCoreContainer().getZkController().isClosed();
+    return isClosed || !zkClient.isConnected();
   }
 }
 
diff --git a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
index c20896b..77257c2 100644
--- a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
+++ b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
@@ -101,6 +101,7 @@ public class SolrMetricManager {
    * system properties. This registry is shared between instances of {@link SolrMetricManager}.
    */
   public static final String JVM_REGISTRY = REGISTRY_NAME_PREFIX + SolrInfoBean.Group.jvm.toString();
+  public static final PluginInfo[] PLUGIN_INFOS_EMPTY = new PluginInfo[0];
 
   private final ConcurrentMap<String, MetricRegistry> registries = new ConcurrentHashMap<>(32);
 
@@ -568,9 +569,9 @@ public class SolrMetricManager {
    */
   public void registerAll(String registry, MetricSet metrics, boolean force, String... metricPath) {
     MetricRegistry metricRegistry = registry(registry);
-    try (ParWork work = new ParWork(this)) {
+ //   try (ParWork work = new ParWork(this)) {
       for (Map.Entry<String,Metric> entry : metrics.getMetrics().entrySet()) {
-        work.collect("registerMetric-" + entry.getKey(), () ->{
+     //   work.collect("registerMetric-" + entry.getKey(), () ->{
           String fullName = mkName(entry.getKey(), metricPath);
           try {
             metricRegistry.register(fullName, entry.getValue());
@@ -582,9 +583,9 @@ public class SolrMetricManager {
               log.warn("Metric already registered: " + fullName);
             }
           }
-        });
+    //    });
       }
-    }
+ //   }
   }
 
   /**
@@ -1249,7 +1250,7 @@ public class SolrMetricManager {
                                                Map<String, Object> defaultInitArgs) {
     List<PluginInfo> result = new ArrayList<>();
     if (pluginInfos == null) {
-      pluginInfos = new PluginInfo[0];
+      pluginInfos = PLUGIN_INFOS_EMPTY;
     }
     for (PluginInfo info : pluginInfos) {
       String groupAttr = info.attributes.get("group");
diff --git a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
index 2a05b4b..2748ac5 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
@@ -686,8 +686,6 @@ public class TestRealTimeGet extends TestRTGBase {
           } catch (Throwable e) {
             operations.set(-1L);
             throw new RuntimeException(e);
-          } finally {
-            ParWork.closeMyPerThreadExecutor();
           }
         }
       };
@@ -765,8 +763,6 @@ public class TestRealTimeGet extends TestRTGBase {
           } catch (Throwable e) {
             operations.set(-1L);
             throw new RuntimeException(e);
-          } finally {
-            ParWork.closeMyPerThreadExecutor();
           }
         }
       };
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index fc1bd2a..0efc22c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -48,6 +48,7 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.OrderedExecutor;
 import org.apache.solr.common.util.SysStats;
 import org.apache.zookeeper.KeeperException;
+import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +79,9 @@ public class ParWork implements Closeable {
     if (EXEC == null) {
       synchronized (ParWork.class) {
         if (EXEC == null) {
-          EXEC = (ThreadPoolExecutor) getParExecutorService("RootExec", Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 250), Integer.MAX_VALUE, 30000, new SynchronousQueue<>());
+          EXEC = (ThreadPoolExecutor) getParExecutorService("RootExec",
+              Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 250), Integer.MAX_VALUE, 30000,
+              new BlockingArrayQueue(1024));
           ((ParWorkExecutor)EXEC).enableCloseLock();
         }
       }
@@ -111,21 +114,6 @@ public class ParWork implements Closeable {
     return sysStats;
   }
 
-  public static void closeMyPerThreadExecutor() {
-    closeMyPerThreadExecutor(false);
-  }
-
-  public static void closeMyPerThreadExecutor(boolean unlockClose) {
-    PerThreadExecService exec = (PerThreadExecService) THREAD_LOCAL_EXECUTOR.get();
-    if (exec != null) {
-      if (unlockClose) {
-        exec.closeLock(false);
-      }
-      ExecutorUtil.shutdownAndAwaitTermination(exec);
-      THREAD_LOCAL_EXECUTOR.set(null);
-    }
-  }
-
     private static class WorkUnit {
     private final Set<ParObject> objects;
     private final TimeTracker tracker;
@@ -485,24 +473,33 @@ public class ParWork implements Closeable {
   }
 
   public static ExecutorService getMyPerThreadExecutor() {
-     // if (executor != null) return executor;
-    ExecutorService exec = THREAD_LOCAL_EXECUTOR.get();
-    if (exec == null) {
-      if (log.isDebugEnabled()) {
-        log.debug("Starting a new executor");
-      }
+    Thread thread = Thread.currentThread();
 
-      Integer minThreads;
-      Integer maxThreads;
-      minThreads = 3;
-      maxThreads = PROC_COUNT;
-      exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
-      ((PerThreadExecService)exec).closeLock(true);
-      // be stuck in poll without an enqueue on shutdown
-      THREAD_LOCAL_EXECUTOR.set(exec);
+    ExecutorService service = null;
+    if (thread instanceof  SolrThread) {
+      service = ((SolrThread) thread).getExecutorService();
     }
 
-    return exec;
+    if (service == null) {
+      ExecutorService exec = THREAD_LOCAL_EXECUTOR.get();
+      if (exec == null) {
+        if (log.isDebugEnabled()) {
+          log.debug("Starting a new executor");
+        }
+
+        Integer minThreads;
+        Integer maxThreads;
+        minThreads = 4;
+        maxThreads = PROC_COUNT / 2;
+        exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
+       // ((PerThreadExecService)exec).closeLock(true);
+        // be stuck in poll without an enqueue on shutdown
+        THREAD_LOCAL_EXECUTOR.set(exec);
+      }
+      service = exec;
+    }
+
+    return service;
   }
 
   public static ExecutorService getParExecutorService(String name, int corePoolSize, int maxPoolSize, int keepAliveTime, BlockingQueue queue) {
@@ -703,13 +700,44 @@ public class ParWork implements Closeable {
     public abstract Object call() throws Exception;
   }
 
-  public static class SolrFutureTask extends FutureTask {
-    public SolrFutureTask(Callable callable) {
+  public static class SolrFutureTask extends FutureTask implements SolrThread.CreateThread {
+
+    private final boolean callerThreadAllowed;
+    private final SolrThread createThread;
+
+    public SolrFutureTask(Callable callable, boolean callerThreadAllowed) {
       super(callable);
+      this.callerThreadAllowed = callerThreadAllowed;
+      Thread thread = Thread.currentThread();
+      if (thread instanceof  SolrThread) {
+        this.createThread = (SolrThread) Thread.currentThread();
+      } else {
+        this.createThread = null;
+      }
     }
 
     public SolrFutureTask(Runnable runnable, Object value) {
+      this(runnable, value, true);
+    }
+
+    public SolrFutureTask(Runnable runnable, Object value, boolean callerThreadAllowed) {
       super(runnable, value);
+      this.callerThreadAllowed = callerThreadAllowed;
+      Thread thread = Thread.currentThread();
+      if (thread instanceof  SolrThread) {
+        this.createThread = (SolrThread) Thread.currentThread();
+      } else {
+        this.createThread = null;
+      }
+    }
+
+    public boolean isCallerThreadAllowed() {
+      return callerThreadAllowed;
+    }
+
+    @Override
+    public SolrThread getCreateThread() {
+      return createThread;
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index f3768d1..8383e76 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -91,16 +91,8 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
       group = (s != null)? s.getThreadGroup() :
           Thread.currentThread().getThreadGroup();
 
-      Thread t = new Thread(group,
-          name + threadNumber.getAndIncrement()) {
-        public void run() {
-          try {
-            r.run();
-          } finally {
-           // ParWork.closeMyPerThreadExecutor(true);
-          }
-        }
-      };
+      SolrThread t = new SolrThread(group, r,
+          name + threadNumber.getAndIncrement());
       t.setDaemon(true);
       return t;
     }
diff --git a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index f8c175d..c913269 100644
--- a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -39,74 +39,11 @@ public class PerThreadExecService extends AbstractExecutorService {
 
   private final Object awaitTerminate = new Object();
 
-//  private final BlockingArrayQueue<Runnable> workQueue = new BlockingArrayQueue<>(30, 0);
-//  private volatile Worker worker;
-//  private volatile Future<?> workerFuture;
-
   private CloseTracker closeTracker;
 
   private SysStats sysStats = ParWork.getSysStats();
   private volatile boolean closeLock;
 
-//  private class Worker implements Runnable {
-//
-//    Worker() {
-//    //  setName("ParExecWorker");
-//    }
-//
-//    @Override
-//    public void run() {
-//      while (!terminated && !Thread.currentThread().isInterrupted()) {
-//        Runnable runnable = null;
-//        try {
-//          runnable = workQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
-//        } catch (InterruptedException e) {
-//           ParWork.propagateInterrupt(e);
-//           return;
-//        }
-//        if (runnable == null) {
-//          running.decrementAndGet();
-//          synchronized (awaitTerminate) {
-//            awaitTerminate.notifyAll();
-//          }
-//          return;
-//        }
-//
-//        if (runnable instanceof ParWork.SolrFutureTask) {
-//
-//        } else {
-//
-//          try {
-//            boolean success = available.tryAcquire();
-//            // I think if we wait here for available instead of running in caller thread
-//            // this is why we could not use the per thread executor in the stream classes
-//            // this means order cannot matter, but it should generally not matter
-//            if (!success) {
-//              runIt(runnable, true, true, false);
-//              return;
-//            }
-//          } catch (Exception e) {
-//            ParWork.propagateInterrupt(e);
-//            running.decrementAndGet();
-//            synchronized (awaitTerminate) {
-//              awaitTerminate.notifyAll();
-//            }
-//            return;
-//          }
-//
-//        }
-//
-//        Runnable finalRunnable = runnable;
-//        service.execute(new Runnable() {
-//          @Override
-//          public void run() {
-//            runIt(finalRunnable, true, false, false);
-//          }
-//        });
-//      }
-//    }
-//  }
-
   public PerThreadExecService(ExecutorService service) {
     this(service, -1);
   }
@@ -120,7 +57,6 @@ public class PerThreadExecService extends AbstractExecutorService {
     assert (closeTracker = new CloseTracker()) != null;
     this.noCallerRunsAllowed = noCallerRunsAllowed;
     this.noCallerRunsAvailableLimit = noCallerRunsAvailableLimit;
-    //assert ObjectReleaseTracker.track(this);
     if (maxSize == -1) {
       this.maxSize = MAX_AVAILABLE;
     } else {
@@ -132,55 +68,26 @@ public class PerThreadExecService extends AbstractExecutorService {
   @Override
   protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
     if (noCallerRunsAllowed) {
-      return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value);
+      return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value, false);
     }
-    return new FutureTask(runnable, value);
-
+    return (RunnableFuture) new ParWork.SolrFutureTask(runnable, value);
   }
 
   @Override
   protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
     if (noCallerRunsAllowed || callable instanceof ParWork.NoLimitsCallable) {
-      return (RunnableFuture) new ParWork.SolrFutureTask(callable);
+      return (RunnableFuture) new ParWork.SolrFutureTask(callable, false);
     }
-    return new FutureTask(callable);
+    return (RunnableFuture) new ParWork.SolrFutureTask(callable, true);
   }
 
   @Override
   public void shutdown() {
-    if (closeLock) {
-      throw new IllegalCallerException();
-    }
+//    if (closeLock) {
+//      throw new IllegalCallerException();
+//    }
     assert ObjectReleaseTracker.release(this);
-    // assert closeTracker.close();
     this.shutdown = true;
-   // worker.interrupt();
-  //  workQueue.clear();
-//    try {
-//      workQueue.offer(new Runnable() {
-//        @Override
-//        public void run() {
-//          // noop to wake from take
-//        }
-//      });
-//      workQueue.offer(new Runnable() {
-//        @Override
-//        public void run() {
-//          // noop to wake from take
-//        }
-//      });
-//      workQueue.offer(new Runnable() {
-//        @Override
-//        public void run() {
-//          // noop to wake from take
-//        }
-//      });
-
-
-   //   workerFuture.cancel(true);
-//    } catch (NullPointerException e) {
-//      // okay
-//    }
   }
 
   @Override
@@ -208,7 +115,7 @@ public class PerThreadExecService extends AbstractExecutorService {
         throw new RuntimeException("Timeout");
       }
 
-     //zaa System.out.println("WAIT : " + workQueue.size() + " " + available.getQueueLength() + " " + workQueue.toString());
+      // System.out.println("WAIT : " + workQueue.size() + " " + available.getQueueLength() + " " + workQueue.toString());
       synchronized (awaitTerminate) {
         awaitTerminate.wait(500);
       }
@@ -228,7 +135,7 @@ public class PerThreadExecService extends AbstractExecutorService {
       throw new RejectedExecutionException(closeTracker.getCloseStack());
     }
     running.incrementAndGet();
-    if (runnable instanceof ParWork.SolrFutureTask) {
+    if (runnable instanceof ParWork.SolrFutureTask && !((ParWork.SolrFutureTask) runnable).isCallerThreadAllowed()) {
       if (noCallerRunsAvailableLimit) {
         try {
           available.acquire();
@@ -237,13 +144,10 @@ public class PerThreadExecService extends AbstractExecutorService {
         }
       }
       try {
-        service.execute(new Runnable() {
-          @Override
-          public void run() {
-            runIt(runnable, noCallerRunsAvailableLimit, false);
-            if (noCallerRunsAvailableLimit) {
-              available.release();
-            }
+        service.execute(() -> {
+          runIt(runnable, noCallerRunsAvailableLimit, false);
+          if (noCallerRunsAvailableLimit) {
+            available.release();
           }
         });
       } catch (Exception e) {
@@ -270,41 +174,13 @@ public class PerThreadExecService extends AbstractExecutorService {
 
     Runnable finalRunnable = runnable;
     try {
-      service.execute(new Runnable() {
-      @Override
-      public void run() {
-          runIt(finalRunnable, true, false);
-      }
-    });
+      service.execute(() -> runIt(finalRunnable, true, false));
     } catch (Exception e) {
       running.decrementAndGet();
       synchronized (awaitTerminate) {
         awaitTerminate.notifyAll();
       }
     }
-
-//    boolean success = this.workQueue.offer(runnable);
-//    if (!success) {
-//     // log.warn("No room in the queue, running in caller thread {} {} {} {}", workQueue.size(), isShutdown(), isTerminated(), worker.isAlive());
-//      try {
-//        runnable.run();
-//      } finally {
-//        running.decrementAndGet();
-//        synchronized (awaitTerminate) {
-//          awaitTerminate.notifyAll();
-//        }
-//      }
-//    } else {
-//      if (worker == null) {
-//        synchronized (this) {
-//          if (worker == null) {
-//            worker = new Worker();
-//
-//            workerFuture = ParWork.getEXEC().submit(worker);
-//          }
-//        }
-//      }
-//    }
   }
 
   private void runIt(Runnable runnable, boolean acquired, boolean alreadyShutdown) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/SolrThread.java b/solr/solrj/src/java/org/apache/solr/common/SolrThread.java
new file mode 100644
index 0000000..998a0e6
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/SolrThread.java
@@ -0,0 +1,51 @@
+package org.apache.solr.common;
+
+import java.util.concurrent.ExecutorService;
+
+public class SolrThread extends Thread {
+
+  private ExecutorService executorService;
+
+  public SolrThread(ThreadGroup group, Runnable r, String name) {
+    super(group, r, name);
+
+    Thread createThread = Thread.currentThread();
+    if (createThread instanceof SolrThread) {
+      ExecutorService service = ((SolrThread) createThread).getExecutorService();
+      if (service == null) {
+        createExecutorService();
+      } else {
+        setExecutorService(service);
+      }
+    }
+
+  }
+
+  public void run() {
+    super.run();
+  }
+
+  private void setExecutorService(ExecutorService service) {
+    this.executorService = service;
+  }
+
+  private void createExecutorService() {
+    Integer minThreads;
+    Integer maxThreads;
+    minThreads = 4;
+    maxThreads = ParWork.PROC_COUNT / 2;
+    this.executorService = ParWork.getExecutorService(Math.max(minThreads, maxThreads));
+  }
+
+  public ExecutorService getExecutorService() {
+    return executorService;
+  }
+
+  public static SolrThread getCurrentThread() {
+    return (SolrThread) currentThread();
+  }
+
+  public interface CreateThread  {
+     SolrThread getCreateThread();
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
index 901baa9..9d1c08b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/JavaBinCodec.java
@@ -920,8 +920,8 @@ public class JavaBinCodec implements PushWriter {
     }
   }
 
-  protected final static ThreadLocal<CharArr> THREAD_LOCAL_ARR = new ThreadLocal<>();
-  protected final static ThreadLocal<ByteBuffer> THREAD_LOCAL_BRR = new ThreadLocal<>();
+  public final static ThreadLocal<CharArr> THREAD_LOCAL_ARR = new ThreadLocal<>();
+  public final static ThreadLocal<ByteBuffer> THREAD_LOCAL_BRR = new ThreadLocal<>();
 
   public static ByteBuffer getByteArr(int sz, boolean resize) {
     ByteBuffer brr = THREAD_LOCAL_BRR.get();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 92b45c6..a200b6f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -98,7 +98,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
 
     public SolrQueuedThreadPool(String name) {
         this(name, Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 250),
-            5000, 0, // no reserved executor threads - we can process requests after shutdown or some race - we try to limit without threadpool limits no anyway
+            30000, 0, // no reserved executor threads - we can process requests after shutdown or some race - we try to limit without threadpool limits no anyway
                 null, -1, null,
                 new  SolrNamedThreadFactory(name));
         this.name = name;
@@ -714,7 +714,16 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
 
         @Override
         public void run() {
-            runnable.run();
+            try {
+                runnable.run();
+            } finally {
+                cleanupThreadLocals();
+            }
+        }
+
+        private void cleanupThreadLocals() {
+            JavaBinCodec.THREAD_LOCAL_ARR.remove();
+            JavaBinCodec.THREAD_LOCAL_BRR.remove();
         }
     }
 
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 2dc1e15..a4c3c78 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -195,7 +195,7 @@ public class SolrTestCase extends LuceneTestCase {
     testStartTime = System.nanoTime();
 
 
-    testExecutor = ParWork.getMyPerThreadExecutor();
+    testExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), 12, true, false);
     ((PerThreadExecService) testExecutor).closeLock(true);
     // stop zkserver threads that can linger
     //interruptThreadsOnTearDown("nioEventLoopGroup", false);
@@ -285,9 +285,9 @@ public class SolrTestCase extends LuceneTestCase {
 
       // unlimited - System.setProperty("solr.maxContainerThreads", "300");
       System.setProperty("solr.lowContainerThreadsThreshold", "-1");
-      System.setProperty("solr.minContainerThreads", "4");
+      System.setProperty("solr.minContainerThreads", "8");
       System.setProperty("solr.rootSharedThreadPoolCoreSize", "16");
-      System.setProperty("solr.minHttp2ClientThreads", "4");
+      System.setProperty("solr.minHttp2ClientThreads", "6");
 
 
       ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS = 1;
@@ -436,7 +436,7 @@ public class SolrTestCase extends LuceneTestCase {
 
       SysStats.getSysStats().stopMonitor();
 
-      //ParWork.closeMyPerThreadExecutor(true);
+    //  testExecutor.shutdown();
       ParWork.shutdownRootSharedExec();
 
       AlreadyClosedException lastAlreadyClosedExp = CloseTracker.lastAlreadyClosedEx;