You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/09 15:00:35 UTC

[lucene-solr] branch reference_impl_dev updated: @821 Still finding great puzzle piece moves.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 64a6e8a  @821 Still finding great puzzle piece moves.
64a6e8a is described below

commit 64a6e8a23aa0469ef56f91817f3a4b54d64ede74
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 9 10:00:14 2020 -0500

    @821 Still finding great puzzle piece moves.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |   2 +-
 .../CollectionsAPIAsyncDistributedZkTest.java      |   1 +
 .../TestCollectionsAPIViaSolrCloudCluster.java     |   1 +
 .../src/java/org/apache/solr/common/ParWork.java   |   2 +-
 .../org/apache/solr/common/ParWorkExecutor.java    |   2 +-
 .../solr/common/cloud/ConnectionManager.java       | 136 ++++++++--------
 .../solr/common/util/SolrQueuedThreadPool.java     | 173 +++++----------------
 .../java/org/apache/solr/cloud/ZkTestServer.java   |   2 +-
 8 files changed, 105 insertions(+), 214 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 320cc42..6df281e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -617,7 +617,7 @@ public class Overseer implements SolrCloseable {
       try {
         super.run();
       } finally {
-        ParWork.closeMyPerThreadExecutor();
+        ParWork.closeMyPerThreadExecutor(true);
       }
     }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
index 14dde94..43583ae 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
@@ -75,6 +75,7 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
   }
 
   @Test
+  @Ignore // nocommit harden
   public void testSolrJAPICalls() throws Exception {
 
     final CloudHttp2SolrClient client = cluster.getSolrClient();
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java
index d018655..ea45b68 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java
@@ -117,6 +117,7 @@ public class TestCollectionsAPIViaSolrCloudCluster extends SolrCloudTestCase {
   }
 
   @Test
+  @Ignore // nocommit harden
   public void testCollectionCreateSearchDelete() throws Exception {
 
     final CloudHttp2SolrClient client = cluster.getSolrClient();
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index e16bf5f..f996f65 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -106,7 +106,7 @@ public class ParWork implements Closeable {
   }
 
   public static void closeMyPerThreadExecutor() {
-    closeMyPerThreadExecutor(true);
+    closeMyPerThreadExecutor(false);
   }
 
   public static void closeMyPerThreadExecutor(boolean unlockClose) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 1f34f64..8b5c6f0 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -97,7 +97,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
           try {
             r.run();
           } finally {
-            ParWork.closeMyPerThreadExecutor();
+            ParWork.closeMyPerThreadExecutor(true);
           }
         }
       };
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 7d713c1..ca37d76 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -245,96 +245,82 @@ public class ConnectionManager implements Watcher, Closeable {
 
   private synchronized void reconnect() {
     if (isClosed()) return;
-    try {
-      if (beforeReconnect != null) {
+
+    if (beforeReconnect != null) {
+      try {
+        beforeReconnect.command();
+      } catch (Exception e) {
+        ParWork.propegateInterrupt("Exception running beforeReconnect command", e);
+        if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
+          return;
+        }
+      }
+    }
+
+    synchronized (keeperLock) {
+      if (isClosed()) return;
+      if (keeper != null) {
+        // if there was a problem creating the new SolrZooKeeper
+        // or if we cannot run our reconnect command, close the keeper
+        // our retry loop will try to create one again
         try {
-          beforeReconnect.command();
+          ParWork.close(keeper);
+          keeper = null;
         } catch (Exception e) {
-          ParWork
-              .propegateInterrupt("Exception running beforeReconnect command",
-                  e);
-          if (e instanceof InterruptedException
-              || e instanceof AlreadyClosedException) {
+          ParWork.propegateInterrupt("Exception closing keeper after hitting exception", e);
+          if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
             return;
           }
         }
       }
 
-      synchronized (keeperLock) {
-        if (isClosed()) return;
-        if (keeper != null) {
-          // if there was a problem creating the new SolrZooKeeper
-          // or if we cannot run our reconnect command, close the keeper
-          // our retry loop will try to create one again
-          try {
-            ParWork.close(keeper);
-            keeper = null;
-          } catch (Exception e) {
-            ParWork.propegateInterrupt(
-                "Exception closing keeper after hitting exception", e);
-            if (e instanceof InterruptedException
-                || e instanceof AlreadyClosedException) {
-              return;
-            }
-          }
-        }
-
-      }
+    }
 
-      do {
-        if (isClosed()) return;
-        // This loop will break if a valid connection is made. If a connection is not made then it will repeat and
-        // try again to create a new connection.
-        log.info("Running reconnect strategy");
+    do {
+      if (isClosed()) return;
+      // This loop will break if a valid connection is made. If a connection is not made then it will repeat and
+      // try again to create a new connection.
+      log.info("Running reconnect strategy");
+      try {
+        updatezk();
         try {
-          updatezk();
-          try {
-            waitForConnected(5000);
-            if (isClosed()) return;
-            if (onReconnect != null) {
-              try {
-                onReconnect.command();
-              } catch (Exception e) {
-                SolrException exp = new SolrException(
-                    SolrException.ErrorCode.SERVER_ERROR, e);
-                ParWork.propegateInterrupt(
-                    "$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper="
-                        + keeper + ")", e);
-                if (e instanceof InterruptedException
-                    || e instanceof AlreadyClosedException) {
-                  return;
-                }
-                throw exp;
+          waitForConnected(5000);
+          if (isClosed()) return;
+          if (onReconnect != null) {
+            try {
+              onReconnect.command();
+            } catch (Exception e) {
+              SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+              ParWork.propegateInterrupt("$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper=" + keeper + ")", e);
+              if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
+                return;
               }
+              throw exp;
             }
-          } catch (InterruptedException | AlreadyClosedException e) {
-            ParWork.propegateInterrupt(e);
-            return;
-          } catch (Exception e1) {
-            log.error("Exception updating zk instance", e1);
-            SolrException exp = new SolrException(
-                SolrException.ErrorCode.SERVER_ERROR, e1);
-            throw exp;
           }
-
-          if (log.isDebugEnabled()) {
-            log.debug(
-                "$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper) - end");
-          }
-        } catch (AlreadyClosedException e) {
+        } catch (InterruptedException | AlreadyClosedException e) {
+          ParWork.propegateInterrupt(e);
           return;
-        } catch (Exception e) {
-          SolrException.log(log, "", e);
-          log.info("Could not connect due to error, trying again ..");
-          ParWork.close(keeper);
-          break;
+        } catch (Exception e1) {
+          log.error("Exception updating zk instance", e1);
+          SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e1);
+          throw exp;
         }
 
-      } while (!isClosed() || Thread.currentThread().isInterrupted());
-    } finally {
-      ParWork
-          .closeMyPerThreadExecutor(); // we are using the root exec directly, let's just make sure it's closed here to avoid a slight delay leak
-    }
+        if (log.isDebugEnabled()) {
+          log.debug("$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper) - end");
+        }
+      } catch (AlreadyClosedException e) {
+        return;
+      } catch (Exception e) {
+        SolrException.log(log, "", e);
+        log.info("Could not connect due to error, trying again ..");
+        ParWork.close(keeper);
+        break;
+      }
+
+    } while (!isClosed() || Thread.currentThread().isInterrupted());
+
     log.info("zkClient Connected: {}", connected);
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 0b30387..5d15477 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -26,6 +26,7 @@ import org.eclipse.jetty.util.annotation.Name;
 import org.eclipse.jetty.util.component.ContainerLifeCycle;
 import org.eclipse.jetty.util.component.Dumpable;
 import org.eclipse.jetty.util.component.DumpableCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
 import org.eclipse.jetty.util.thread.ThreadPool;
 import org.eclipse.jetty.util.thread.ThreadPoolBudget;
@@ -39,13 +40,17 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, ThreadPool.SizedThreadPool, Dumpable, TryExecutor, Closeable {
@@ -65,7 +70,10 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
      */
     private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
     private final AtomicLong _lastShrink = new AtomicLong();
-    private final Set<Thread> _threads = ConcurrentHashMap.newKeySet();
+    private final Map<Runnable, Future> _threads = new ConcurrentHashMap<>(256);
+
+    private final Set<Future> _threadFutures = ConcurrentHashMap.newKeySet();
+
     private final Object _joinLock = new Object();
     private final BlockingQueue<Runnable> _jobs;
     private final ThreadGroup _threadGroup;
@@ -206,15 +214,14 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         close();
     }
 
-    private void joinThreads(long stopByNanos) throws InterruptedException
-    {
-        for (Thread thread : _threads)
+    private void joinThreads(long stopByNanos) throws InterruptedException, TimeoutException, ExecutionException {
+        for (Future thread : _threadFutures)
         {
             long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
             if (LOG.isDebugEnabled())
                 LOG.debug("Waiting for {} for {}", thread, canWait);
             if (canWait > 0)
-                thread.join(canWait);
+                thread.get(canWait, TimeUnit.NANOSECONDS);
         }
     }
 
@@ -585,11 +592,11 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         boolean started = false;
         try
         {
-            Thread thread = _threadFactory.newThread(_runnable);
-            ParWork.getRootSharedExecutor().execute(thread);
+            Runnable runnable = newRunnable(_runnable);
+            Future future = ParWork.getRootSharedExecutor().submit(runnable);
             if (LOG.isDebugEnabled())
-                LOG.debug("Starting {}", thread);
-            _threads.add(thread);
+                LOG.debug("Starting {}", runnable);
+            _threads.put(runnable, future);
             _lastShrink.set(System.nanoTime());
             _runnable.waitForStart();
             started = true;
@@ -616,8 +623,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         }
     }
 
-    @Override
-    public Thread newThread(Runnable runnable)
+    public Runnable newRunnable(Runnable runnable)
     {
         ThreadGroup group;
 
@@ -627,77 +633,17 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
                 s.getThreadGroup() :
                 Thread.currentThread().getThreadGroup();
         }
-        Thread thread = new MyThread(group, runnable);
-        thread.setDaemon(isDaemon());
-        thread.setPriority(getThreadsPriority());
-        thread.setName(_name + "-" + thread.getId());
+        Runnable thread = new MyRunnable(runnable);
+
         return thread;
     }
 
-    protected void removeThread(Thread thread)
+    protected void removeThread(Runnable thread)
     {
         _threads.remove(thread);
     }
 
     @Override
-    public void dump(Appendable out, String indent) throws IOException
-    {
-        List<Object> threads = new ArrayList<>(getMaxThreads());
-        for (final Thread thread : _threads)
-        {
-            final StackTraceElement[] trace = thread.getStackTrace();
-            String knownMethod = "";
-            for (StackTraceElement t : trace)
-            {
-                if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().equals(SolrQueuedThreadPool.Runner.class.getName()))
-                {
-                    knownMethod = "IDLE ";
-                    break;
-                }
-
-                if ("reservedWait".equals(t.getMethodName()) && t.getClassName().endsWith("ReservedThread"))
-                {
-                    knownMethod = "RESERVED ";
-                    break;
-                }
-
-                if ("select".equals(t.getMethodName()) && t.getClassName().endsWith("SelectorProducer"))
-                {
-                    knownMethod = "SELECTING ";
-                    break;
-                }
-
-                if ("accept".equals(t.getMethodName()) && t.getClassName().contains("ServerConnector"))
-                {
-                    knownMethod = "ACCEPTING ";
-                    break;
-                }
-            }
-            final String known = knownMethod;
-
-            if (isDetailedDump())
-            {
-                threads.add(new MyDumpable(known, thread, trace));
-            }
-            else
-            {
-                int p = thread.getPriority();
-                threads.add(thread.getId() + " " + thread.getName() + " " + known + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (p == Thread.NORM_PRIORITY ? "" : (" prio=" + p)));
-            }
-        }
-
-        if (isDetailedDump())
-        {
-            List<Runnable> jobs = new ArrayList<>(getQueue());
-            dumpObjects(out, indent, new DumpableCollection("threads", threads), new DumpableCollection("jobs", jobs));
-        }
-        else
-        {
-            dumpObjects(out, indent, new DumpableCollection("threads", threads));
-        }
-    }
-
-    @Override
     public String toString()
     {
         long count = _counts.get();
@@ -733,8 +679,6 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         } catch (Error error) {
             log.error("Error in Jetty thread pool thread", error);
             this.error = error;
-        } finally {
-            ParWork.closeMyPerThreadExecutor();
         }
 
         synchronized (notify) {
@@ -759,63 +703,21 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         throw new UnsupportedOperationException("Use constructor injection");
     }
 
-    /**
-     * @param id the thread ID to interrupt.
-     * @return true if the thread was found and interrupted.
-     */
-    @ManagedOperation("interrupts a pool thread")
-    public boolean interruptThread(@Name("id") long id)
-    {
-        for (Thread thread : _threads)
-        {
-            if (thread.getId() == id)
-            {
-                thread.interrupt();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * @param id the thread ID to interrupt.
-     * @return the stack frames dump
-     */
-    @ManagedOperation("dumps a pool thread stack")
-    public String dumpThread(@Name("id") long id)
-    {
-        for (Thread thread : _threads)
-        {
-            if (thread.getId() == id)
-            {
-                StringBuilder buf = new StringBuilder();
-                buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
-                buf.append(thread.getState()).append(":").append(System.lineSeparator());
-                for (StackTraceElement element : thread.getStackTrace())
-                {
-                    buf.append("  at ").append(element.toString()).append(System.lineSeparator());
-                }
-                return buf.toString();
-            }
-        }
+    @Override
+    public Thread newThread(Runnable runnable) {
         return null;
     }
 
-    private static class MyThread extends Thread {
+    private static class MyRunnable implements Runnable {
         private final Runnable runnable;
 
-        public MyThread(ThreadGroup group, Runnable runnable) {
-            super(group, "");
+        public MyRunnable(Runnable runnable) {
             this.runnable = runnable;
         }
 
         @Override
         public void run() {
-            try {
-                runnable.run();
-            } finally {
-                ParWork.closeMyPerThreadExecutor();
-            }
+            runnable.run();
         }
     }
 
@@ -955,13 +857,13 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
             }
             finally
             {
-                Thread thread = Thread.currentThread();
-                removeThread(thread);
+
+                removeThread(this);
 
                 // Decrement the total thread count and the idle count if we had no job
                 addCounts(-1, idle ? -1 : 0);
                 if (LOG.isDebugEnabled())
-                    LOG.debug("{} exited for {}", thread, SolrQueuedThreadPool.this);
+                    LOG.debug("{} exited for {}", this, SolrQueuedThreadPool.this);
 
                 // There is a chance that we shrunk just as a job was queued for us, so
                 // check again if we have sufficient threads to meet demand
@@ -1013,20 +915,21 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
         }
 
         // interrupt threads
-        for (Thread thread : _threads)
+        for (Future thread : _threadFutures)
         {
             if (LOG.isDebugEnabled())
                 LOG.debug("Interrupting {}", thread);
-            thread.interrupt();
+            thread.cancel(true);
         }
 
-        while (getBusyThreads() > 0) {
-            try {
-                Thread.sleep(50);
-            } catch (InterruptedException e) {
-               ParWork.propegateInterrupt(e, true);
-               break;
-            }
+        try {
+            joinThreads(15000);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted in joinThreads on close {}", e);
+        } catch (TimeoutException e) {
+            LOG.warn("Timeout in joinThreads on close {}", e);
+        } catch (ExecutionException e) {
+            LOG.warn("Execution exception in joinThreads on close {}", e);
         }
 
         // Close any un-executed jobs
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 78bf83a..4f5016f 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -576,7 +576,7 @@ public class ZkTestServer implements Closeable {
               log.error("zkServer error", t);
             }
           } finally {
-            ParWork.closeMyPerThreadExecutor();
+            ParWork.closeMyPerThreadExecutor(true);
           }
         }
       };