You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/04 17:14:26 UTC

[lucene-solr] branch reference_impl_dev updated: @940 Separate http client for recovery (shorter connect timeout, out of update/search queue(s)/path), some other tweaks.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 9c5b81c  @940 Separate http client for recovery (shorter connect timeout, out of update/search queue(s)/path), some other tweaks.
9c5b81c is described below

commit 9c5b81c56b0581678332cd13b10aa94e6f729349
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Oct 4 11:58:36 2020 -0500

    @940 Separate http client for recovery (shorter connect timeout, out of update/search queue(s)/path), some other tweaks.
---
 .../org/apache/solr/cloud/RecoveryStrategy.java    | 72 ++++++----------------
 .../java/org/apache/solr/core/CoreContainer.java   |  4 +-
 .../org/apache/solr/update/UpdateShardHandler.java | 22 ++++++-
 .../solr/client/solrj/impl/Http2SolrClient.java    |  7 +--
 .../solr/client/solrj/impl/LBSolrClient.java       |  6 +-
 5 files changed, 48 insertions(+), 63 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index a6ec9a1..a302256 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -16,19 +16,6 @@
  */
 package org.apache.solr.cloud;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
@@ -53,7 +40,6 @@ import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
@@ -70,7 +56,6 @@ import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.PeerSyncWithLeader;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.UpdateShardHandlerConfig;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.SolrPluginUtils;
@@ -78,6 +63,19 @@ import org.apache.solr.util.plugin.NamedListInitializedPlugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * This class may change in future and customisations are not supported between versions in terms of API or back compat
  * behaviour.
@@ -325,7 +323,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   final private void commitOnLeader(String leaderUrl) throws SolrServerException,
       IOException {
-    Http2SolrClient client = core.getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient();
+    Http2SolrClient client = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
     UpdateRequest ureq = new UpdateRequest();
     ureq.setBasePath(leaderUrl);
     ureq.setParams(new ModifiableSolrParams());
@@ -629,7 +627,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
                                                                                             // though
       try {
         CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
-        final Replica leader = pingLeader(ourUrl, this.coreDescriptor, true);
+        final Replica leader = getLeader(ourUrl, this.coreDescriptor, true);
         if (isClosed()) {
           log.info("RecoveryStrategy has been closed");
           break;
@@ -842,15 +840,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
     log.info("Finished recovery process, successful=[{}]", successfulRecovery);
   }
 
-  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
+  private final Replica getLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
       throws Exception {
     int numTried = 0;
     while (true) {
       CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
       DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
-      if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
-          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName())
-              .getState() == Replica.State.ACTIVE) {
+      if (!isClosed() && mayPutReplicaAsDown && numTried == 1 && docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
         // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
         //zkController.publish(coreDesc, Replica.State.DOWN);
         // We should be in recovery and ignored by queries
@@ -865,10 +861,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
       Replica leaderReplica = null;
 
-
       try {
-        leaderReplica = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
+        leaderReplica = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
       } catch (SolrException e) {
         Thread.sleep(500);
         log.info("Could not find leader, looping again ...", e);
@@ -876,39 +870,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
       }
 
       return leaderReplica;
-//      try {
-//        try (Http2SolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
-//          SolrPing req = new SolrPing();
-//          req.setBasePath(leaderReplica.getCoreUrl());
-//          SolrPingResponse resp = req.process(httpSolrClient, null);
-//          return leaderReplica;
-//        }
-//      } catch (IOException e) {
-//        // let the recovery throttle handle pauses
-//        log.error("Failed to connect leader {} on recovery, try again", leaderReplica.getCoreUrl(), e);
-//      } catch (Exception e) {
-//        ParWork.propagateInterrupt("Failed to connect leader " + leaderReplica.getCoreUrl() + " on recovery, try again", e);
-//        if (e.getCause() instanceof IOException) {
-//          log.error("Failed to connect leader {} on recovery, try again", leaderReplica.getCoreUrl());
-//        } else {
-//          throw new SolrException(ErrorCode.SERVER_ERROR, e);
-//        }
-//      }
     }
   }
 
   public static Runnable testing_beforeReplayBufferingUpdates;
 
-    /** Builds a new HttpSolrClient for use in recovery.  Caller must close */
-    private final Http2SolrClient buildRecoverySolrClient(final String leaderUrl) {
-      return new Http2SolrClient.Builder(leaderUrl)
-          .connectionTimeout(2000)
-          .idleTimeout(5000)
-          .withHttpClient(cc.getUpdateShardHandler().getTheSharedHttpClient())
-          .markInternalRequest().build();
-    }
-
-
     final private Future<RecoveryInfo> replay(SolrCore core)
       throws InterruptedException, ExecutionException {
     if (testing_beforeReplayBufferingUpdates != null) {
@@ -996,7 +962,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "30000"));
     // nocommit
     try (Http2SolrClient client = new Http2SolrClient.Builder(leaderBaseUrl).withHttpClient(core.getCoreContainer().getUpdateShardHandler().
-        getTheSharedHttpClient()).idleTimeout(readTimeout).connectionTimeout(3000).markInternalRequest().build()) {
+        getRecoveryOnlyClient()).idleTimeout(readTimeout).connectionTimeout(3000).markInternalRequest().build()) {
       prepCmd.setBasePath(leaderBaseUrl);
       log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);
       Cancellable result = client.asyncRequest(prepCmd, null, new NamedListAsyncListener());
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 4b29f02..a325e61 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -394,8 +394,8 @@ public class CoreContainer implements Closeable {
     containerProperties.putAll(cfg.getSolrProperties());
 
 
-    solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(12, Runtime.getRuntime().availableProcessors() / 2),
-        true, true);
+    solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(32, Runtime.getRuntime().availableProcessors()),
+        false, false);
 //    if (solrCoreLoadExecutor == null) {
 //      synchronized (CoreContainer.class) {
 //        if (solrCoreLoadExecutor == null) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 9fb84e0..4f6612f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -58,6 +58,9 @@ public class UpdateShardHandler implements SolrInfoBean {
 
   private final Http2SolrClient updateOnlyClient;
 
+  private final Http2SolrClient recoveryOnlyClient;
+
+
   private final CloseableHttpClient defaultClient;
 
   private ExecutorService recoveryExecutor;
@@ -105,7 +108,8 @@ public class UpdateShardHandler implements SolrInfoBean {
           .connectionTimeout(cfg.getDistributedConnectionTimeout())
           .idleTimeout(cfg.getDistributedSocketTimeout());
     }
-    updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().maxThreadPoolSize(ParWork.PROC_COUNT).maxRequestsQueuedPerDestination(16000).build();
+    updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().maxThreadPoolSize(ParWork.PROC_COUNT)
+        .maxRequestsQueuedPerDestination(16000).strictEventOrdering(true).build();
     updateOnlyClient.enableCloseLock();
    // updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
     Set<String> queryParams = new HashSet<>(2);
@@ -113,6 +117,15 @@ public class UpdateShardHandler implements SolrInfoBean {
     queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
     updateOnlyClient.setQueryParams(queryParams);
 
+
+    Http2SolrClient.Builder recoveryOnlyClientBuilder = new Http2SolrClient.Builder();
+    if (cfg != null) {
+      recoveryOnlyClientBuilder.connectionTimeout(5000).idleTimeout(30000);
+    }
+
+    recoveryOnlyClient = recoveryOnlyClientBuilder.markInternalRequest().maxThreadPoolSize(ParWork.PROC_COUNT).build();
+    recoveryOnlyClient.enableCloseLock();
+
 //    ThreadFactory recoveryThreadFactory = new SolrNamedThreadFactory("recoveryExecutor");
 //    if (cfg != null && cfg.getMaxRecoveryThreads() > 0) {
 //      if (log.isDebugEnabled()) {
@@ -191,6 +204,10 @@ public class UpdateShardHandler implements SolrInfoBean {
     return updateOnlyClient;
   }
 
+  public Http2SolrClient getRecoveryOnlyClient() {
+    return recoveryOnlyClient;
+  }
+
   public PoolingHttpClientConnectionManager getDefaultConnectionManager() {
     return defaultConnectionManager;
   }
@@ -206,13 +223,14 @@ public class UpdateShardHandler implements SolrInfoBean {
   public void close() {
     assert closeTracker.close();
     if (updateOnlyClient != null) updateOnlyClient.disableCloseLock();
+    if (recoveryOnlyClient != null) recoveryOnlyClient.disableCloseLock();
     try (ParWork closer = new ParWork(this, true)) {
       closer.collect("", () -> {
         HttpClientUtil.close(defaultClient);
         return defaultClient;
       });
       closer.addCollect();
-
+      closer.collect(recoveryOnlyClient);
       closer.collect(updateOnlyClient);
       closer.collect(defaultConnectionManager);
       closer.collect("SolrInfoBean", () -> {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 2f2bdb9..aea6ce9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -164,9 +164,9 @@ public class Http2SolrClient extends SolrClient {
       }
       this.serverBaseUrl = serverBaseUrl;
     }
-    Integer moar = -1;
+    Integer moar = 1000;
     if (builder.maxOutstandingAsyncRequests != null) moar = builder.maxOutstandingAsyncRequests;
-    asyncTracker = new AsyncTracker(-1); // nocommit
+    asyncTracker = new AsyncTracker(moar); // nocommit
     this.headers = builder.headers;
     this.strictEventOrdering = builder.strictEventOrdering;
 
@@ -542,8 +542,7 @@ public class Http2SolrClient extends SolrClient {
   }
 
   private void decorateRequest(Request req, SolrRequest solrRequest) {
-    req.header(HttpHeader.ACCEPT_ENCODING, null);
-    req.timeout(idleTimeout, TimeUnit.MILLISECONDS);
+    req.header(HttpHeader.ACCEPT_ENCODING, null).idleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
     if (solrRequest.getUserPrincipal() != null) {
       req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index fbe6448..4f4dae6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -473,7 +473,7 @@ public abstract class LBSolrClient extends SolrClient {
       synchronized (this) {
         if (aliveCheckExecutor == null) {
           aliveCheckExecutor = new ScheduledThreadPoolExecutor(1,
-              new SolrNamedThreadFactory("aliveCheckExecutor"));
+              new SolrNamedThreadFactory("aliveCheckExecutor", true));
           aliveCheckExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
           aliveCheckExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
           aliveCheckExecutor.scheduleAtFixedRate(
@@ -749,7 +749,9 @@ public abstract class LBSolrClient extends SolrClient {
   @Override
   public void close() {
     this.closed = true;
-    ExecutorUtil.shutdownAndAwaitTermination(aliveCheckExecutor);
+    if (aliveCheckExecutor != null) {
+      aliveCheckExecutor.shutdown();
+    }
     assert ObjectReleaseTracker.release(this);
   }
 }