You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/20 13:55:19 UTC
[lucene-solr] branch reference_impl updated: @248 - Another
overseeer thread is benched, expensive test sidelined to nightly,
fix up this executor.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl by this push:
new 3765bb9 @248 - Another overseeer thread is benched, expensive test sidelined to nightly, fix up this executor.
3765bb9 is described below
commit 3765bb92bb7970e074bafe08fda0c21e9859cf8a
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Jul 20 08:54:50 2020 -0500
@248 - Another overseeer thread is benched, expensive test sidelined to nightly, fix up this executor.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 3 +-
.../apache/solr/cloud/OverseerTaskProcessor.java | 22 ++++---
.../java/org/apache/solr/core/CoreContainer.java | 18 +++---
.../org/apache/solr/search/TestRangeQuery.java | 2 +
solr/reference_branch/prod/Dockerfile | 8 +--
.../src/java/org/apache/solr/common/ParWork.java | 51 +++------------
.../org/apache/solr/common/ParWorkExecutor.java | 72 ++++++++++++++++++++++
.../org/apache/solr/common/cloud/SolrZkClient.java | 33 +---------
.../src/java/org/apache/solr/SolrTestCaseJ4.java | 37 +----------
9 files changed, 114 insertions(+), 132 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index fcf09ed..a925a27 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -641,7 +641,8 @@ public class Overseer implements SolrCloseable {
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getUpdateOnlyHttpClient());
+ // nocommit - I don't know about this guy..
+ OverseerNodePrioritizer overseerPrioritizer = null; // new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getUpdateOnlyHttpClient());
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
ccThread.setDaemon(true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 1dd7f2c..9a7a0d0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -176,17 +176,19 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
else
log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
- try {
- prioritizer.prioritizeOverseerNodes(myId);
- } catch (Exception e) {
- ParWork.propegateInterrupt(e);
- if (e instanceof KeeperException.SessionExpiredException) {
- return;
- }
- if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
- return;
+ if (prioritizer != null) {
+ try {
+ prioritizer.prioritizeOverseerNodes(myId);
+ } catch (Exception e) {
+ ParWork.propegateInterrupt(e);
+ if (e instanceof KeeperException.SessionExpiredException) {
+ return;
+ }
+ if (e instanceof InterruptedException || e instanceof AlreadyClosedException) {
+ return;
+ }
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
try {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 7fed407..f8e9a55 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -780,12 +780,6 @@ public class CoreContainer implements Closeable {
metricsHandler.initializeMetrics(solrMetricsContext, METRICS_PATH);
});
- if (!Boolean.getBoolean("solr.disableMetricsHistoryHandler")) {
- work.collect(() -> {
- createMetricsHistoryHandler();
- });
- }
-
work.collect(() -> {
autoscalingHistoryHandler = createHandler(AUTOSCALING_HISTORY_PATH, AutoscalingHistoryHandler.class.getName(), AutoscalingHistoryHandler.class);
metricsCollectorHandler = createHandler(MetricsCollectorHandler.HANDLER_PATH, MetricsCollectorHandler.class.getName(), MetricsCollectorHandler.class);
@@ -793,8 +787,6 @@ public class CoreContainer implements Closeable {
metricsCollectorHandler.init(null);
});
- work.addCollect("ccload");
-
work.collect(() -> {
containerHandlers.put(AUTHZ_PATH, securityConfHandler);
securityConfHandler.initializeMetrics(solrMetricsContext, AUTHZ_PATH);
@@ -808,7 +800,15 @@ public class CoreContainer implements Closeable {
metricManager.loadReporters(metricReporters, loader, this, null, null, SolrInfoBean.Group.jetty);
});
- work.addCollect("ccload2");
+ work.addCollect("ccload");
+
+ if (!Boolean.getBoolean("solr.disableMetricsHistoryHandler")) {
+ work.collect(() -> {
+ createMetricsHistoryHandler();
+ });
+ }
+
+ work.addCollect("metricsHistoryHandlers");
}
// initialize gauges for reporting the number of cores and disk total/free
diff --git a/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java b/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java
index 8c5de75..f6e0c85 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRangeQuery.java
@@ -27,6 +27,7 @@ import java.util.Locale;
import java.util.Map;
import com.carrotsearch.hppc.IntHashSet;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
@@ -41,6 +42,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+@LuceneTestCase.Nightly // nocommit slow
public class TestRangeQuery extends SolrTestCaseJ4 {
private final static long DATE_START_TIME_RANDOM_TEST = 1499797224224L;
diff --git a/solr/reference_branch/prod/Dockerfile b/solr/reference_branch/prod/Dockerfile
index 9925d5f..a32cf82 100644
--- a/solr/reference_branch/prod/Dockerfile
+++ b/solr/reference_branch/prod/Dockerfile
@@ -12,14 +12,12 @@ RUN chmod +x /start-solr.sh
RUN apt-get -y update; apt-get -y upgrade; apt-get -y install ant
RUN git clone https://github.com/apache/lucene-solr.git --branch reference_impl --single-branch reference_impl; \
- cd reference_impl;ant ivy-bootstrap;cd solr;ant package -Dversion=9.0.0-miller_ref_impl; \
- cp package/*miller_ref_impl/ ${INSTALL_DIR}; \
- chmod +x ${INSTALL_DIR}/reference_impl/solr/bin/solr
+ cd reference_impl;ant ivy-bootstrap;cd solr;ant package -Dversion=9.0.0-miller_ref_impl;
WORKDIR ${BUILD_DIR}
ENV PATH=$PATH:/opt/solr/miller_ref_impl/bin
-#ENTRYPOINT "/bin/bash"
-ENTRYPOINT "/start-solr.sh"
\ No newline at end of file
+ENTRYPOINT "/bin/bash"
+#ENTRYPOINT "/start-solr.sh"
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 542630f..543303e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -66,12 +66,6 @@ public class ParWork implements Closeable {
protected final static ThreadLocal<ExecutorService> THREAD_LOCAL_EXECUTOR = new ThreadLocal<>();
- public static volatile int MAXIMUM_POOL_SIZE;
- public static final long KEEP_ALIVE_TIME = 10;
-
- public static volatile int CAPACITY = 30;
- private static final int GROWBY = 30;
-
private Set<Object> collectSet = null;
private static SysStats sysStats = SysStats.getSysStats();
@@ -557,6 +551,8 @@ public class ParWork implements Closeable {
}
public static void sizePoolByLoad() {
+ Integer maxPoolsSize = getMaxPoolSize();
+
ThreadPoolExecutor executor = (ThreadPoolExecutor) getExecutor();
double load = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
if (load < 0) {
@@ -577,8 +573,8 @@ public class ParWork implements Closeable {
if (cMax > 2) {
executor.setMaximumPoolSize(Math.max(2, (int) ((double) cMax * 0.60D)));
}
- } else if (sLoad < 0.9D && MAXIMUM_POOL_SIZE != executor.getMaximumPoolSize()) {
- executor.setMaximumPoolSize(MAXIMUM_POOL_SIZE);
+ } else if (sLoad < 0.9D && maxPoolsSize != executor.getMaximumPoolSize()) {
+ executor.setMaximumPoolSize(maxPoolsSize);
}
if (log.isDebugEnabled()) log.debug("ParWork, load:" + sLoad); //nocommit: remove when testing is done
@@ -602,43 +598,16 @@ public class ParWork implements Closeable {
}
public static ExecutorService getExecutorService(int corePoolSize, int maximumPoolSize, int keepAliveTime) {
- MAXIMUM_POOL_SIZE = Integer.getInteger("solr.maxThreadExecPoolSize",
- (int) Math.max(2, Math.round(Runtime.getRuntime().availableProcessors() / 2.0d)));
- CAPACITY = Integer.getInteger("solr.threadExecQueueSize", 80);
ThreadPoolExecutor exec;
- exec = new ThreadPoolExecutor(0, MAXIMUM_POOL_SIZE,
- KEEP_ALIVE_TIME, TimeUnit.SECONDS,
- new BlockingArrayQueue<>(CAPACITY, GROWBY), // size?
- new ThreadFactory() {
- AtomicInteger threadNumber = new AtomicInteger(1);
- ThreadGroup group;
-
- {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, "ParWork" + threadNumber.getAndIncrement(), 0);
- t.setDaemon(false);
- // t.setPriority(priority);
- return t;
- }
- }, new RejectedExecutionHandler() {
-
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- log.warn("Task was rejected, running in caller thread");
- if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
- throw new AlreadyClosedException();
- }
- executor.execute(r);
- }
- });
+ exec = new ParWorkExecutor("ParWork", getMaxPoolSize());
return exec;
}
+ private static Integer getMaxPoolSize() {
+ return Integer.getInteger("solr.maxThreadExecPoolSize",
+ (int) Math.max(6, Math.round(Runtime.getRuntime().availableProcessors())));
+ }
+
private void handleObject(String label, AtomicReference<Throwable> exception, final TimeTracker workUnitTracker, Object object) {
if (log.isDebugEnabled()) {
log.debug(
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
new file mode 100644
index 0000000..6ab259d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -0,0 +1,72 @@
+package org.apache.solr.common;
+
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ParWorkExecutor extends ThreadPoolExecutor {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final long KEEP_ALIVE_TIME = 10;
+ private static final int GROW_BY = 30;
+
+ private final Object lock = new Object();
+
+ private static AtomicInteger threadNumber = new AtomicInteger(1);
+
+ public ParWorkExecutor(String name, int maxPoolsSize) {
+ super(0, maxPoolsSize, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new BlockingArrayQueue<>(Integer.getInteger("solr.threadExecQueueSize", 80), GROW_BY), new ThreadFactory() {
+
+ ThreadGroup group;
+
+ {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, name + threadNumber.getAndIncrement(), 0);
+ t.setDaemon(false);
+ // t.setPriority(priority);
+ return t;
+ }
+ });
+
+ setRejectedExecutionHandler(new RejectedExecutionHandler() {
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ log.warn("Task was rejected, running in caller thread");
+ if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
+ throw new AlreadyClosedException();
+ }
+ synchronized (lock) {
+ try {
+ lock.wait(10000);
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e, true);
+ }
+ }
+ executor.execute(r);
+ }
+ });
+ }
+
+
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index c853283..922d0f8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -57,6 +57,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecutor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
@@ -101,37 +102,7 @@ public class SolrZkClient implements Closeable {
private volatile SolrZooKeeper keeper;
- private final ExecutorService zkCallbackExecutor =
- new ThreadPoolExecutor(1, 1,
- 30L, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(120), // size?
- new ThreadFactory() {
- AtomicInteger threadNumber = new AtomicInteger(1);
- ThreadGroup group;
-
- {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, "ZkCallback" + threadNumber.getAndIncrement(), 0);
- t.setDaemon(false);
- // t.setPriority(priority);
- return t;
- }
- }, new RejectedExecutionHandler() {
-
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- log.warn("Task was rejected, running in caller thread");
- if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
- throw new AlreadyClosedException();
- }
- executor.execute(r);
- }
- });
+ private final ExecutorService zkCallbackExecutor = new ParWorkExecutor("ZkCallback", 1);
private final ExecutorService zkConnManagerCallbackExecutor =
ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("zkConnectionManagerCallback"));
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 66a22ca..570fbbb 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -95,6 +95,7 @@ import org.apache.solr.cloud.IpTables;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecutor;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
@@ -237,42 +238,8 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
initialRootLogLevel = StartupLoggingUtils.getLogLevelString();
initClassLogLevels();
resetExceptionIgnores();
-
- testExecutor = new ThreadPoolExecutor(0, Math.max(1, Runtime.getRuntime().availableProcessors()),
- 3000, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(30), // size?
- new ThreadFactory() {
- AtomicInteger threadNumber = new AtomicInteger(1);
- ThreadGroup group;
-
- {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, "testExecutor" + threadNumber.getAndIncrement(), 0);
- t.setDaemon(false);
- return t;
- }
- }, new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- log.warn("Task was rejected, running in caller thread");
- if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
- throw new AlreadyClosedException();
- }
-// try {
-// Thread.sleep(1000);
-// } catch (InterruptedException e) {
-// Thread.currentThread().interrupt();
-// }
-// executor.execute(r);
- r.run();
- }
- });
+ testExecutor = new ParWorkExecutor("testExecutor", Math.max(1, Runtime.getRuntime().availableProcessors()));
// set solr.install.dir needed by some test configs outside of the test sandbox (!)
System.setProperty("solr.install.dir", ExternalPaths.SOURCE_HOME);