You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/09 15:00:35 UTC
[lucene-solr] branch reference_impl_dev updated: @821 Still finding great puzzle piece moves.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 64a6e8a @821 Still finding great puzzle piece moves.
64a6e8a is described below
commit 64a6e8a23aa0469ef56f91817f3a4b54d64ede74
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 9 10:00:14 2020 -0500
@821 Still finding great puzzle piece moves.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 2 +-
.../CollectionsAPIAsyncDistributedZkTest.java | 1 +
.../TestCollectionsAPIViaSolrCloudCluster.java | 1 +
.../src/java/org/apache/solr/common/ParWork.java | 2 +-
.../org/apache/solr/common/ParWorkExecutor.java | 2 +-
.../solr/common/cloud/ConnectionManager.java | 136 ++++++++--------
.../solr/common/util/SolrQueuedThreadPool.java | 173 +++++----------------
.../java/org/apache/solr/cloud/ZkTestServer.java | 2 +-
8 files changed, 105 insertions(+), 214 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 320cc42..6df281e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -617,7 +617,7 @@ public class Overseer implements SolrCloseable {
try {
super.run();
} finally {
- ParWork.closeMyPerThreadExecutor();
+ ParWork.closeMyPerThreadExecutor(true);
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
index 14dde94..43583ae 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
@@ -75,6 +75,7 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
}
@Test
+ @Ignore // nocommit harden
public void testSolrJAPICalls() throws Exception {
final CloudHttp2SolrClient client = cluster.getSolrClient();
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java
index d018655..ea45b68 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionsAPIViaSolrCloudCluster.java
@@ -117,6 +117,7 @@ public class TestCollectionsAPIViaSolrCloudCluster extends SolrCloudTestCase {
}
@Test
+ @Ignore // nocommit harden
public void testCollectionCreateSearchDelete() throws Exception {
final CloudHttp2SolrClient client = cluster.getSolrClient();
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index e16bf5f..f996f65 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -106,7 +106,7 @@ public class ParWork implements Closeable {
}
public static void closeMyPerThreadExecutor() {
- closeMyPerThreadExecutor(true);
+ closeMyPerThreadExecutor(false);
}
public static void closeMyPerThreadExecutor(boolean unlockClose) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 1f34f64..8b5c6f0 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -97,7 +97,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
try {
r.run();
} finally {
- ParWork.closeMyPerThreadExecutor();
+ ParWork.closeMyPerThreadExecutor(true);
}
}
};
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 7d713c1..ca37d76 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -245,96 +245,82 @@ public class ConnectionManager implements Watcher, Closeable {
private synchronized void reconnect() {
if (isClosed()) return;
- try {
- if (beforeReconnect != null) {
+
+ if (beforeReconnect != null) {
+ try {
+ beforeReconnect.command();
+ } catch (Exception e) {
+ ParWork.propegateInterrupt("Exception running beforeReconnect command", e);
+ if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
+ return;
+ }
+ }
+ }
+
+ synchronized (keeperLock) {
+ if (isClosed()) return;
+ if (keeper != null) {
+ // if there was a problem creating the new SolrZooKeeper
+ // or if we cannot run our reconnect command, close the keeper
+ // our retry loop will try to create one again
try {
- beforeReconnect.command();
+ ParWork.close(keeper);
+ keeper = null;
} catch (Exception e) {
- ParWork
- .propegateInterrupt("Exception running beforeReconnect command",
- e);
- if (e instanceof InterruptedException
- || e instanceof AlreadyClosedException) {
+ ParWork.propegateInterrupt("Exception closing keeper after hitting exception", e);
+ if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
return;
}
}
}
- synchronized (keeperLock) {
- if (isClosed()) return;
- if (keeper != null) {
- // if there was a problem creating the new SolrZooKeeper
- // or if we cannot run our reconnect command, close the keeper
- // our retry loop will try to create one again
- try {
- ParWork.close(keeper);
- keeper = null;
- } catch (Exception e) {
- ParWork.propegateInterrupt(
- "Exception closing keeper after hitting exception", e);
- if (e instanceof InterruptedException
- || e instanceof AlreadyClosedException) {
- return;
- }
- }
- }
-
- }
+ }
- do {
- if (isClosed()) return;
- // This loop will break if a valid connection is made. If a connection is not made then it will repeat and
- // try again to create a new connection.
- log.info("Running reconnect strategy");
+ do {
+ if (isClosed()) return;
+ // This loop will break if a valid connection is made. If a connection is not made then it will repeat and
+ // try again to create a new connection.
+ log.info("Running reconnect strategy");
+ try {
+ updatezk();
try {
- updatezk();
- try {
- waitForConnected(5000);
- if (isClosed()) return;
- if (onReconnect != null) {
- try {
- onReconnect.command();
- } catch (Exception e) {
- SolrException exp = new SolrException(
- SolrException.ErrorCode.SERVER_ERROR, e);
- ParWork.propegateInterrupt(
- "$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper="
- + keeper + ")", e);
- if (e instanceof InterruptedException
- || e instanceof AlreadyClosedException) {
- return;
- }
- throw exp;
+ waitForConnected(5000);
+ if (isClosed()) return;
+ if (onReconnect != null) {
+ try {
+ onReconnect.command();
+ } catch (Exception e) {
+ SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ ParWork.propegateInterrupt("$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper=" + keeper + ")", e);
+ if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
+ return;
}
+ throw exp;
}
- } catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propegateInterrupt(e);
- return;
- } catch (Exception e1) {
- log.error("Exception updating zk instance", e1);
- SolrException exp = new SolrException(
- SolrException.ErrorCode.SERVER_ERROR, e1);
- throw exp;
}
-
- if (log.isDebugEnabled()) {
- log.debug(
- "$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper) - end");
- }
- } catch (AlreadyClosedException e) {
+ } catch (InterruptedException | AlreadyClosedException e) {
+ ParWork.propegateInterrupt(e);
return;
- } catch (Exception e) {
- SolrException.log(log, "", e);
- log.info("Could not connect due to error, trying again ..");
- ParWork.close(keeper);
- break;
+ } catch (Exception e1) {
+ log.error("Exception updating zk instance", e1);
+ SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e1);
+ throw exp;
}
- } while (!isClosed() || Thread.currentThread().isInterrupted());
- } finally {
- ParWork
- .closeMyPerThreadExecutor(); // we are using the root exec directly, let's just make sure it's closed here to avoid a slight delay leak
- }
+ if (log.isDebugEnabled()) {
+ log.debug("$ZkClientConnectionStrategy.ZkUpdate.update(SolrZooKeeper) - end");
+ }
+ } catch (AlreadyClosedException e) {
+ return;
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ log.info("Could not connect due to error, trying again ..");
+ ParWork.close(keeper);
+ break;
+ }
+
+ } while (!isClosed() || Thread.currentThread().isInterrupted());
+
log.info("zkClient Connected: {}", connected);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 0b30387..5d15477 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -26,6 +26,7 @@ import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.ThreadPoolBudget;
@@ -39,13 +40,17 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, ThreadPool.SizedThreadPool, Dumpable, TryExecutor, Closeable {
@@ -65,7 +70,10 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
*/
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
private final AtomicLong _lastShrink = new AtomicLong();
- private final Set<Thread> _threads = ConcurrentHashMap.newKeySet();
+ private final Map<Runnable, Future> _threads = new ConcurrentHashMap<>(256);
+
+ private final Set<Future> _threadFutures = ConcurrentHashMap.newKeySet();
+
private final Object _joinLock = new Object();
private final BlockingQueue<Runnable> _jobs;
private final ThreadGroup _threadGroup;
@@ -206,15 +214,14 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
close();
}
- private void joinThreads(long stopByNanos) throws InterruptedException
- {
- for (Thread thread : _threads)
+ private void joinThreads(long stopByNanos) throws InterruptedException, TimeoutException, ExecutionException {
+ for (Future thread : _threadFutures)
{
long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("Waiting for {} for {}", thread, canWait);
if (canWait > 0)
- thread.join(canWait);
+ thread.get(canWait, TimeUnit.NANOSECONDS);
}
}
@@ -585,11 +592,11 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
boolean started = false;
try
{
- Thread thread = _threadFactory.newThread(_runnable);
- ParWork.getRootSharedExecutor().execute(thread);
+ Runnable runnable = newRunnable(_runnable);
+ Future future = ParWork.getRootSharedExecutor().submit(runnable);
if (LOG.isDebugEnabled())
- LOG.debug("Starting {}", thread);
- _threads.add(thread);
+ LOG.debug("Starting {}", runnable);
+ _threads.put(runnable, future);
_lastShrink.set(System.nanoTime());
_runnable.waitForStart();
started = true;
@@ -616,8 +623,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
}
}
- @Override
- public Thread newThread(Runnable runnable)
+ public Runnable newRunnable(Runnable runnable)
{
ThreadGroup group;
@@ -627,77 +633,17 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
- Thread thread = new MyThread(group, runnable);
- thread.setDaemon(isDaemon());
- thread.setPriority(getThreadsPriority());
- thread.setName(_name + "-" + thread.getId());
+ Runnable thread = new MyRunnable(runnable);
+
return thread;
}
- protected void removeThread(Thread thread)
+ protected void removeThread(Runnable thread)
{
_threads.remove(thread);
}
@Override
- public void dump(Appendable out, String indent) throws IOException
- {
- List<Object> threads = new ArrayList<>(getMaxThreads());
- for (final Thread thread : _threads)
- {
- final StackTraceElement[] trace = thread.getStackTrace();
- String knownMethod = "";
- for (StackTraceElement t : trace)
- {
- if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().equals(SolrQueuedThreadPool.Runner.class.getName()))
- {
- knownMethod = "IDLE ";
- break;
- }
-
- if ("reservedWait".equals(t.getMethodName()) && t.getClassName().endsWith("ReservedThread"))
- {
- knownMethod = "RESERVED ";
- break;
- }
-
- if ("select".equals(t.getMethodName()) && t.getClassName().endsWith("SelectorProducer"))
- {
- knownMethod = "SELECTING ";
- break;
- }
-
- if ("accept".equals(t.getMethodName()) && t.getClassName().contains("ServerConnector"))
- {
- knownMethod = "ACCEPTING ";
- break;
- }
- }
- final String known = knownMethod;
-
- if (isDetailedDump())
- {
- threads.add(new MyDumpable(known, thread, trace));
- }
- else
- {
- int p = thread.getPriority();
- threads.add(thread.getId() + " " + thread.getName() + " " + known + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (p == Thread.NORM_PRIORITY ? "" : (" prio=" + p)));
- }
- }
-
- if (isDetailedDump())
- {
- List<Runnable> jobs = new ArrayList<>(getQueue());
- dumpObjects(out, indent, new DumpableCollection("threads", threads), new DumpableCollection("jobs", jobs));
- }
- else
- {
- dumpObjects(out, indent, new DumpableCollection("threads", threads));
- }
- }
-
- @Override
public String toString()
{
long count = _counts.get();
@@ -733,8 +679,6 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
} catch (Error error) {
log.error("Error in Jetty thread pool thread", error);
this.error = error;
- } finally {
- ParWork.closeMyPerThreadExecutor();
}
synchronized (notify) {
@@ -759,63 +703,21 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
throw new UnsupportedOperationException("Use constructor injection");
}
- /**
- * @param id the thread ID to interrupt.
- * @return true if the thread was found and interrupted.
- */
- @ManagedOperation("interrupts a pool thread")
- public boolean interruptThread(@Name("id") long id)
- {
- for (Thread thread : _threads)
- {
- if (thread.getId() == id)
- {
- thread.interrupt();
- return true;
- }
- }
- return false;
- }
-
- /**
- * @param id the thread ID to interrupt.
- * @return the stack frames dump
- */
- @ManagedOperation("dumps a pool thread stack")
- public String dumpThread(@Name("id") long id)
- {
- for (Thread thread : _threads)
- {
- if (thread.getId() == id)
- {
- StringBuilder buf = new StringBuilder();
- buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
- buf.append(thread.getState()).append(":").append(System.lineSeparator());
- for (StackTraceElement element : thread.getStackTrace())
- {
- buf.append(" at ").append(element.toString()).append(System.lineSeparator());
- }
- return buf.toString();
- }
- }
+ @Override
+ public Thread newThread(Runnable runnable) {
return null;
}
- private static class MyThread extends Thread {
+ private static class MyRunnable implements Runnable {
private final Runnable runnable;
- public MyThread(ThreadGroup group, Runnable runnable) {
- super(group, "");
+ public MyRunnable(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
- try {
- runnable.run();
- } finally {
- ParWork.closeMyPerThreadExecutor();
- }
+ runnable.run();
}
}
@@ -955,13 +857,13 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
}
finally
{
- Thread thread = Thread.currentThread();
- removeThread(thread);
+
+ removeThread(this);
// Decrement the total thread count and the idle count if we had no job
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
- LOG.debug("{} exited for {}", thread, SolrQueuedThreadPool.this);
+ LOG.debug("{} exited for {}", this, SolrQueuedThreadPool.this);
// There is a chance that we shrunk just as a job was queued for us, so
// check again if we have sufficient threads to meet demand
@@ -1013,20 +915,21 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
}
// interrupt threads
- for (Thread thread : _threads)
+ for (Future thread : _threadFutures)
{
if (LOG.isDebugEnabled())
LOG.debug("Interrupting {}", thread);
- thread.interrupt();
+ thread.cancel(true);
}
- while (getBusyThreads() > 0) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- ParWork.propegateInterrupt(e, true);
- break;
- }
+ try {
+ joinThreads(15000);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted in joinThreads on close {}", e);
+ } catch (TimeoutException e) {
+ LOG.warn("Timeout in joinThreads on close {}", e);
+ } catch (ExecutionException e) {
+ LOG.warn("Execution exception in joinThreads on close {}", e);
}
// Close any un-executed jobs
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 78bf83a..4f5016f 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -576,7 +576,7 @@ public class ZkTestServer implements Closeable {
log.error("zkServer error", t);
}
} finally {
- ParWork.closeMyPerThreadExecutor();
+ ParWork.closeMyPerThreadExecutor(true);
}
}
};