You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/19 23:01:20 UTC
[lucene-solr] 01/02: tmptmp5
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 618d7e940bf834018b4d31c7287f5c89a28c7fd7
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Sep 19 17:47:01 2020 -0500
tmptmp5
---
gradle/validation/check-environment.gradle | 7 -
.../lucene/benchmark/byTask/utils/Algorithm.java | 1 +
settings.gradle | 1 +
solr/benchmark/build.gradle | 159 +++++++++++++++++++++
solr/cloud-dev/cloud.sh | 6 +-
.../client/solrj/embedded/JettySolrRunner.java | 109 +++++++-------
.../apache/solr/handler/admin/PrepRecoveryOp.java | 36 ++---
.../org/apache/solr/servlet/SolrQoSFilter.java | 3 +-
.../org/apache/solr/update/SolrCmdDistributor.java | 65 +++++----
solr/server/etc/jetty.xml | 8 --
.../solr/configsets/_default/conf/solrconfig.xml | 13 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 95 +++++++-----
.../org/apache/solr/common/ParWorkExecutor.java | 4 +-
.../apache/solr/common/PerThreadExecService.java | 2 +-
.../solr/common/util/SolrQueuedThreadPool.java | 46 ++----
.../src/java/org/apache/solr/SolrTestCase.java | 4 +-
16 files changed, 351 insertions(+), 208 deletions(-)
diff --git a/gradle/validation/check-environment.gradle b/gradle/validation/check-environment.gradle
index 0f3a084..c1b155c 100644
--- a/gradle/validation/check-environment.gradle
+++ b/gradle/validation/check-environment.gradle
@@ -37,11 +37,4 @@ configure(rootProject) {
+ "[${System.getProperty('java.vm.name')} ${System.getProperty('java.vm.version')}]")
}
- // If we're regenerating the wrapper, skip the check.
- if (!gradle.startParameter.taskNames.contains("wrapper")) {
- def currentGradleVersion = GradleVersion.current()
- if (currentGradleVersion != GradleVersion.version(expectedGradleVersion)) {
- throw new GradleException("Gradle ${expectedGradleVersion} is required (hint: use the gradlew script): this gradle is ${currentGradleVersion}")
- }
- }
}
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
index 821db38..8e125c0 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Algorithm.java
@@ -308,6 +308,7 @@ public class Algorithm implements AutoCloseable {
try {
return Class.forName(pkg+'.'+taskName+"Task");
} catch (ClassNotFoundException e) {
+ e.printStackTrace();
// failed in this package, might succeed in the next one...
}
}
diff --git a/settings.gradle b/settings.gradle
index c635186..6ae6b2a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -52,6 +52,7 @@ include "lucene:spatial3d"
include "lucene:suggest"
include "lucene:test-framework"
+include "solr:benchmark"
include "solr:solrj"
include "solr:core"
include "solr:server"
diff --git a/solr/benchmark/build.gradle b/solr/benchmark/build.gradle
new file mode 100644
index 0000000..191c582
--- /dev/null
+++ b/solr/benchmark/build.gradle
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+// NOT a 'java-library'. Maybe 'application' but seems too limiting.
+
+description = 'System for benchmarking Solr'
+
+dependencies {
+ implementation project(':lucene:benchmark')
+ implementation project(':lucene:core')
+ implementation project(':solr:core')
+ implementation project(':solr:solrj')
+
+ implementation project(':lucene:analysis:common')
+ implementation project(':lucene:facet')
+ implementation project(':lucene:highlighter')
+ implementation project(':lucene:queries')
+ implementation project(':lucene:spatial-extras')
+ implementation project(':lucene:queryparser')
+
+ implementation "org.apache.commons:commons-compress"
+ implementation "com.ibm.icu:icu4j"
+ implementation "org.locationtech.spatial4j:spatial4j"
+ implementation("net.sourceforge.nekohtml:nekohtml", {
+ exclude module: "xml-apis"
+ })
+
+ runtimeOnly project(':lucene:analysis:icu')
+
+ testImplementation project(':lucene:test-framework')
+}
+
+def tempDir = file("temp")
+def workDir = file("work")
+
+task run(type: JavaExec) {
+ description "Run a perf test (optional: -PtaskAlg=conf/your-algorithm-file -PmaxHeapSize=1G)"
+ main 'org.apache.lucene.benchmark.byTask.Benchmark'
+ classpath sourceSets.main.runtimeClasspath.plus(sourceSets.main.output.classesDirs)
+ // allow these to be specified on the CLI via -PtaskAlg= for example
+ args = [propertyOrDefault('taskAlg', 'conf/micro-standard.alg')]
+
+ maxHeapSize = propertyOrDefault('maxHeapSize', '1G')
+
+ String stdOutStr = propertyOrDefault('standardOutput', null)
+ if (stdOutStr != null) {
+ standardOutput = new File(stdOutStr).newOutputStream()
+ }
+
+ debugOptions {
+ enabled = false
+ port = 5005
+ suspend = true
+ }
+}
+
+/* Old "collation" Ant target:
+gradle getTop100kWikiWordFiles run -PtaskAlg=conf/collation.alg -PstandardOutput=work/collation.benchmark.output.txt
+perl -CSD scripts/collation.bm2jira.pl work/collation.benchmark.output.txt
+ */
+
+/* Old "shingle" Ant target:
+gradle getReuters run -PtaskAlg=conf/shingle.alg -PstandardOutput=work/shingle.benchmark.output.txt
+perl -CSD scripts/shingle.bm2jira.pl work/shingle.benchmark.output.txt
+ */
+
+// The remaining tasks just get / extract / prepare data
+
+task getEnWiki(type: Download) {
+ def finalName = "enwiki-20070527-pages-articles.xml"
+ src "https://home.apache.org/~dsmiley/data/" + finalName + ".bz2"
+ dest file("$tempDir/" + finalName + ".bz2")
+ overwrite false
+ compress false
+
+ doLast {
+ ant.bunzip2(src: dest, dest: tempDir)
+ }
+ outputs.file file("$tempDir/$finalName")
+}
+
+task getGeoNames(type: Download) {
+ // note: latest data is at: https://download.geonames.org/export/dump/allCountries.zip
+ // and then randomize with: gsort -R -S 1500M file.txt > file_random.txt
+ // and then compress with: bzip2 -9 -k file_random.txt
+ def finalName = "geonames_20130921_randomOrder_allCountries.txt"
+ src "https://home.apache.org/~dsmiley/data/" + finalName + ".bz2"
+ dest file("$tempDir/" + finalName + ".bz2")
+ overwrite false
+ compress false
+
+ doLast {
+ ant.bunzip2(src: dest, dest: tempDir) // will chop off .bz2
+ }
+ outputs.file file("$tempDir/$finalName")
+}
+
+task getTop100kWikiWordFiles(type: Download) {
+ src "https://home.apache.org/~rmuir/wikipedia/top.100k.words.de.en.fr.uk.wikipedia.2009-11.tar.bz2"
+ dest file("$tempDir/${src.file.split('/').last()}")
+ overwrite false
+ compress false
+
+ def finalPath = file("$workDir/top100k-out")
+
+ doLast {
+ project.sync {
+ from tarTree(dest) // defined above. Will decompress on the fly
+ into finalPath
+ }
+ }
+ outputs.dir finalPath
+}
+
+task getReuters(type: Download) {
+ // note: there is no HTTPS url and we don't care because this is merely test/perf data
+ src "http://www.daviddlewis.com/resources/testcollections/reuters21578/reuters21578.tar.gz"
+ dest file("$tempDir/${src.file.split('/').last()}")
+ overwrite false
+ compress false
+
+ def untarPath = file("$workDir/reuters")
+ def finalPath = file("$workDir/reuters-out")
+ dependsOn sourceSets.main.runtimeClasspath
+
+ doLast {
+ project.sync {
+ from(tarTree(dest)) { // defined above. Will decompress on the fly
+ exclude '*.txt'
+ }
+ into untarPath
+ }
+ println "Extracting reuters to $finalPath"
+ finalPath.deleteDir() // necessary
+ // TODO consider porting ExtractReuters to groovy?
+ project.javaexec {
+ main = 'org.apache.lucene.benchmark.utils.ExtractReuters'
+ classpath = sourceSets.main.runtimeClasspath
+ maxHeapSize = '1G'
+ args = [untarPath, finalPath]
+ }
+ }
+ outputs.dir finalPath
+}
diff --git a/solr/cloud-dev/cloud.sh b/solr/cloud-dev/cloud.sh
index 1f6925f..14b904a 100755
--- a/solr/cloud-dev/cloud.sh
+++ b/solr/cloud-dev/cloud.sh
@@ -254,7 +254,7 @@ cleanIfReq() {
recompileIfReq() {
if [[ "$RECOMPILE" = true ]]; then
pushd "$VCS_WORK"/solr
- ant clean create-package
+ ./gradlew clean distTar
if [[ "$?" -ne 0 ]]; then
echo "BUILD FAIL - cloud.sh stopping, see above output for details"; popd; exit 7;
fi
@@ -274,10 +274,10 @@ copyTarball() {
echo "baz"
pushd # back to original dir to properly resolve vcs working dir
echo "foobar:"$(pwd)
- if [[ ! -f $(ls "$VCS_WORK"/solr/package/solr-*.tgz) ]]; then
+ if [[ ! -f $(ls "$VCS_WORK"/solr/packaging/solr-*.tgz) ]]; then
echo "No solr tarball found try again with -r"; popd; exit 10;
fi
- cp "$VCS_WORK"/solr/package/solr-*.tgz ${CLUSTER_WD}
+ cp "$VCS_WORK"/solr/packaging/solr-*.tgz ${CLUSTER_WD}
pushd # back into cluster wd to unpack
tar xzvf solr-*.tgz
popd
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index d39698d..239b035 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -309,75 +309,62 @@ public class JettySolrRunner implements Closeable {
server.setStopAtShutdown(config.stopAtShutdown);
//if (System.getProperty("jetty.testMode") != null) {
- if (true) {
- // if this property is true, then jetty will be configured to use SSL
- // leveraging the same system properties as java to specify
- // the keystore/truststore if they are set unless specific config
- // is passed via the constructor.
- //
- // This means we will use the same truststore, keystore (and keys) for
- // the server as well as any client actions taken by this JVM in
- // talking to that server, but for the purposes of testing that should
- // be good enough
- final SslContextFactory.Server sslcontext = SSLConfig.createContextFactory(config.sslConfig);
-
- HttpConfiguration configuration = new HttpConfiguration();
- ServerConnector connector;
- if (sslcontext != null) {
- configuration.setSecureScheme("https");
- configuration.addCustomizer(new SecureRequestCustomizer());
- HttpConnectionFactory http1ConnectionFactory = new HttpConnectionFactory(configuration);
-
- if (config.onlyHttp1 || !Constants.JRE_IS_MINIMUM_JAVA9) {
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new SslConnectionFactory(sslcontext,
- http1ConnectionFactory.getProtocol()),
- http1ConnectionFactory);
- } else {
- sslcontext.setCipherComparator(HTTP2Cipher.COMPARATOR);
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2);
- SslConnectionFactory sslConnectionFactory = new SslConnectionFactory(sslcontext, "alpn");
- connector.addConnectionFactory(sslConnectionFactory);
- connector.setDefaultProtocol(sslConnectionFactory.getProtocol());
+ // if this property is true, then jetty will be configured to use SSL
+ // leveraging the same system properties as java to specify
+ // the keystore/truststore if they are set unless specific config
+ // is passed via the constructor.
+ //
+ // This means we will use the same truststore, keystore (and keys) for
+ // the server as well as any client actions taken by this JVM in
+ // talking to that server, but for the purposes of testing that should
+ // be good enough
+ final SslContextFactory.Server sslcontext = SSLConfig.createContextFactory(config.sslConfig);
+
+ HttpConfiguration configuration = new HttpConfiguration();
+ ServerConnector connector;
+ if (sslcontext != null) {
+ configuration.setSecureScheme("https");
+ configuration.addCustomizer(new SecureRequestCustomizer());
+ HttpConnectionFactory http1ConnectionFactory = new HttpConnectionFactory(configuration);
+
+ if (config.onlyHttp1 || !Constants.JRE_IS_MINIMUM_JAVA9) {
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new SslConnectionFactory(sslcontext, http1ConnectionFactory.getProtocol()), http1ConnectionFactory);
+ } else {
+ sslcontext.setCipherComparator(HTTP2Cipher.COMPARATOR);
- HTTP2ServerConnectionFactory http2ConnectionFactory = new HTTP2ServerConnectionFactory(configuration);
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2);
+ SslConnectionFactory sslConnectionFactory = new SslConnectionFactory(sslcontext, "alpn");
+ connector.addConnectionFactory(sslConnectionFactory);
+ connector.setDefaultProtocol(sslConnectionFactory.getProtocol());
- http2ConnectionFactory.setMaxConcurrentStreams(512);
- http2ConnectionFactory.setInputBufferSize(16384);
+ HTTP2ServerConnectionFactory http2ConnectionFactory = new HTTP2ServerConnectionFactory(configuration);
- ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(
- http2ConnectionFactory.getProtocol(),
- http1ConnectionFactory.getProtocol());
- alpn.setDefaultProtocol(http2ConnectionFactory.getProtocol());
- connector.addConnectionFactory(alpn);
- connector.addConnectionFactory(http1ConnectionFactory);
- connector.addConnectionFactory(http2ConnectionFactory);
- }
+ http2ConnectionFactory.setMaxConcurrentStreams(512);
+ http2ConnectionFactory.setInputBufferSize(8192);
+ http2ConnectionFactory.setStreamIdleTimeout(TimeUnit.MINUTES.toMillis(10));
+
+ ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(http2ConnectionFactory.getProtocol(), http1ConnectionFactory.getProtocol());
+ alpn.setDefaultProtocol(http2ConnectionFactory.getProtocol());
+ connector.addConnectionFactory(alpn);
+ connector.addConnectionFactory(http1ConnectionFactory);
+ connector.addConnectionFactory(http2ConnectionFactory);
+ }
+ } else {
+ if (config.onlyHttp1) {
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration));
} else {
- if (config.onlyHttp1) {
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration));
- } else {
- connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration),
- new HTTP2CServerConnectionFactory(configuration));
- }
+ connector = new ServerConnector(server, qtp, scheduler, null, 1, 2, new HttpConnectionFactory(configuration), new HTTP2CServerConnectionFactory(configuration));
}
+ }
+ connector.setIdleTimeout(TimeUnit.MINUTES.toMillis(10));
+ connector.setReuseAddress(true);
+ connector.setSoLingerTime(-1);
+ connector.setPort(port);
+ connector.setHost("127.0.0.1");
- connector.setReuseAddress(true);
- connector.setSoLingerTime(-1);
- connector.setPort(port);
- connector.setHost("127.0.0.1");
-
- server.setConnectors(new Connector[] {connector});
+ server.setConnectors(new Connector[] {connector});
- } else {
- HttpConfiguration configuration = new HttpConfiguration();
- configuration.setIdleTimeout(Integer.getInteger("solr.containerThreadsIdle", THREAD_POOL_MAX_IDLE_TIME_MS));
- ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory(configuration));
- connector.setReuseAddress(true);
- connector.setPort(port);
- connector.setSoLingerTime(-1);
- server.setConnectors(new Connector[] {connector});
- }
//server.setDumpAfterStart(true);
// server.setDumpBeforeStop(true);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index ba65d40..12a2f6b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -57,14 +57,13 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
Boolean checkLive = params.getBool("checkLive");
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
- Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
CoreContainer coreContainer = it.handler.coreContainer;
// wait long enough for the leader conflict to work itself out plus a little extra
int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
log.info(
- "Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}, onlyIfLeaderActive: {}",
- coreNodeName, waitForState, checkLive, onlyIfLeader, onlyIfLeaderActive);
+ "Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}: {}",
+ coreNodeName, waitForState, checkLive, onlyIfLeader);
String collectionName;
CloudDescriptor cloudDescriptor;
@@ -97,8 +96,10 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
AtomicReference<String> errorMessage = new AtomicReference<>();
try {
coreContainer.getZkController().getZkStateReader().waitForState(collectionName, conflictWaitMs, TimeUnit.MILLISECONDS, (n, c) -> {
- if (c == null)
+ if (c == null) {
+ log.info("collection not found {}",collectionName);
return false;
+ }
// wait until we are sure the recovering node is ready
// to accept updates
@@ -113,20 +114,6 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
final Replica.State localState = cloudDescriptor.getLastPublished();
- // TODO: This is funky but I've seen this in testing where the replica asks the
- // leader to be in recovery? Need to track down how that happens ... in the meantime,
- // this is a safeguard
- boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
- onlyIfLeader &&
- cname.equals(replica.getStr("core")) &&
- waitForState == Replica.State.RECOVERING &&
- localState == Replica.State.ACTIVE &&
- state == Replica.State.ACTIVE);
-
- if (leaderDoesNotNeedRecovery) {
- log.warn("Leader {} ignoring request to be in the recovering state because it is live and active.", cname);
- }
-
ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
// if the replica is waiting for leader to see recovery state, the leader should refresh its terms
if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName)
@@ -137,24 +124,27 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
shardTerms.refreshTerms(null);
}
- boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive
- && localState != Replica.State.ACTIVE;
if (log.isInfoEnabled()) {
log.info(
"In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
- ", thisCore=" + cname + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
+ ", thisCore=" + cname +
", isLeader? " + cloudDescriptor.isLeader() +
", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state
+ ", localState=" + localState + ", nodeName=" + nodeName +
- ", coreNodeName=" + coreNodeName + ", onlyIfActiveCheckResult=" + onlyIfActiveCheckResult
+ ", coreNodeName=" + coreNodeName
+ ", nodeProps: " + replica); //LOGOK
}
- if (!onlyIfActiveCheckResult && replica != null && (state == waitForState || leaderDoesNotNeedRecovery)) {
+
+ log.info("replica={} state={} waitForState={}", replica, state, waitForState);
+ if (replica != null && (state == waitForState)) {
if (checkLive == null) {
+ log.info("checkLive=false, return true");
return true;
} else if (checkLive && live) {
+ log.info("checkLive=true live={}, return true", live);
return true;
} else if (!checkLive && !live) {
+ log.info("checkLive=false live={}, return true", live);
return true;
}
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index 6ff27a7..74f8ae1 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -94,7 +94,8 @@ public class SolrQoSFilter extends QoSFilter {
}
}
- super.doFilter(req, response, chain);
+ chain.doFilter(req, response);
+ //super.doFilter(req, response, chain);
} else {
if (log.isDebugEnabled()) log.debug("internal request, allow");
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index a65d0a4..a99c186 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -39,7 +39,6 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.core.Diagnostics;
@@ -68,19 +67,22 @@ public class SolrCmdDistributor implements Closeable {
private final Http2SolrClient solrClient;
+ Http2SolrClient.AsyncTracker tracker = new Http2SolrClient.AsyncTracker();
+
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
assert ObjectReleaseTracker.track(this);
- this.solrClient = new Http2SolrClient.Builder().markInternalRequest().withHttpClient(updateShardHandler.getTheSharedHttpClient()).idleTimeout(60000).build();
+ this.solrClient = updateShardHandler.getTheSharedHttpClient();
}
public void finish() {
assert !finished : "lifecycle sanity check";
- solrClient.waitForOutstandingRequests();
+ // nonCommitTracker.waitForComplete();
+ tracker.waitForComplete();
finished = true;
}
public void close() {
- solrClient.close();
+ tracker.close();
assert ObjectReleaseTracker.release(this);
}
@@ -152,7 +154,7 @@ public class SolrCmdDistributor implements Closeable {
} else {
uReq.deleteByQuery(cmd.query);
}
- submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), true);
+ submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker));
}
}
@@ -176,7 +178,7 @@ public class SolrCmdDistributor implements Closeable {
if (cmd.isInPlaceUpdate()) {
params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
}
- submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker), true);
+ submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker));
}
}
@@ -192,12 +194,12 @@ public class SolrCmdDistributor implements Closeable {
uReq.setParams(params);
addCommit(uReq, cmd);
- submit(new Req(cmd, node, uReq, false), true);
+ submit(new Req(cmd, node, uReq, false));
}
}
public void blockAndDoRetries() {
- solrClient.waitForOutstandingRequests();
+ tracker.waitForComplete();
}
void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -206,7 +208,7 @@ public class SolrCmdDistributor implements Closeable {
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
}
- private void submit(final Req req, boolean register) {
+ private void submit(final Req req) {
if (log.isDebugEnabled()) {
log.debug("sending update to " + req.node.getUrl() + " retry:" + req.retries + " " + req.cmd + " params:" + req.uReq.getParams());
@@ -234,40 +236,45 @@ public class SolrCmdDistributor implements Closeable {
}
try {
-
- solrClient.asyncRequest(req.uReq, null, new AsyncListener<NamedList<Object>>() {
+ tracker.register();
+ solrClient.asyncRequest(req.uReq, null, new AsyncListener<>() {
@Override
public void onSuccess(NamedList result) {
if (log.isTraceEnabled()) log.trace("Success for distrib update {}", result);
+ tracker.arrive();
}
@Override
public void onFailure(Throwable t) {
log.error("Exception sending dist update", t);
+ try {
+ Error error = new Error();
+ error.t = t;
+ error.req = req;
+ if (t instanceof SolrException) {
+ error.statusCode = ((SolrException) t).code();
+ }
- Error error = new Error();
- error.t = t;
- error.req = req;
- if (t instanceof SolrException) {
- error.statusCode = ((SolrException) t).code();
- }
-
- boolean retry = false;
- if (checkRetry(error)) {
- retry = true;
- }
+ boolean retry = false;
+ if (checkRetry(error)) {
+ retry = true;
+ }
- if (retry) {
- log.info("Retrying distrib update on error: {}", t.getMessage());
- submit(req, true);
- return;
- } else {
- allErrors.add(error);
+ if (retry) {
+ log.info("Retrying distrib update on error: {}", t.getMessage());
+ submit(req);
+ return;
+ } else {
+ allErrors.add(error);
+ }
+ } finally {
+ tracker.arrive();
}
}
});
} catch (Exception e) {
log.error("Exception sending dist update", e);
+ tracker.arrive();
Error error = new Error();
error.t = e;
error.req = req;
@@ -275,7 +282,7 @@ public class SolrCmdDistributor implements Closeable {
error.statusCode = ((SolrException) e).code();
}
if (checkRetry(error)) {
- submit(req, true);
+ submit(req);
} else {
allErrors.add(error);
}
diff --git a/solr/server/etc/jetty.xml b/solr/server/etc/jetty.xml
index cb289f2..310af0d 100644
--- a/solr/server/etc/jetty.xml
+++ b/solr/server/etc/jetty.xml
@@ -40,14 +40,6 @@
</New>
</Arg>
-<Get name="ThreadPool">
- <Set name="minThreads" type="int"><Property name="solr.jetty.threads.min" default="12"/></Set>
- <Set name="maxThreads" type="int"><Property name="solr.jetty.threads.max" default="10000"/></Set>
- <Set name="idleTimeout" type="int"><Property name="solr.jetty.threads.idle.timeout" default="45000"/></Set>
- <Set name="stopTimeout" type="int"><Property name="solr.jetty.threads.stop.timeout" default="180000"/></Set>
- <Set name="detailedDump">false</Set>
-</Get>
-
<!-- =========================================================== -->
<!-- Http Configuration. -->
diff --git a/solr/server/solr/configsets/_default/conf/solrconfig.xml b/solr/server/solr/configsets/_default/conf/solrconfig.xml
index 9009170..a0a6ea2 100644
--- a/solr/server/solr/configsets/_default/conf/solrconfig.xml
+++ b/solr/server/solr/configsets/_default/conf/solrconfig.xml
@@ -149,7 +149,7 @@
before flushing.
If both ramBufferSizeMB and maxBufferedDocs is set, then
Lucene will flush based on whichever limit is hit first. -->
- <!-- <ramBufferSizeMB>100</ramBufferSizeMB> -->
+ <ramBufferSizeMB>300</ramBufferSizeMB>
<!-- <maxBufferedDocs>1000</maxBufferedDocs> -->
<!-- Expert: ramPerThreadHardLimitMB sets the maximum amount of RAM that can be consumed
@@ -181,9 +181,14 @@
can perform merges in the background using separate threads.
The SerialMergeScheduler (Lucene 2.2 default) does not.
-->
- <!--
- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler"/>
- -->
+
+ <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+ <int name="maxThreadCount">6</int>
+ <int name="maxMergeCount">8</int>
+ <bool name="ioThrottle">false</bool>
+
+
+ </mergeScheduler>
<!-- LockFactory
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 0e88ff6..d783c46 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -73,6 +73,7 @@ import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrInternalHttpClient;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.SolrQueuedThreadPool;
import org.apache.solr.common.util.SolrScheduledExecutorScheduler;
import org.apache.solr.common.util.Utils;
@@ -97,6 +98,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
@@ -140,6 +142,7 @@ public class Http2SolrClient extends SolrClient {
private volatile HttpClient httpClient;
private volatile Set<String> queryParams = Collections.emptySet();
private int idleTimeout;
+ private boolean strictEventOrdering;
private volatile ResponseParser parser = new BinaryResponseParser();
private volatile RequestWriter requestWriter = new BinaryRequestWriter();
private final Set<HttpListenerFactory> listenerFactory = ConcurrentHashMap.newKeySet();
@@ -166,6 +169,7 @@ public class Http2SolrClient extends SolrClient {
}
this.headers = builder.headers;
+ this.strictEventOrdering = builder.strictEventOrdering;
if (builder.idleTimeout != null && builder.idleTimeout > 0) idleTimeout = builder.idleTimeout;
else idleTimeout = HttpClientUtil.DEFAULT_SO_TIMEOUT;
@@ -211,7 +215,10 @@ public class Http2SolrClient extends SolrClient {
ssl = true;
}
// nocommit - look at config again as well
- httpClientExecutor = new SolrQueuedThreadPool("httpClient",Integer.getInteger("solr.maxHttp2ClientThreads", Math.max(12, ParWork.PROC_COUNT / 2)), Integer.getInteger("solr.minHttp2ClientThreads", 8), idleTimeout);
+ httpClientExecutor = new SolrQueuedThreadPool("http2Client",
+ Integer.getInteger("solr.maxHttp2ClientThreads", Math.max(12, ParWork.PROC_COUNT / 2)),
+ Integer.getInteger("solr.minHttp2ClientThreads", 4),
+ this.headers.get(QoSParams.REQUEST_SOURCE).equals(QoSParams.INTERNAL) ? 500 : 5000, null, null);
httpClientExecutor.setLowThreadsThreshold(-1);
boolean sslOnJava8OrLower = ssl && !Constants.JRE_IS_MINIMUM_JAVA9;
@@ -229,6 +236,8 @@ public class Http2SolrClient extends SolrClient {
log.debug("Create Http2SolrClient with HTTP/2 transport");
HTTP2Client http2client = new HTTP2Client();
http2client.setSelectors(2);
+ http2client.setIdleTimeout(idleTimeout);
+ http2client.setMaxConcurrentPushedStreams(512);
transport = new HttpClientTransportOverHTTP2(http2client);
httpClient = new SolrInternalHttpClient(transport, sslContextFactory);
if (builder.maxConnectionsPerHost != null) httpClient.setMaxConnectionsPerDestination(builder.maxConnectionsPerHost);
@@ -243,10 +252,10 @@ public class Http2SolrClient extends SolrClient {
httpClient.manage(scheduler);
httpClient.setExecutor(httpClientExecutor);
httpClient.manage(httpClientExecutor);
- httpClient.setStrictEventOrdering(true);
+ httpClient.setStrictEventOrdering(strictEventOrdering);
httpClient.setConnectBlocking(false);
httpClient.setFollowRedirects(false);
- httpClient.setMaxRequestsQueuedPerDestination(1024);
+ httpClient.setMaxRequestsQueuedPerDestination(10000);
httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, AGENT));
httpClient.setIdleTimeout(idleTimeout);
httpClient.setTCPNoDelay(true);
@@ -414,41 +423,53 @@ public class Http2SolrClient extends SolrClient {
final ResponseParser parser = solrRequest.getResponseParser() == null
? this.parser: solrRequest.getResponseParser();
asyncTracker.register();
- req.send(new InputStreamResponseListener() {
- @Override
- public void onHeaders(Response response) {
- super.onHeaders(response);
- InputStreamResponseListener listener = this;
- ParWork.getRootSharedExecutor().execute(() -> {
- InputStream is = listener.getInputStream();
- try {
- NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
- asyncListener.onSuccess(body);
- } catch (RemoteSolrException e) {
- if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
- asyncListener.onFailure(e);
- }
- } catch (SolrServerException e) {
- asyncListener.onFailure(e);
- } finally {
- asyncTracker.arrive();
- }
- });
- }
-
- @Override
- public void onFailure(Response response, Throwable failure) {
+ try {
+ req.send(new InputStreamResponseListener() {
+ @Override
+ public void onHeaders(Response response) {
+ super.onHeaders(response);
+ InputStreamResponseListener listener = this;
+ ParWork.getRootSharedExecutor().execute(() -> {
+ if (log.isDebugEnabled()) log.debug("async response ready");
+ InputStream is = listener.getInputStream();
try {
- super.onFailure(response, failure);
- if (failure != CANCELLED_EXCEPTION) {
- asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
+ NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
+ asyncListener.onSuccess(body);
+ } catch (RemoteSolrException e) {
+ if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
+ asyncListener.onFailure(e);
}
+ } catch (SolrServerException e) {
+ asyncListener.onFailure(e);
} finally {
asyncTracker.arrive();
}
+ });
+ }
+
+ @Override
+ public void onFailure(Response response, Throwable failure) {
+ try {
+ super.onFailure(response, failure);
+ if (failure != CANCELLED_EXCEPTION) {
+ asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
+ }
+ } finally {
+ asyncTracker.arrive();
}
- });
- return () -> req.abort(CANCELLED_EXCEPTION);
+ }
+ });
+ } catch (Exception e) {
+ asyncTracker.arrive();
+ throw new SolrException(SolrException.ErrorCode.UNKNOWN, e);
+ }
+ return () -> {
+ try {
+ req.abort(CANCELLED_EXCEPTION);
+ } finally {
+ asyncTracker.arrive();
+ }
+ };
}
@Override
@@ -875,7 +896,7 @@ public class Http2SolrClient extends SolrClient {
public static class AsyncTracker {
- private static final int MAX_OUTSTANDING_REQUESTS = 50;
+ private static final int MAX_OUTSTANDING_REQUESTS = 500;
private final Semaphore available;
@@ -947,12 +968,13 @@ public class Http2SolrClient extends SolrClient {
private Http2SolrClient http2SolrClient;
private SSLConfig sslConfig = defaultSSLConfig;
- private Integer idleTimeout = Integer.getInteger("solr.http2solrclient.default.idletimeout", 30000);
+ private Integer idleTimeout = Integer.getInteger("solr.http2solrclient.default.idletimeout", 120000);
private Integer connectionTimeout;
private Integer maxConnectionsPerHost = 128;
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
protected String baseSolrUrl;
protected Map<String,String> headers = new ConcurrentHashMap<>();
+ protected boolean strictEventOrdering = false;
public Builder() {
@@ -997,6 +1019,11 @@ public class Http2SolrClient extends SolrClient {
return this;
}
+ public Builder strictEventOrdering(boolean strictEventOrdering) {
+ this.strictEventOrdering = strictEventOrdering;
+ return this;
+ }
+
public Builder connectionTimeout(int connectionTimeOut) {
this.connectionTimeout = connectionTimeOut;
return this;
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 818a708..48cf2fe 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ParWorkExecutor extends ThreadPoolExecutor {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
- public static final int KEEP_ALIVE_TIME = 120000;
+ public static final int KEEP_ALIVE_TIME = 15000;
private static AtomicInteger threadNumber = new AtomicInteger(0);
@@ -55,7 +55,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
}
public void shutdown() {
- closeTracker.close();
+ assert closeTracker.close();
super.shutdown();
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
index 3df89ef..f8c175d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/PerThreadExecService.java
@@ -152,7 +152,7 @@ public class PerThreadExecService extends AbstractExecutorService {
throw new IllegalCallerException();
}
assert ObjectReleaseTracker.release(this);
- //closeTracker.close();
+ // assert closeTracker.close();
this.shutdown = true;
// worker.interrupt();
// workQueue.clear();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index eb3768b..4c393f1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -96,65 +96,45 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
}
public SolrQueuedThreadPool(String name) {
- this(Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 18),
- 120000, -1, // no reserved executor threads - we can process requests after shutdown or some race - we try to limit without threadpool limits no anyway
+ this(name, Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 6),
+ 500, -1, // no reserved executor threads - we can process requests after shutdown or some race - we try to limit without threadpool limits no anyway
null, null,
new SolrNamedThreadFactory(name));
this.name = name;
}
public SolrQueuedThreadPool(String name, int maxThreads, int minThreads, int idleTimeout) {
- this(maxThreads, minThreads,
+ this(name, maxThreads, minThreads,
idleTimeout, -1,
null, null,
new SolrNamedThreadFactory(name));
- this.name = name;
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads)
- {
- this(maxThreads, Math.min(8, maxThreads));
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads)
- {
- this(maxThreads, minThreads, 60000);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("queue") BlockingQueue<Runnable> queue)
+ public SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
{
- this(maxThreads, minThreads, 60000, -1, queue, null);
+ this(name, maxThreads, minThreads, idleTimeout, queue, null);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout)
+ public SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
- this(maxThreads, minThreads, idleTimeout, null);
+ this(name, maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
- {
- this(maxThreads, minThreads, idleTimeout, queue, null);
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
- {
- this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
- }
-
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
+ private SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
- this(maxThreads, minThreads, idleTimeout, reservedThreads, queue, threadGroup, null);
+ this(name, maxThreads, minThreads, idleTimeout, reservedThreads, queue, threadGroup, null);
}
- public SolrQueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
+ private SolrQueuedThreadPool(String name, @Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup,
@Name("threadFactory") ThreadFactory threadFactory)
{
if (maxThreads < minThreads)
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")");
+ this.name = name;
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
@@ -163,8 +143,8 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
setStopTimeout(5000);
if (queue == null)
{
- int capacity = Math.max(_minThreads, 8) * 1024;
- queue = new BlockingArrayQueue<>(capacity, capacity);
+ int capacity = 128;
+ queue = new BlockingArrayQueue<>(capacity, 0);
}
_jobs = queue;
_threadGroup = threadGroup;
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index ee6da6c..e7b52d9 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -290,8 +290,8 @@ public class SolrTestCase extends LuceneTestCase {
// unlimited - System.setProperty("solr.maxContainerThreads", "300");
System.setProperty("solr.lowContainerThreadsThreshold", "-1");
- System.setProperty("solr.minContainerThreads", "8");
- System.setProperty("solr.minHttp2ClientThreads", "8");
+ System.setProperty("solr.minContainerThreads", "4");
+ System.setProperty("solr.minHttp2ClientThreads", "4");
ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS = 1;