You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/04 17:14:26 UTC
[lucene-solr] branch reference_impl_dev updated: @940 Separate http client for recovery (shorter connect timeout, out of update/search queue(s)/path), some other tweaks.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 9c5b81c @940 Separate http client for recovery (shorter connect timeout, out of update/search queue(s)/path), some other tweaks.
9c5b81c is described below
commit 9c5b81c56b0581678332cd13b10aa94e6f729349
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Oct 4 11:58:36 2020 -0500
@940 Separate http client for recovery (shorter connect timeout, out of update/search queue(s)/path), some other tweaks.
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 72 ++++++----------------
.../java/org/apache/solr/core/CoreContainer.java | 4 +-
.../org/apache/solr/update/UpdateShardHandler.java | 22 ++++++-
.../solr/client/solrj/impl/Http2SolrClient.java | 7 +--
.../solr/client/solrj/impl/LBSolrClient.java | 6 +-
5 files changed, 48 insertions(+), 63 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index a6ec9a1..a302256 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -16,19 +16,6 @@
*/
package org.apache.solr.cloud;
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
@@ -53,7 +40,6 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
@@ -70,7 +56,6 @@ import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.PeerSyncWithLeader;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.UpdateShardHandlerConfig;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.SolrPluginUtils;
@@ -78,6 +63,19 @@ import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* This class may change in future and customisations are not supported between versions in terms of API or back compat
* behaviour.
@@ -325,7 +323,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
final private void commitOnLeader(String leaderUrl) throws SolrServerException,
IOException {
- Http2SolrClient client = core.getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient();
+ Http2SolrClient client = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
UpdateRequest ureq = new UpdateRequest();
ureq.setBasePath(leaderUrl);
ureq.setParams(new ModifiableSolrParams());
@@ -629,7 +627,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// though
try {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
- final Replica leader = pingLeader(ourUrl, this.coreDescriptor, true);
+ final Replica leader = getLeader(ourUrl, this.coreDescriptor, true);
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
break;
@@ -842,15 +840,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Finished recovery process, successful=[{}]", successfulRecovery);
}
- private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
+ private final Replica getLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
throws Exception {
int numTried = 0;
while (true) {
CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
- if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
- docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName())
- .getState() == Replica.State.ACTIVE) {
+ if (!isClosed() && mayPutReplicaAsDown && numTried == 1 && docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
// this operation may take a long time, by putting replica into DOWN state, client won't query this replica
//zkController.publish(coreDesc, Replica.State.DOWN);
// We should be in recovery and ignored by queries
@@ -865,10 +861,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
Replica leaderReplica = null;
-
try {
- leaderReplica = zkStateReader.getLeaderRetry(
- cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
+ leaderReplica = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
} catch (SolrException e) {
Thread.sleep(500);
log.info("Could not find leader, looping again ...", e);
@@ -876,39 +870,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
return leaderReplica;
-// try {
-// try (Http2SolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
-// SolrPing req = new SolrPing();
-// req.setBasePath(leaderReplica.getCoreUrl());
-// SolrPingResponse resp = req.process(httpSolrClient, null);
-// return leaderReplica;
-// }
-// } catch (IOException e) {
-// // let the recovery throttle handle pauses
-// log.error("Failed to connect leader {} on recovery, try again", leaderReplica.getCoreUrl(), e);
-// } catch (Exception e) {
-// ParWork.propagateInterrupt("Failed to connect leader " + leaderReplica.getCoreUrl() + " on recovery, try again", e);
-// if (e.getCause() instanceof IOException) {
-// log.error("Failed to connect leader {} on recovery, try again", leaderReplica.getCoreUrl());
-// } else {
-// throw new SolrException(ErrorCode.SERVER_ERROR, e);
-// }
-// }
}
}
public static Runnable testing_beforeReplayBufferingUpdates;
- /** Builds a new HttpSolrClient for use in recovery. Caller must close */
- private final Http2SolrClient buildRecoverySolrClient(final String leaderUrl) {
- return new Http2SolrClient.Builder(leaderUrl)
- .connectionTimeout(2000)
- .idleTimeout(5000)
- .withHttpClient(cc.getUpdateShardHandler().getTheSharedHttpClient())
- .markInternalRequest().build();
- }
-
-
final private Future<RecoveryInfo> replay(SolrCore core)
throws InterruptedException, ExecutionException {
if (testing_beforeReplayBufferingUpdates != null) {
@@ -996,7 +962,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "30000"));
// nocommit
try (Http2SolrClient client = new Http2SolrClient.Builder(leaderBaseUrl).withHttpClient(core.getCoreContainer().getUpdateShardHandler().
- getTheSharedHttpClient()).idleTimeout(readTimeout).connectionTimeout(3000).markInternalRequest().build()) {
+ getRecoveryOnlyClient()).idleTimeout(readTimeout).connectionTimeout(3000).markInternalRequest().build()) {
prepCmd.setBasePath(leaderBaseUrl);
log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);
Cancellable result = client.asyncRequest(prepCmd, null, new NamedListAsyncListener());
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 4b29f02..a325e61 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -394,8 +394,8 @@ public class CoreContainer implements Closeable {
containerProperties.putAll(cfg.getSolrProperties());
- solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(12, Runtime.getRuntime().availableProcessors() / 2),
- true, true);
+ solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(32, Runtime.getRuntime().availableProcessors()),
+ false, false);
// if (solrCoreLoadExecutor == null) {
// synchronized (CoreContainer.class) {
// if (solrCoreLoadExecutor == null) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 9fb84e0..4f6612f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -58,6 +58,9 @@ public class UpdateShardHandler implements SolrInfoBean {
private final Http2SolrClient updateOnlyClient;
+ private final Http2SolrClient recoveryOnlyClient;
+
+
private final CloseableHttpClient defaultClient;
private ExecutorService recoveryExecutor;
@@ -105,7 +108,8 @@ public class UpdateShardHandler implements SolrInfoBean {
.connectionTimeout(cfg.getDistributedConnectionTimeout())
.idleTimeout(cfg.getDistributedSocketTimeout());
}
- updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().maxThreadPoolSize(ParWork.PROC_COUNT).maxRequestsQueuedPerDestination(16000).build();
+ updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().maxThreadPoolSize(ParWork.PROC_COUNT)
+ .maxRequestsQueuedPerDestination(16000).strictEventOrdering(true).build();
updateOnlyClient.enableCloseLock();
// updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
Set<String> queryParams = new HashSet<>(2);
@@ -113,6 +117,15 @@ public class UpdateShardHandler implements SolrInfoBean {
queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
updateOnlyClient.setQueryParams(queryParams);
+
+ Http2SolrClient.Builder recoveryOnlyClientBuilder = new Http2SolrClient.Builder();
+ if (cfg != null) {
+ recoveryOnlyClientBuilder.connectionTimeout(5000).idleTimeout(30000);
+ }
+
+ recoveryOnlyClient = recoveryOnlyClientBuilder.markInternalRequest().maxThreadPoolSize(ParWork.PROC_COUNT).build();
+ recoveryOnlyClient.enableCloseLock();
+
// ThreadFactory recoveryThreadFactory = new SolrNamedThreadFactory("recoveryExecutor");
// if (cfg != null && cfg.getMaxRecoveryThreads() > 0) {
// if (log.isDebugEnabled()) {
@@ -191,6 +204,10 @@ public class UpdateShardHandler implements SolrInfoBean {
return updateOnlyClient;
}
+ public Http2SolrClient getRecoveryOnlyClient() {
+ return recoveryOnlyClient;
+ }
+
public PoolingHttpClientConnectionManager getDefaultConnectionManager() {
return defaultConnectionManager;
}
@@ -206,13 +223,14 @@ public class UpdateShardHandler implements SolrInfoBean {
public void close() {
assert closeTracker.close();
if (updateOnlyClient != null) updateOnlyClient.disableCloseLock();
+ if (recoveryOnlyClient != null) recoveryOnlyClient.disableCloseLock();
try (ParWork closer = new ParWork(this, true)) {
closer.collect("", () -> {
HttpClientUtil.close(defaultClient);
return defaultClient;
});
closer.addCollect();
-
+ closer.collect(recoveryOnlyClient);
closer.collect(updateOnlyClient);
closer.collect(defaultConnectionManager);
closer.collect("SolrInfoBean", () -> {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 2f2bdb9..aea6ce9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -164,9 +164,9 @@ public class Http2SolrClient extends SolrClient {
}
this.serverBaseUrl = serverBaseUrl;
}
- Integer moar = -1;
+ Integer moar = 1000;
if (builder.maxOutstandingAsyncRequests != null) moar = builder.maxOutstandingAsyncRequests;
- asyncTracker = new AsyncTracker(-1); // nocommit
+ asyncTracker = new AsyncTracker(moar); // nocommit
this.headers = builder.headers;
this.strictEventOrdering = builder.strictEventOrdering;
@@ -542,8 +542,7 @@ public class Http2SolrClient extends SolrClient {
}
private void decorateRequest(Request req, SolrRequest solrRequest) {
- req.header(HttpHeader.ACCEPT_ENCODING, null);
- req.timeout(idleTimeout, TimeUnit.MILLISECONDS);
+ req.header(HttpHeader.ACCEPT_ENCODING, null).idleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
if (solrRequest.getUserPrincipal() != null) {
req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index fbe6448..4f4dae6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -473,7 +473,7 @@ public abstract class LBSolrClient extends SolrClient {
synchronized (this) {
if (aliveCheckExecutor == null) {
aliveCheckExecutor = new ScheduledThreadPoolExecutor(1,
- new SolrNamedThreadFactory("aliveCheckExecutor"));
+ new SolrNamedThreadFactory("aliveCheckExecutor", true));
aliveCheckExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
aliveCheckExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
aliveCheckExecutor.scheduleAtFixedRate(
@@ -749,7 +749,9 @@ public abstract class LBSolrClient extends SolrClient {
@Override
public void close() {
this.closed = true;
- ExecutorUtil.shutdownAndAwaitTermination(aliveCheckExecutor);
+ if (aliveCheckExecutor != null) {
+ aliveCheckExecutor.shutdown();
+ }
assert ObjectReleaseTracker.release(this);
}
}