You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/21 02:21:11 UTC
[lucene-solr] 01/01: @863 Finish up search side async,
Http2 improvements, thread pools.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 48e048674161543c27f96f5723b60300117d9496
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Sep 20 21:19:17 2020 -0500
@863 Finish up search side async, Http2 improvements, thread pools.
---
gradle/validation/check-environment.gradle | 7 -
.../lucene/benchmark/byTask/utils/Algorithm.java | 1 +
settings.gradle | 1 +
solr/cloud-dev/cloud.sh | 6 +-
.../client/solrj/embedded/JettySolrRunner.java | 111 +++++++---------
.../src/java/org/apache/solr/cloud/Overseer.java | 2 +-
.../apache/solr/handler/admin/PrepRecoveryOp.java | 36 ++---
.../org/apache/solr/servlet/SolrQoSFilter.java | 35 +++--
.../org/apache/solr/update/SolrCmdDistributor.java | 68 +++++-----
.../org/apache/solr/update/UpdateShardHandler.java | 1 +
.../processor/DistributedUpdateProcessor.java | 2 +-
solr/server/etc/jetty-http.xml | 4 +-
solr/server/etc/jetty-https.xml | 4 +-
solr/server/etc/jetty.xml | 12 +-
solr/server/resources/log4j2.xml | 17 ++-
.../solr/configsets/_default/conf/solrconfig.xml | 13 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 146 ++++++++++++++-------
.../solr/client/solrj/impl/HttpClientUtil.java | 37 ++++--
.../src/java/org/apache/solr/common/ParWork.java | 4 +-
.../org/apache/solr/common/ParWorkExecutor.java | 6 +-
.../apache/solr/common/PerThreadExecService.java | 2 +-
.../solr/common/util/SolrQueuedThreadPool.java | 82 ++++++------
.../src/java/org/apache/solr/SolrTestCase.java | 7 +-
.../java/org/apache/solr/cloud/ZkTestServer.java | 2 +-
24 files changed, 329 insertions(+), 277 deletions(-)
diff --git a/gradle/validation/check-environment.gradle b/gradle/validation/check-environment.gradle
index 0f3a084..c1b155c 100644
--- a/gradle/validation/check-environment.gradle
+++ b/gradle/validation/check-environment.gradle
@@ -37,11 +37,4 @@ configure(rootProject) {
+ "[${System.getProperty('java.vm.name')} ${System.getProperty('java.vm.version')}]")
}
- // If we're regenerating the wrapper, skip the check.
- if (!gradle.startParameter.taskNames.contains("wrapper")) {
- def currentGradleVersion = GradleVersion.current()
- if (currentGradleVersion != GradleVersion.version(expectedGradleVersion)) {
- throw new GradleException("Gradle ${expectedGradleVersion} is required (hint: use the gradlew script): this gradle is ${currentGradleVersion}")
- }
- }
}
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
index 821db38..8e125c0 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
@@ -308,6 +308,7 @@ public class Algorithm implements AutoCloseable {
try {
return Class.forName(pkg+'.'+taskName+"Task");
} catch (ClassNotFoundException e) {
+ e.printStackTrace();
// failed in this package, might succeed in the next one...
}
}
diff --git a/settings.gradle b/settings.gradle
index c635186..6ae6b2a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -52,6 +52,7 @@ include "lucene:spatial3d"
include "lucene:suggest"
include "lucene:test-framework"
+include "solr:benchmark"
include "solr:solrj"
include "solr:core"
include "solr:server"
diff --git a/solr/cloud-dev/cloud.sh b/solr/cloud-dev/cloud.sh
index 1f6925f..14b904a 100755
--- a/solr/cloud-dev/cloud.sh
+++ b/solr/cloud-dev/cloud.sh
@@ -254,7 +254,7 @@ cleanIfReq() {
recompileIfReq() {
if [[ "$RECOMPILE" = true ]]; then
pushd "$VCS_WORK"/solr
- ant clean create-package
+ ./gradlew clean distTar
if [[ "$?" -ne 0 ]]; then
echo "BUILD FAIL - cloud.sh stopping, see above output for details"; popd; exit 7;
fi
@@ -274,10 +274,10 @@ copyTarball() {
echo "baz"
pushd # back to original dir to properly resolve vcs working dir
echo "foobar:"$(pwd)
- if [[ ! -f $(ls "$VCS_WORK"/solr/package/solr-*.tgz) ]]; then
+ if [[ ! -f $(ls "$VCS_WORK"/solr/packaging/solr-*.tgz) ]]; then
echo "No solr tarball found try again with -r"; popd; exit 10;
fi
- cp "$VCS_WORK"/solr/package/solr-*.tgz ${CLUSTER_WD}
+ cp "$VCS_WORK"/solr/packaging/solr-*.tgz ${CLUSTER_WD}
pushd # back into cluster wd to unpack
tar xzvf solr-*.tgz
popd
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index d39698d..af9eb47 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -309,75 +309,64 @@ public class JettySolrRunner implements Closeable {
server.setStopAtShutdown(config.stopAtShutdown);
//if (System.getProperty("jetty.testMode") != null) {
- if (true) {
- // if this property is true, then jetty will be configured to use SSL
- // leveraging the same system properties as java to specify
- // the keystore/truststore if they are set unless specific config
- // is passed via the constructor.
- //
- // This means we will use the same truststore, keystore (and keys) for
- // the server as well as any client actions taken by this JVM in
- // talking to that server, but for the purposes of testing that should
- // be good enough
- final SslContextFactory.Server sslcontext = SSLConfig.createContextFactory(config.sslConfig);
-
- HttpConfiguration configuration = new HttpConfiguration();
- ServerConnector connector;
- if (sslcontext != null) {
- configuration.setSecureScheme("https");
- configuration.addCustomizer(new SecureRequestCustomizer());
- HttpConnectionFactory http1ConnectionFactory = new HttpConnectionFactory(configuration);
-
- if (config.onlyHttp1 || !Constants.JRE_IS_MINIMUM_JAVA9) {
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new SslConnectionFactory(sslcontext,
- http1ConnectionFactory.getProtocol()),
- http1ConnectionFactory);
- } else {
- sslcontext.setCipherComparator(HTTP2Cipher.COMPARATOR);
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2);
- SslConnectionFactory sslConnectionFactory = new SslConnectionFactory(sslcontext, "alpn");
- connector.addConnectionFactory(sslConnectionFactory);
- connector.setDefaultProtocol(sslConnectionFactory.getProtocol());
+ // if this property is true, then jetty will be configured to use SSL
+ // leveraging the same system properties as java to specify
+ // the keystore/truststore if they are set unless specific config
+ // is passed via the constructor.
+ //
+ // This means we will use the same truststore, keystore (and keys) for
+ // the server as well as any client actions taken by this JVM in
+ // talking to that server, but for the purposes of testing that should
+ // be good enough
+ final SslContextFactory.Server sslcontext = SSLConfig.createContextFactory(config.sslConfig);
+
+ HttpConfiguration configuration = new HttpConfiguration();
+ configuration.setRequestHeaderSize(16 * 1024);
+ configuration.setResponseHeaderSize(16 * 1024);
+ ServerConnector connector;
+ if (sslcontext != null) {
+ configuration.setSecureScheme("https");
+ configuration.addCustomizer(new SecureRequestCustomizer());
+ HttpConnectionFactory http1ConnectionFactory = new HttpConnectionFactory(configuration);
+
+ if (config.onlyHttp1 || !Constants.JRE_IS_MINIMUM_JAVA9) {
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new SslConnectionFactory(sslcontext, http1ConnectionFactory.getProtocol()), http1ConnectionFactory);
+ } else {
+ sslcontext.setCipherComparator(HTTP2Cipher.COMPARATOR);
- HTTP2ServerConnectionFactory http2ConnectionFactory = new HTTP2ServerConnectionFactory(configuration);
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2);
+ SslConnectionFactory sslConnectionFactory = new SslConnectionFactory(sslcontext, "alpn");
+ connector.addConnectionFactory(sslConnectionFactory);
+ connector.setDefaultProtocol(sslConnectionFactory.getProtocol());
- http2ConnectionFactory.setMaxConcurrentStreams(512);
- http2ConnectionFactory.setInputBufferSize(16384);
+ HTTP2ServerConnectionFactory http2ConnectionFactory = new HTTP2ServerConnectionFactory(configuration);
- ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(
- http2ConnectionFactory.getProtocol(),
- http1ConnectionFactory.getProtocol());
- alpn.setDefaultProtocol(http2ConnectionFactory.getProtocol());
- connector.addConnectionFactory(alpn);
- connector.addConnectionFactory(http1ConnectionFactory);
- connector.addConnectionFactory(http2ConnectionFactory);
- }
+ http2ConnectionFactory.setMaxConcurrentStreams(512);
+ http2ConnectionFactory.setInputBufferSize(16384);
+ http2ConnectionFactory.setStreamIdleTimeout(TimeUnit.MINUTES.toMillis(10));
+
+ ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(http2ConnectionFactory.getProtocol(), http1ConnectionFactory.getProtocol());
+ alpn.setDefaultProtocol(http2ConnectionFactory.getProtocol());
+ connector.addConnectionFactory(alpn);
+ connector.addConnectionFactory(http1ConnectionFactory);
+ connector.addConnectionFactory(http2ConnectionFactory);
+ }
+ } else {
+ if (config.onlyHttp1) {
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration));
} else {
- if (config.onlyHttp1) {
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration));
- } else {
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration),
- new HTTP2CServerConnectionFactory(configuration));
- }
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration), new HTTP2CServerConnectionFactory(configuration));
}
+ }
+ connector.setIdleTimeout(TimeUnit.MINUTES.toMillis(10));
+ connector.setReuseAddress(true);
+ connector.setSoLingerTime(-1);
+ connector.setPort(port);
+ connector.setHost("127.0.0.1");
- connector.setReuseAddress(true);
- connector.setSoLingerTime(-1);
- connector.setPort(port);
- connector.setHost("127.0.0.1");
-
- server.setConnectors(new Connector[] {connector});
+ server.setConnectors(new Connector[] {connector});
- } else {
- HttpConfiguration configuration = new HttpConfiguration();
- configuration.setIdleTimeout(Integer.getInteger("solr.containerThreadsIdle", THREAD_POOL_MAX_IDLE_TIME_MS));
- ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory(configuration));
- connector.setReuseAddress(true);
- connector.setPort(port);
- connector.setSoLingerTime(-1);
- server.setConnectors(new Connector[] {connector});
- }
//server.setDumpAfterStart(true);
// server.setDumpBeforeStop(true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 6499bcf..ca3fe73 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -616,7 +616,7 @@ public class Overseer implements SolrCloseable {
try {
super.run();
} finally {
- ParWork.closeMyPerThreadExecutor(true);
+ //ParWork.closeMyPerThreadExecutor(true);
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index ba65d40..12a2f6b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -57,14 +57,13 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
Boolean checkLive = params.getBool("checkLive");
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
- Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
CoreContainer coreContainer = it.handler.coreContainer;
// wait long enough for the leader conflict to work itself out plus a little extra
int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
log.info(
- "Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}, onlyIfLeaderActive: {}",
- coreNodeName, waitForState, checkLive, onlyIfLeader, onlyIfLeaderActive);
+ "Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}: {}",
+ coreNodeName, waitForState, checkLive, onlyIfLeader);
String collectionName;
CloudDescriptor cloudDescriptor;
@@ -97,8 +96,10 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
AtomicReference<String> errorMessage = new AtomicReference<>();
try {
coreContainer.getZkController().getZkStateReader().waitForState(collectionName, conflictWaitMs, TimeUnit.MILLISECONDS, (n, c) -> {
- if (c == null)
+ if (c == null) {
+ log.info("collection not found {}",collectionName);
return false;
+ }
// wait until we are sure the recovering node is ready
// to accept updates
@@ -113,20 +114,6 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
final Replica.State localState = cloudDescriptor.getLastPublished();
- // TODO: This is funky but I've seen this in testing where the replica asks the
- // leader to be in recovery? Need to track down how that happens ... in the meantime,
- // this is a safeguard
- boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
- onlyIfLeader &&
- cname.equals(replica.getStr("core")) &&
- waitForState == Replica.State.RECOVERING &&
- localState == Replica.State.ACTIVE &&
- state == Replica.State.ACTIVE);
-
- if (leaderDoesNotNeedRecovery) {
- log.warn("Leader {} ignoring request to be in the recovering state because it is live and active.", cname);
- }
-
ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
// if the replica is waiting for leader to see recovery state, the leader should refresh its terms
if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName)
@@ -137,24 +124,27 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
shardTerms.refreshTerms(null);
}
- boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive
- && localState != Replica.State.ACTIVE;
if (log.isInfoEnabled()) {
log.info(
"In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
- ", thisCore=" + cname + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
+ ", thisCore=" + cname +
", isLeader? " + cloudDescriptor.isLeader() +
", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state
+ ", localState=" + localState + ", nodeName=" + nodeName +
- ", coreNodeName=" + coreNodeName + ", onlyIfActiveCheckResult=" + onlyIfActiveCheckResult
+ ", coreNodeName=" + coreNodeName
+ ", nodeProps: " + replica); //LOGOK
}
- if (!onlyIfActiveCheckResult && replica != null && (state == waitForState || leaderDoesNotNeedRecovery)) {
+
+ log.info("replica={} state={} waitForState={}", replica, state, waitForState);
+ if (replica != null && (state == waitForState)) {
if (checkLive == null) {
+ log.info("checkLive=false, return true");
return true;
} else if (checkLive && live) {
+ log.info("checkLive=true live={}, return true", live);
return true;
} else if (!checkLive && !live) {
+ log.info("checkLive=false live={}, return true", live);
return true;
}
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index 6ff27a7..e02ec88 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -42,17 +42,25 @@ public class SolrQoSFilter extends QoSFilter {
static final String SUSPEND_INIT_PARAM = "suspendMs";
static final int PROC_COUNT = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
- protected int _origMaxRequests;
+ protected volatile int _origMaxRequests;
private static SysStats sysStats = ParWork.getSysStats();
+ private QoSFilter internQosFilter;
+ private volatile long lastUpdate;
@Override
public void init(FilterConfig filterConfig) {
super.init(filterConfig);
- _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 1000);
+ _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 10000);
super.setMaxRequests(_origMaxRequests);
super.setSuspendMs(Integer.getInteger("solr.concurrentRequests.suspendms", 20000));
- super.setWaitMs(Integer.getInteger("solr.concurrentRequests.waitms", 5000));
+ super.setWaitMs(Integer.getInteger("solr.concurrentRequests.waitms", 500));
+
+ internQosFilter = new QoSFilter();
+ internQosFilter.init(filterConfig);
+ internQosFilter.setMaxRequests(10000);
+ internQosFilter.setSuspendMs(30000);
+ internQosFilter.setWaitMs(50);
}
@Override
@@ -75,30 +83,39 @@ public class SolrQoSFilter extends QoSFilter {
if (cMax > 5) {
int max = Math.max(5, (int) ((double)cMax * 0.60D));
log.warn("Our individual load is {}, set max concurrent requests to {}", ourLoad, max);
- //setMaxRequests(max);
+ updateMaxRequests(max);
}
} else {
// nocommit - deal with no supported, use this as a fail safe with high and low watermark?
double sLoad = sysStats.getSystemLoad();
- if (sLoad > 1) {
+ if (sLoad > 1.3) {
int cMax = getMaxRequests();
if (cMax > 5) {
- int max = Math.max(5, (int) ((double) cMax * 0.60D));
+ int max = Math.max(5, (int) ((double) cMax * 0.95D));
log.warn("System load is {}, set max concurrent requests to {}", sLoad, max);
- //setMaxRequests(max);
+ updateMaxRequests(max);
}
} else if (sLoad < 0.95 && _origMaxRequests != getMaxRequests()) {
if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
- //setMaxRequests(_origMaxRequests);
+ updateMaxRequests(_origMaxRequests);
}
}
+ //chain.doFilter(req, response);
super.doFilter(req, response, chain);
} else {
if (log.isDebugEnabled()) log.debug("internal request, allow");
- chain.doFilter(req, response);
+ // chain.doFilter(req, response);
+ internQosFilter.doFilter(req, response, chain);
+ }
+ }
+
+ private void updateMaxRequests(int max) {
+ if (System.currentTimeMillis() - lastUpdate < 10000) {
+ lastUpdate = System.currentTimeMillis();
+ setMaxRequests(max);
}
}
}
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index a65d0a4..0fd781a 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -39,7 +39,6 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.core.Diagnostics;
@@ -53,7 +52,7 @@ import org.slf4j.LoggerFactory;
* Used for distributing commands from a shard leader to its replicas.
*/
public class SolrCmdDistributor implements Closeable {
- private static final int MAX_RETRIES_ON_FORWARD = 3;
+ private static final int MAX_RETRIES_ON_FORWARD = 1;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private volatile boolean finished = false; // see finish()
@@ -68,19 +67,23 @@ public class SolrCmdDistributor implements Closeable {
private final Http2SolrClient solrClient;
+ Http2SolrClient.AsyncTracker tracker = new Http2SolrClient.AsyncTracker(-1);
+
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
assert ObjectReleaseTracker.track(this);
- this.solrClient = new Http2SolrClient.Builder().markInternalRequest().withHttpClient(updateShardHandler.getTheSharedHttpClient()).idleTimeout(60000).build();
+ this.solrClient = updateShardHandler.getTheSharedHttpClient();
+ // nocommit set inorder matters
}
public void finish() {
assert !finished : "lifecycle sanity check";
- solrClient.waitForOutstandingRequests();
+ // nonCommitTracker.waitForComplete();
+ tracker.waitForComplete();
finished = true;
}
public void close() {
- solrClient.close();
+ tracker.close();
assert ObjectReleaseTracker.release(this);
}
@@ -152,7 +155,7 @@ public class SolrCmdDistributor implements Closeable {
} else {
uReq.deleteByQuery(cmd.query);
}
- submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), true);
+ submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker));
}
}
@@ -176,7 +179,7 @@ public class SolrCmdDistributor implements Closeable {
if (cmd.isInPlaceUpdate()) {
params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
}
- submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker), true);
+ submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker));
}
}
@@ -192,12 +195,12 @@ public class SolrCmdDistributor implements Closeable {
uReq.setParams(params);
addCommit(uReq, cmd);
- submit(new Req(cmd, node, uReq, false), true);
+ submit(new Req(cmd, node, uReq, false));
}
}
public void blockAndDoRetries() {
- solrClient.waitForOutstandingRequests();
+ tracker.waitForComplete();
}
void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -206,7 +209,7 @@ public class SolrCmdDistributor implements Closeable {
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
}
- private void submit(final Req req, boolean register) {
+ private void submit(final Req req) {
if (log.isDebugEnabled()) {
log.debug("sending update to " + req.node.getUrl() + " retry:" + req.retries + " " + req.cmd + " params:" + req.uReq.getParams());
@@ -234,40 +237,45 @@ public class SolrCmdDistributor implements Closeable {
}
try {
-
- solrClient.asyncRequest(req.uReq, null, new AsyncListener<NamedList<Object>>() {
+ tracker.register();
+ solrClient.asyncRequest(req.uReq, null, new AsyncListener<>() {
@Override
public void onSuccess(NamedList result) {
if (log.isTraceEnabled()) log.trace("Success for distrib update {}", result);
+ tracker.arrive();
}
@Override
public void onFailure(Throwable t) {
log.error("Exception sending dist update", t);
+ try {
+ Error error = new Error();
+ error.t = t;
+ error.req = req;
+ if (t instanceof SolrException) {
+ error.statusCode = ((SolrException) t).code();
+ }
- Error error = new Error();
- error.t = t;
- error.req = req;
- if (t instanceof SolrException) {
- error.statusCode = ((SolrException) t).code();
- }
-
- boolean retry = false;
- if (checkRetry(error)) {
- retry = true;
- }
+ boolean retry = false;
+ if (checkRetry(error)) {
+ retry = true;
+ }
- if (retry) {
- log.info("Retrying distrib update on error: {}", t.getMessage());
- submit(req, true);
- return;
- } else {
- allErrors.add(error);
+ if (retry) {
+ log.info("Retrying distrib update on error: {}", t.getMessage());
+ submit(req);
+ return;
+ } else {
+ allErrors.add(error);
+ }
+ } finally {
+ tracker.arrive();
}
}
});
} catch (Exception e) {
log.error("Exception sending dist update", e);
+ tracker.arrive();
Error error = new Error();
error.t = e;
error.req = req;
@@ -275,7 +283,7 @@ public class SolrCmdDistributor implements Closeable {
error.statusCode = ((SolrException) e).code();
}
if (checkRetry(error)) {
- submit(req, true);
+ submit(req);
} else {
allErrors.add(error);
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index eec60ee..9c39e1d 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -103,6 +103,7 @@ public class UpdateShardHandler implements SolrInfoBean {
if (cfg != null) {
updateOnlyClientBuilder
.connectionTimeout(cfg.getDistributedConnectionTimeout())
+ .maxOutstandingAsyncRequests(-1)
.idleTimeout(cfg.getDistributedSocketTimeout());
}
updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().build();
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index c3d2423..1124f7e 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -250,7 +250,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
- try (ParWork worker = new ParWork(this)) {
+ try (ParWork worker = new ParWork(this, false, true)) {
if (!forwardToLeader && !didLocalAdd) {
worker.collect("localAddUpdate", () -> {
diff --git a/solr/server/etc/jetty-http.xml b/solr/server/etc/jetty-http.xml
index 0410559..7d0a233 100644
--- a/solr/server/etc/jetty-http.xml
+++ b/solr/server/etc/jetty-http.xml
@@ -35,10 +35,10 @@
<New class="org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory">
<Arg name="config"><Ref refid="httpConfig" /></Arg>
<Set name="maxConcurrentStreams">512</Set>
- <Set name="inputBufferSize">8192</Set>
+ <Set name="inputBufferSize">16384</Set>
<Set name="rateControlFactory">
<New class="org.eclipse.jetty.http2.parser.WindowRateControl$Factory">
- <Arg type="int"><Property name="jetty.http2.rateControl.maxEventsPerSecond" default="1000"/></Arg>
+ <Arg type="int"><Property name="jetty.http2.rateControl.maxEventsPerSecond" default="1000000"/></Arg>
</New>
</Set>
</New>
diff --git a/solr/server/etc/jetty-https.xml b/solr/server/etc/jetty-https.xml
index 4c5ba8f..97998fa 100644
--- a/solr/server/etc/jetty-https.xml
+++ b/solr/server/etc/jetty-https.xml
@@ -55,10 +55,10 @@
<New class="org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory">
<Arg name="config"><Ref refid="sslHttpConfig"/></Arg>
<Set name="maxConcurrentStreams">512</Set>
- <Set name="inputBufferSize">8192</Set>
+ <Set name="inputBufferSize">16384</Set>
<Set name="rateControlFactory">
<New class="org.eclipse.jetty.http2.parser.WindowRateControl$Factory">
- <Arg type="int"><Property name="jetty.http2.rateControl.maxEventsPerSecond" default="1000"/></Arg>
+ <Arg type="int"><Property name="jetty.http2.rateControl.maxEventsPerSecond" default="1000000"/></Arg>
</New>
</Set>
</New>
diff --git a/solr/server/etc/jetty.xml b/solr/server/etc/jetty.xml
index cb289f2..e4ae796 100644
--- a/solr/server/etc/jetty.xml
+++ b/solr/server/etc/jetty.xml
@@ -40,14 +40,6 @@
</New>
</Arg>
-<Get name="ThreadPool">
- <Set name="minThreads" type="int"><Property name="solr.jetty.threads.min" default="12"/></Set>
- <Set name="maxThreads" type="int"><Property name="solr.jetty.threads.max" default="10000"/></Set>
- <Set name="idleTimeout" type="int"><Property name="solr.jetty.threads.idle.timeout" default="45000"/></Set>
- <Set name="stopTimeout" type="int"><Property name="solr.jetty.threads.stop.timeout" default="180000"/></Set>
- <Set name="detailedDump">false</Set>
-</Get>
-
<!-- =========================================================== -->
<!-- Http Configuration. -->
@@ -69,8 +61,8 @@
<Set name="securePort"><Property name="solr.jetty.secure.port" default="8443" /></Set>
<Set name="outputBufferSize"><Property name="solr.jetty.output.buffer.size" default="65536" /></Set>
<Set name="outputAggregationSize"><Property name="solr.jetty.output.aggregation.size" default="16384" /></Set>
- <Set name="requestHeaderSize"><Property name="solr.jetty.request.header.size" default="8192" /></Set>
- <Set name="responseHeaderSize"><Property name="solr.jetty.response.header.size" default="8192" /></Set>
+ <Set name="requestHeaderSize"><Property name="solr.jetty.request.header.size" default="16384" /></Set>
+ <Set name="responseHeaderSize"><Property name="solr.jetty.response.header.size" default="16384" /></Set>
<Set name="sendServerVersion"><Property name="solr.jetty.send.server.version" default="false" /></Set>
<Set name="sendDateHeader"><Property name="solr.jetty.send.date.header" default="false" /></Set>
<Set name="headerCacheSize"><Property name="solr.jetty.header.cache.size" default="1024" /></Set>
diff --git a/solr/server/resources/log4j2.xml b/solr/server/resources/log4j2.xml
index fd83a504..22cbe7f 100644
--- a/solr/server/resources/log4j2.xml
+++ b/solr/server/resources/log4j2.xml
@@ -63,19 +63,18 @@
</Appenders>
<Loggers>
- <AsyncLogger name="org.eclipse.jetty.servlets" level="DEBUG"/>
- <AsyncLogger name="org.eclipse.jetty" level="warn"/>
- <AsyncLogger name="org.eclipse.jetty.server.Server" level="INFO"/>
- <AsyncLogger name="org.apache.hadoop" level="warn"/>
- <AsyncLogger name="org.apache.solr.update.LoggingInfoStream" level="off"/>
- <AsyncLogger name="org.apache.zookeeper" level="warn"/>
- <AsyncLogger name="org.apache.solr.core.SolrCore.SlowRequest" level="info" additivity="false">
+ <AsyncLogger name="org.eclipse.jetty.servlets" level="WARN"/>
+ <AsyncLogger name="org.eclipse.jetty" level="WARN"/>
+ <AsyncLogger name="org.eclipse.jetty.server.Server" level="WARN"/>
+ <AsyncLogger name="org.apache.hadoop" level="WARN"/>
+ <AsyncLogger name="org.apache.solr.update.LoggingInfoStream" level="OFF"/>
+ <AsyncLogger name="org.apache.zookeeper" level="WARN"/>
+ <AsyncLogger name="org.apache.solr.core.SolrCore.SlowRequest" level="INFO" additivity="false">
<AppenderRef ref="SlowLogFile"/>
</AsyncLogger>
- <AsyncRoot level="info">
+ <AsyncRoot level="WARN">
<AppenderRef ref="MainLogFile"/>
- <AppenderRef ref="STDOUT"/>
</AsyncRoot>
</Loggers>
</Configuration>
diff --git a/solr/server/solr/configsets/_default/conf/solrconfig.xml b/solr/server/solr/configsets/_default/conf/solrconfig.xml
index 9009170..a0a6ea2 100644
--- a/solr/server/solr/configsets/_default/conf/solrconfig.xml
+++ b/solr/server/solr/configsets/_default/conf/solrconfig.xml
@@ -149,7 +149,7 @@
before flushing.
If both ramBufferSizeMB and maxBufferedDocs is set, then
Lucene will flush based on whichever limit is hit first. -->
- <!-- <ramBufferSizeMB>100</ramBufferSizeMB> -->
+ <ramBufferSizeMB>300</ramBufferSizeMB>
<!-- <maxBufferedDocs>1000</maxBufferedDocs> -->
<!-- Expert: ramPerThreadHardLimitMB sets the maximum amount of RAM that can be consumed
@@ -181,9 +181,14 @@
can perform merges in the background using separate threads.
The SerialMergeScheduler (Lucene 2.2 default) does not.
-->
- <!--
- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler"/>
- -->
+
+ <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+ <int name="maxThreadCount">6</int>
+ <int name="maxMergeCount">8</int>
+ <bool name="ioThrottle">false</bool>
+
+
+ </mergeScheduler>
<!-- LockFactory
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 0e88ff6..02d719b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
@@ -77,7 +78,6 @@ import org.apache.solr.common.util.SolrQueuedThreadPool;
import org.apache.solr.common.util.SolrScheduledExecutorScheduler;
import org.apache.solr.common.util.Utils;
import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.ProtocolHandlers;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
@@ -118,6 +118,7 @@ import static org.apache.solr.common.util.Utils.getObjectByPath;
*/
public class Http2SolrClient extends SolrClient {
public static final String REQ_PRINCIPAL_KEY = "solr-req-principal";
+ private static final int MAX_OUTSTANDING_REQUESTS = 1000;
private static volatile SSLConfig defaultSSLConfig;
@@ -140,10 +141,11 @@ public class Http2SolrClient extends SolrClient {
private volatile HttpClient httpClient;
private volatile Set<String> queryParams = Collections.emptySet();
private int idleTimeout;
+ private boolean strictEventOrdering;
private volatile ResponseParser parser = new BinaryResponseParser();
private volatile RequestWriter requestWriter = new BinaryRequestWriter();
private final Set<HttpListenerFactory> listenerFactory = ConcurrentHashMap.newKeySet();
- private final AsyncTracker asyncTracker = new AsyncTracker();
+ private final AsyncTracker asyncTracker;
/**
* The URL of the Solr server.
*/
@@ -164,8 +166,11 @@ public class Http2SolrClient extends SolrClient {
}
this.serverBaseUrl = serverBaseUrl;
}
-
+ Integer moar = -1;
+ if (builder.maxOutstandingAsyncRequests != null) moar = builder.maxOutstandingAsyncRequests;
+ asyncTracker = new AsyncTracker(moar);
this.headers = builder.headers;
+ this.strictEventOrdering = builder.strictEventOrdering;
if (builder.idleTimeout != null && builder.idleTimeout > 0) idleTimeout = builder.idleTimeout;
else idleTimeout = HttpClientUtil.DEFAULT_SO_TIMEOUT;
@@ -211,25 +216,34 @@ public class Http2SolrClient extends SolrClient {
ssl = true;
}
// nocommit - look at config again as well
- httpClientExecutor = new SolrQueuedThreadPool("httpClient",Integer.getInteger("solr.maxHttp2ClientThreads", Math.max(12, ParWork.PROC_COUNT / 2)), Integer.getInteger("solr.minHttp2ClientThreads", 8), idleTimeout);
+ int minThreads = Integer.getInteger("solr.minHttp2ClientThreads", 12);
+ httpClientExecutor = new SolrQueuedThreadPool("http2Client",
+ Integer.getInteger("solr.maxHttp2ClientThreads", Math.min(16, ParWork.PROC_COUNT / 2)),
+ minThreads,
+ this.headers != null && this.headers.containsKey(QoSParams.REQUEST_SOURCE) &&
+ this.headers.get(QoSParams.REQUEST_SOURCE).equals(QoSParams.INTERNAL) ? 3000 : 5000, new ArrayBlockingQueue<>(minThreads, true),
+ (int) TimeUnit.SECONDS.toMillis(30), null);
httpClientExecutor.setLowThreadsThreshold(-1);
boolean sslOnJava8OrLower = ssl && !Constants.JRE_IS_MINIMUM_JAVA9;
- HttpClientTransport transport;
if (builder.useHttp1_1 || sslOnJava8OrLower) {
if (sslOnJava8OrLower && !builder.useHttp1_1) {
log.warn("Create Http2SolrClient with HTTP/1.1 transport since Java 8 or lower versions does not support SSL + HTTP/2");
} else {
log.debug("Create Http2SolrClient with HTTP/1.1 transport");
}
- transport = new HttpClientTransportOverHTTP(2);
+ SolrHttpClientTransportOverHTTP transport = new SolrHttpClientTransportOverHTTP(1);
+ transport.getHttpClient().setIdleTimeout(idleTimeout);
httpClient = new HttpClient(transport, sslContextFactory);
if (builder.maxConnectionsPerHost != null) httpClient.setMaxConnectionsPerDestination(builder.maxConnectionsPerHost);
} else {
log.debug("Create Http2SolrClient with HTTP/2 transport");
HTTP2Client http2client = new HTTP2Client();
- http2client.setSelectors(2);
- transport = new HttpClientTransportOverHTTP2(http2client);
+ http2client.setSelectors(1);
+ http2client.setIdleTimeout(idleTimeout);
+ http2client.setMaxConcurrentPushedStreams(512);
+ http2client.setInputBufferSize(16384);
+ HttpClientTransportOverHTTP2 transport = new HttpClientTransportOverHTTP2(http2client);
httpClient = new SolrInternalHttpClient(transport, sslContextFactory);
if (builder.maxConnectionsPerHost != null) httpClient.setMaxConnectionsPerDestination(builder.maxConnectionsPerHost);
}
@@ -243,10 +257,10 @@ public class Http2SolrClient extends SolrClient {
httpClient.manage(scheduler);
httpClient.setExecutor(httpClientExecutor);
httpClient.manage(httpClientExecutor);
- httpClient.setStrictEventOrdering(true);
+ httpClient.setStrictEventOrdering(strictEventOrdering);
httpClient.setConnectBlocking(false);
httpClient.setFollowRedirects(false);
- httpClient.setMaxRequestsQueuedPerDestination(1024);
+ httpClient.setMaxRequestsQueuedPerDestination(100000);
httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, AGENT));
httpClient.setIdleTimeout(idleTimeout);
httpClient.setTCPNoDelay(true);
@@ -414,41 +428,53 @@ public class Http2SolrClient extends SolrClient {
final ResponseParser parser = solrRequest.getResponseParser() == null
? this.parser: solrRequest.getResponseParser();
asyncTracker.register();
- req.send(new InputStreamResponseListener() {
- @Override
- public void onHeaders(Response response) {
- super.onHeaders(response);
- InputStreamResponseListener listener = this;
- ParWork.getRootSharedExecutor().execute(() -> {
- InputStream is = listener.getInputStream();
- try {
- NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
- asyncListener.onSuccess(body);
- } catch (RemoteSolrException e) {
- if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
- asyncListener.onFailure(e);
- }
- } catch (SolrServerException e) {
- asyncListener.onFailure(e);
- } finally {
- asyncTracker.arrive();
- }
- });
- }
-
- @Override
- public void onFailure(Response response, Throwable failure) {
+ try {
+ req.send(new InputStreamResponseListener() {
+ @Override
+ public void onHeaders(Response response) {
+ super.onHeaders(response);
+ InputStreamResponseListener listener = this;
+ ParWork.getRootSharedExecutor().execute(() -> {
+ if (log.isDebugEnabled()) log.debug("async response ready");
+ InputStream is = listener.getInputStream();
try {
- super.onFailure(response, failure);
- if (failure != CANCELLED_EXCEPTION) {
- asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
+ NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
+ asyncListener.onSuccess(body);
+ } catch (RemoteSolrException e) {
+ if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
+ asyncListener.onFailure(e);
}
+ } catch (SolrServerException e) {
+ asyncListener.onFailure(e);
} finally {
asyncTracker.arrive();
}
+ });
+ }
+
+ @Override
+ public void onFailure(Response response, Throwable failure) {
+ try {
+ super.onFailure(response, failure);
+ if (failure != CANCELLED_EXCEPTION) {
+ asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
+ }
+ } finally {
+ asyncTracker.arrive();
}
- });
- return () -> req.abort(CANCELLED_EXCEPTION);
+ }
+ });
+ } catch (Exception e) {
+ asyncTracker.arrive();
+ throw new SolrException(SolrException.ErrorCode.UNKNOWN, e);
+ }
+ return () -> {
+ try {
+ req.abort(CANCELLED_EXCEPTION);
+ } finally {
+ asyncTracker.arrive();
+ }
+ };
}
@Override
@@ -875,16 +901,18 @@ public class Http2SolrClient extends SolrClient {
public static class AsyncTracker {
- private static final int MAX_OUTSTANDING_REQUESTS = 50;
-
private final Semaphore available;
// wait for async requests
private final Phaser phaser = new ThePhaser(1);
// maximum outstanding requests left
- public AsyncTracker() {
- available = new Semaphore(MAX_OUTSTANDING_REQUESTS, true);
+ public AsyncTracker(int maxOutstandingAsyncRequests) {
+ if (maxOutstandingAsyncRequests > 0) {
+ available = new Semaphore(maxOutstandingAsyncRequests, true);
+ } else {
+ available = null;
+ }
}
int getMaxRequestsQueuedPerDestination() {
@@ -902,7 +930,7 @@ public class Http2SolrClient extends SolrClient {
public void close() {
phaser.forceTermination();
- available.release(available.getQueueLength() + 5);
+ if (available != null) available.release(available.getQueueLength() + 5);
}
public void register() {
@@ -911,7 +939,7 @@ public class Http2SolrClient extends SolrClient {
}
phaser.register();
try {
- available.acquire();
+ if (available != null) available.acquire();
} catch (InterruptedException e) {
log.warn("interrupted", e);
}
@@ -919,7 +947,7 @@ public class Http2SolrClient extends SolrClient {
public void arrive() {
try {
- available.release();
+ if (available != null) available.release();
} finally {
phaser.arriveAndDeregister();
}
@@ -947,12 +975,14 @@ public class Http2SolrClient extends SolrClient {
private Http2SolrClient http2SolrClient;
private SSLConfig sslConfig = defaultSSLConfig;
- private Integer idleTimeout = Integer.getInteger("solr.http2solrclient.default.idletimeout", 30000);
+ private Integer idleTimeout = Integer.getInteger("solr.http2solrclient.default.idletimeout", 120000);
private Integer connectionTimeout;
- private Integer maxConnectionsPerHost = 128;
+ private Integer maxConnectionsPerHost = 2;
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
protected String baseSolrUrl;
protected Map<String,String> headers = new ConcurrentHashMap<>();
+ protected boolean strictEventOrdering = false;
+ private Integer maxOutstandingAsyncRequests;
public Builder() {
@@ -997,6 +1027,11 @@ public class Http2SolrClient extends SolrClient {
return this;
}
+ public Builder strictEventOrdering(boolean strictEventOrdering) {
+ this.strictEventOrdering = strictEventOrdering;
+ return this;
+ }
+
public Builder connectionTimeout(int connectionTimeOut) {
this.connectionTimeout = connectionTimeOut;
return this;
@@ -1022,6 +1057,11 @@ public class Http2SolrClient extends SolrClient {
this.headers.put(header, value);
return this;
}
+
+ public Builder maxOutstandingAsyncRequests(int maxOutstandingAsyncRequests) {
+ this.maxOutstandingAsyncRequests = maxOutstandingAsyncRequests;
+ return this;
+ }
}
public Set<String> getQueryParams() {
@@ -1244,4 +1284,14 @@ public class Http2SolrClient extends SolrClient {
ContentResponse response = httpClient.newRequest(url).method(PUT).content(new BytesContentProvider(bytes), contentType).send();
return response.getContentAsString();
}
+
+ private static class SolrHttpClientTransportOverHTTP extends HttpClientTransportOverHTTP {
+ public SolrHttpClientTransportOverHTTP(int selectors) {
+ super(selectors);
+ }
+
+ public HttpClient getHttpClient() {
+ return super.getHttpClient();
+ }
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
index 21d9083..202454b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;
@@ -79,7 +80,7 @@ public class HttpClientUtil {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final int DEFAULT_CONNECT_TIMEOUT = 10000;
- public static final int DEFAULT_SO_TIMEOUT = 30000;
+ public static final int DEFAULT_SO_TIMEOUT = (int) TimeUnit.MINUTES.toMillis(5);
public static final int DEFAULT_MAXCONNECTIONSPERHOST = 100000;
public static final int DEFAULT_MAXCONNECTIONS = 100000;
@@ -189,19 +190,29 @@ public class HttpClientUtil {
// don't synchronize traversal - can lead to deadlock - CopyOnWriteArrayList is critical
// we also do not want to have to acquire the mutex when the list is empty or put a global
// mutex around the process calls
- interceptors.forEach(new Consumer<HttpRequestInterceptor>() {
-
- @Override
- public void accept(HttpRequestInterceptor interceptor) {
- try {
- interceptor.process(request, context);
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- log.error("", e);
- }
- }
- });
+ interceptors.forEach(new HttpRequestInterceptorConsumer(request, context));
+
+ }
+
+ private static class HttpRequestInterceptorConsumer implements Consumer<HttpRequestInterceptor> {
+
+ private final HttpRequest request;
+ private final HttpContext context;
+ public HttpRequestInterceptorConsumer(HttpRequest request, HttpContext context) {
+ this.request = request;
+ this.context = context;
+ }
+
+ @Override
+ public void accept(HttpRequestInterceptor interceptor) {
+ try {
+ interceptor.process(request, context);
+ } catch (Exception e) {
+ ParWork.propagateInterrupt(e);
+ log.error("", e);
+ }
+ }
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 18473d8..6950f9f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -78,7 +78,7 @@ public class ParWork implements Closeable {
if (EXEC == null) {
synchronized (ParWork.class) {
if (EXEC == null) {
- EXEC = (ThreadPoolExecutor) getParExecutorService(12, Integer.MAX_VALUE, 30000, new SynchronousQueue<>());
+ EXEC = (ThreadPoolExecutor) getParExecutorService(Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 250), Integer.MAX_VALUE, 30000, new SynchronousQueue<>());
((ParWorkExecutor)EXEC).enableCloseLock();
}
}
@@ -429,7 +429,7 @@ public class ParWork implements Closeable {
for (Future<Object> future : results) {
try {
future.get(
- Integer.getInteger("solr.parwork.task_timeout", 120000),
+ Long.getLong("solr.parwork.task_timeout", TimeUnit.MINUTES.toMillis(10)),
TimeUnit.MILLISECONDS); // nocommit
if (!future.isDone() || future.isCancelled()) {
log.warn("A task did not finish isDone={} isCanceled={}",
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 818a708..b277e90 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ParWorkExecutor extends ThreadPoolExecutor {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
- public static final int KEEP_ALIVE_TIME = 120000;
+ public static final int KEEP_ALIVE_TIME = 30000;
private static AtomicInteger threadNumber = new AtomicInteger(0);
@@ -55,7 +55,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
}
public void shutdown() {
- closeTracker.close();
+ assert closeTracker.close();
super.shutdown();
}
@@ -98,7 +98,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
try {
r.run();
} finally {
- ParWork.closeMyPerThreadExecutor(true);
+ // ParWork.closeMyPerThreadExecutor(true);
}
}
};
diff --git a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index 3df89ef..f8c175d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -152,7 +152,7 @@ public class PerThreadExecService extends AbstractExecutorService {
throw new IllegalCallerException();
}
assert ObjectReleaseTracker.release(this);
- //closeTracker.close();
+ // assert closeTracker.close();
this.shutdown = true;
// worker.interrupt();
// workQueue.clear();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index eb3768b..92b45c6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -78,6 +78,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
private final BlockingQueue<Runnable> _jobs;
private final ThreadGroup _threadGroup;
private final ThreadFactory _threadFactory;
+ private final int _queueOfferTimeout;
private String _name = "qtp" + hashCode();
private int _idleTimeout;
private int _maxThreads;
@@ -96,74 +97,57 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
}
public SolrQueuedThreadPool(String name) {
- this(Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 18),
- 120000, -1, // no reserved executor threads - we can process requests after shutdown or some race - we try to limit without threadpool limits no anyway
- null, null,
+ this(name, Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 250),
5000, 0, // no reserved executor threads - we can process requests after shutdown or during a race - we try to limit load without thread-pool limits anyway
+ null, -1, null,
new SolrNamedThreadFactory(name));
this.name = name;
}
public SolrQueuedThreadPool(String name, int maxThreads, int minThreads, int idleTimeout) {
- this(maxThreads, minThreads,
+ this(name, maxThreads, minThreads,
idleTimeout, -1,
- null, null,
+ null, -1, null,
new SolrNamedThreadFactory(name));
- this.name = name;
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads)
- {
- this(maxThreads, Math.min(8, maxThreads));
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
- {
- this(maxThreads, minThreads, 60000);
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("queue") BlockingQueue<Runnable> queue)
- {
- this(maxThreads, minThreads, 60000, -1, queue, null);
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout)
- {
- this(maxThreads, minThreads, idleTimeout, null);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
+ public SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout,
+ @Name("queue") BlockingQueue<Runnable> queue, @Name("queueOfferTimeout") int queueOfferTimeout)
{
- this(maxThreads, minThreads, idleTimeout, queue, null);
+ this(name, maxThreads, minThreads, idleTimeout, queue, queueOfferTimeout,null);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
+ public SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout,
+ @Name("queue") BlockingQueue<Runnable> queue, @Name("queueOfferTimeout") int queueOfferTimeout, @Name("threadGroup") ThreadGroup threadGroup)
{
- this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
+ this(name, maxThreads, minThreads, idleTimeout, -1, queue, queueOfferTimeout, threadGroup);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
+ private SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
- @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
+ @Name("queue") BlockingQueue<Runnable> queue, @Name("queueOfferTimeout") int queueOfferTimeout, @Name("threadGroup") ThreadGroup threadGroup)
{
- this(maxThreads, minThreads, idleTimeout, reservedThreads, queue, threadGroup, null);
+ this(name, maxThreads, minThreads, idleTimeout, reservedThreads, queue, queueOfferTimeout, threadGroup, null);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
+ private SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
- @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup,
+ @Name("queue") BlockingQueue<Runnable> queue, @Name("queueOfferTimeout") int queueOfferTimeout, @Name("threadGroup") ThreadGroup threadGroup,
@Name("threadFactory") ThreadFactory threadFactory)
{
if (maxThreads < minThreads)
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")");
+ this.name = name;
+ this._queueOfferTimeout = queueOfferTimeout;
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
- setReservedThreads(0);
+ setReservedThreads(reservedThreads);
setLowThreadsThreshold(-1);
setStopTimeout(5000);
if (queue == null)
{
- int capacity = Math.max(_minThreads, 8) * 1024;
+ int capacity = Math.max(_minThreads, 8) * 2048;
queue = new BlockingArrayQueue<>(capacity, capacity);
}
_jobs = queue;
@@ -477,12 +461,22 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
break;
}
- if (!_jobs.offer(job))
- {
- // reverse our changes to _counts.
- if (addCounts(-startThread, 1 - startThread))
- LOG.warn("{} rejected {}", this, job);
- throw new RejectedExecutionException(job.toString());
+ if (_queueOfferTimeout == -1) {
+ if (!_jobs.offer(job)) {
+ // reverse our changes to _counts.
+ if (addCounts(-startThread, 1 - startThread)) LOG.warn("{} rejected {}", this, job);
+ throw new RejectedExecutionException(job.toString());
+ }
+ } else {
+ try {
+ if (!_jobs.offer(job, _queueOfferTimeout, TimeUnit.MILLISECONDS)) {
+ // reverse our changes to _counts.
+ if (addCounts(-startThread, 1 - startThread)) LOG.warn("{} rejected {}", this, job);
+ throw new RejectedExecutionException(job.toString());
+ }
+ } catch (InterruptedException e) {
+ // TODO(review): InterruptedException is swallowed here - the job is silently dropped; restore the interrupt status (Thread.currentThread().interrupt()) and throw RejectedExecutionException like the non-timeout path
+ }
}
if (LOG.isDebugEnabled())
@@ -904,7 +898,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
_tryExecutor = TryExecutor.NO_TRY;
int threads = getBusyThreads() + getIdleThreads() + getThreads() * 2;
- BlockingArrayQueue<Runnable> jobs = (BlockingArrayQueue<Runnable>) getQueue();
+ BlockingQueue<Runnable> jobs = (BlockingQueue<Runnable>) getQueue();
//setIdleTimeout(1);
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index ee6da6c..ba3aba2 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -290,8 +290,9 @@ public class SolrTestCase extends LuceneTestCase {
// unlimited - System.setProperty("solr.maxContainerThreads", "300");
System.setProperty("solr.lowContainerThreadsThreshold", "-1");
- System.setProperty("solr.minContainerThreads", "8");
- System.setProperty("solr.minHttp2ClientThreads", "8");
+ System.setProperty("solr.minContainerThreads", "4");
+ System.setProperty("solr.rootSharedThreadPoolCoreSize", "60");
+ System.setProperty("solr.minHttp2ClientThreads", "4");
ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS = 1;
@@ -440,7 +441,7 @@ public class SolrTestCase extends LuceneTestCase {
SysStats.getSysStats().stopMonitor();
- ParWork.closeMyPerThreadExecutor(true);
+ //ParWork.closeMyPerThreadExecutor(true);
ParWork.shutdownRootSharedExec();
AlreadyClosedException lastAlreadyClosedExp = CloseTracker.lastAlreadyClosedEx;
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 51263f7..74d0615 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -576,7 +576,7 @@ public class ZkTestServer implements Closeable {
log.error("zkServer error", t);
}
} finally {
- ParWork.closeMyPerThreadExecutor(true);
+ // ParWork.closeMyPerThreadExecutor(true);
}
}
};