You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2020/04/30 08:52:34 UTC

[lucene-solr] branch jira/SOLR-14354 updated: SOLR-14354: Async or using threads in better way for HttpShardHandler

This is an automated email from the ASF dual-hosted git repository.

datcm pushed a commit to branch jira/SOLR-14354
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/SOLR-14354 by this push:
     new 4d6894b  SOLR-14354: Async or using threads in better way for HttpShardHandler
     new 7d06eea  Merge branch 'jira/SOLR-14354' of github.com:apache/lucene-solr into jira/SOLR-14354
4d6894b is described below

commit 4d6894bca3b477dde2cbfdf68007784136e7c360
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Thu Apr 30 15:49:50 2020 +0700

    SOLR-14354: Async or using threads in better way for HttpShardHandler
---
 .../solr/handler/component/HttpShardHandler.java   | 158 +++++++++----------
 .../handler/component/HttpShardHandlerFactory.java |  22 +--
 .../solr/handler/component/QueryComponent.java     |   2 +-
 .../apache/solr/update/MockingHttp2SolrClient.java |  20 +--
 .../solr/client/solrj/impl/Http2SolrClient.java    | 170 +++++++++++++--------
 .../solr/client/solrj/impl/LBHttp2SolrClient.java  | 142 +++++++++++++++++
 .../solr/client/solrj/impl/LBSolrClient.java       |  98 +++++++++++-
 .../apache/solr/client/solrj/util/Cancellable.java |  22 +++
 .../solr/client/solrj/impl/LBSolrClientTest.java   |  93 +++++++++++
 9 files changed, 543 insertions(+), 184 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index f23cf16..1fcb317 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -17,17 +17,14 @@
 package org.apache.solr.handler.component;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import io.opentracing.Span;
 import io.opentracing.Tracer;
@@ -36,9 +33,11 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.LBSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.util.Cancellable;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
@@ -54,7 +53,6 @@ import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.util.tracing.GlobalTracer;
 import org.apache.solr.util.tracing.SolrRequestCarrier;
-import org.slf4j.MDC;
 
 public class HttpShardHandler extends ShardHandler {
   /**
@@ -64,18 +62,23 @@ public class HttpShardHandler extends ShardHandler {
    * by the RealtimeGet handler, since other types of replicas shouldn't respond to RTG requests
    */
   public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
+  private static final ShardResponse END_QUEUE = new ShardResponse();
 
   private HttpShardHandlerFactory httpShardHandlerFactory;
-  private CompletionService<ShardResponse> completionService;
-  private Set<Future<ShardResponse>> pending;
+  private LinkedList<Cancellable> requests;
+  private BlockingQueue<ShardResponse> responses;
+  private AtomicInteger pending;
   private Map<String, List<String>> shardToURLs;
   private Http2SolrClient httpClient;
+  private LBHttp2SolrClient lbClient;
 
   public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) {
     this.httpClient = httpClient;
     this.httpShardHandlerFactory = httpShardHandlerFactory;
-    completionService = httpShardHandlerFactory.newCompletionService();
-    pending = new HashSet<>();
+    this.lbClient = httpShardHandlerFactory.loadbalancer;
+    this.pending = new AtomicInteger(0);
+    this.responses = new LinkedBlockingQueue<>();
+    this.requests = new LinkedList<>();
 
     // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
     // This is primarily to keep track of what order we should use to query the replicas of a shard
@@ -130,77 +133,64 @@ public class HttpShardHandler extends ShardHandler {
     final Tracer tracer = GlobalTracer.getTracer();
     final Span span = tracer != null ? tracer.activeSpan() : null;
 
-    Callable<ShardResponse> task = () -> {
+    params.remove(CommonParams.WT); // use default (currently javabin)
+    params.remove(CommonParams.VERSION);
+    QueryRequest req = makeQueryRequest(sreq, params, shard);
+    req.setMethod(SolrRequest.METHOD.POST);
 
-      ShardResponse srsp = new ShardResponse();
-      if (sreq.nodeName != null) {
-        srsp.setNodeName(sreq.nodeName);
-      }
-      srsp.setShardRequest(sreq);
-      srsp.setShard(shard);
-      SimpleSolrResponse ssr = new SimpleSolrResponse();
-      srsp.setSolrResponse(ssr);
-      long startTime = System.nanoTime();
+    LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
+
+    ShardResponse srsp = new ShardResponse();
+    if (sreq.nodeName != null) {
+      srsp.setNodeName(sreq.nodeName);
+    }
+    srsp.setShardRequest(sreq);
+    srsp.setShard(shard);
+    SimpleSolrResponse ssr = new SimpleSolrResponse();
+    srsp.setSolrResponse(ssr);
+
+    pending.incrementAndGet();
+    // if there are no shards available for a slice, urls.size()==0
+    if (urls.size() == 0) {
+      // TODO: what's the right error code here? We should use the same thing when
+      // all of the servers for a shard are down.
+      SolrException exception = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+      srsp.setException(exception);
+      srsp.setResponseCode(exception.code());
+      responses.add(srsp);
+      return;
+    }
 
-      try {
-        params.remove(CommonParams.WT); // use default (currently javabin)
-        params.remove(CommonParams.VERSION);
+    requests.add(this.lbClient.asyncReq(lbReq, new LBHttp2SolrClient.OnComplete() {
+      long startTime = System.nanoTime();
 
-        QueryRequest req = makeQueryRequest(sreq, params, shard);
+      @Override
+      public void onStart() {
         if (tracer != null && span != null) {
           tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
         }
-        req.setMethod(SolrRequest.METHOD.POST);
         SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
         if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
-
-        // no need to set the response parser as binary is the defaultJab
-        // req.setResponseParser(new BinaryResponseParser());
-
-        // if there are no shards available for a slice, urls.size()==0
-        if (urls.size() == 0) {
-          // TODO: what's the right error code here? We should use the same thing when
-          // all of the servers for a shard are down.
-          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
-        }
-
-        if (urls.size() <= 1) {
-          String url = urls.get(0);
-          srsp.setShardAddress(url);
-          ssr.nl = request(url, req);
-        } else {
-          LBSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
-          ssr.nl = rsp.getResponse();
-          srsp.setShardAddress(rsp.getServer());
-        }
-      } catch (ConnectException cex) {
-        srsp.setException(cex); //????
-      } catch (Exception th) {
-        srsp.setException(th);
-        if (th instanceof SolrException) {
-          srsp.setResponseCode(((SolrException) th).code());
-        } else {
-          srsp.setResponseCode(-1);
-        }
       }
 
-      ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
-      return transfomResponse(sreq, srsp, shard);
-    };
-
-    try {
-      if (shard != null) {
-        MDC.put("ShardRequest.shards", shard);
+      @Override
+      public void onComplete(LBSolrClient.Rsp rsp) {
+        ssr.nl = rsp.getResponse();
+        srsp.setShardAddress(rsp.getServer());
+        ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+        responses.add(srsp);
       }
-      if (urls != null && !urls.isEmpty()) {
-        MDC.put("ShardRequest.urlList", urls.toString());
+
+      @Override
+      public void onError(Throwable throwable) {
+        ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+        srsp.setException(throwable);
+        if (throwable instanceof SolrException) {
+          srsp.setResponseCode(((SolrException) throwable).code());
+        }
+        responses.add(srsp);
       }
-      pending.add(completionService.submit(task));
-    } finally {
-      MDC.remove("ShardRequest.shards");
-      MDC.remove("ShardRequest.urlList");
-    }
+    }));
   }
 
   protected NamedList<Object> request(String url, SolrRequest req) throws IOException, SolrServerException {
@@ -243,12 +233,13 @@ public class HttpShardHandler extends ShardHandler {
   }
 
   private ShardResponse take(boolean bailOnError) {
+    try {
+      while (pending.get() > 0) {
+        ShardResponse rsp = responses.take();
+        if (rsp == END_QUEUE)
+          continue;
 
-    while (pending.size() > 0) {
-      try {
-        Future<ShardResponse> future = completionService.take();
-        pending.remove(future);
-        ShardResponse rsp = future.get();
+        pending.decrementAndGet();
         if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
         // add response to the response list... we do this after the take() and
         // not after the completion of "call" so we know when the last response
@@ -258,13 +249,9 @@ public class HttpShardHandler extends ShardHandler {
         if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
           return rsp;
         }
-      } catch (InterruptedException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      } catch (ExecutionException e) {
-        // should be impossible... the problem with catching the exception
-        // at this level is we don't know what ShardRequest it applied to
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e);
       }
+    } catch (InterruptedException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
     return null;
   }
@@ -272,9 +259,12 @@ public class HttpShardHandler extends ShardHandler {
 
   @Override
   public void cancelAll() {
-    for (Future<ShardResponse> future : pending) {
-      future.cancel(false);
+    for (Cancellable cancellable : requests) {
+      cancellable.cancel();
+      pending.decrementAndGet();
     }
+    // ensure that we do not hang on responses.take()
+    responses.add(END_QUEUE);
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index 1617dcb..83af91d 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -96,7 +96,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
 
   protected volatile Http2SolrClient defaultClient;
   protected InstrumentedHttpListenerFactory httpListenerFactory;
-  private LBHttp2SolrClient loadbalancer;
+  protected LBHttp2SolrClient loadbalancer;
 
   int corePoolSize = 0;
   int maximumPoolSize = Integer.MAX_VALUE;
@@ -316,6 +316,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
     this.defaultClient = new Http2SolrClient.Builder()
         .connectionTimeout(connectionTimeout)
         .idleTimeout(soTimeout)
+        .withExecutor(commExecutor)
         .maxConnectionsPerHost(maxConnectionsPerHost).build();
     this.defaultClient.addListenerFactory(this.httpListenerFactory);
     this.loadbalancer = new LBHttp2SolrClient(defaultClient);
@@ -368,18 +369,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
     return solrMetricsContext;
   }
 
-  /**
-   * Makes a request to one or more of the given urls, using the configured load balancer.
-   *
-   * @param req The solr search request that should be sent through the load balancer
-   * @param urls The list of solr server urls to load balance across
-   * @return The response from the request
-   */
-  public LBSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
-    throws SolrServerException, IOException {
-    return loadbalancer.request(newLBHttpSolrClientReq(req, urls));
-  }
-
   protected LBSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List<String> urls) {
     int numServersToTry = (int)Math.floor(urls.size() * this.permittedLoadBalancerRequestsMaximumFraction);
     if (numServersToTry < this.permittedLoadBalancerRequestsMinimumAbsolute) {
@@ -425,13 +414,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
   }
 
   /**
-   * Creates a new completion service for use by a single set of distributed requests.
-   */
-  public CompletionService<ShardResponse> newCompletionService() {
-    return new ExecutorCompletionService<>(commExecutor);
-  }
-
-  /**
    * Rebuilds the URL replacing the URL scheme of the passed URL with the
    * configured scheme replacement.If no scheme was configured, the passed URL's
    * scheme is left alone.
diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
index b03997a..debc41b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
@@ -853,7 +853,7 @@ public class QueryComponent extends SearchComponent
           if (srsp.getException() != null) {
             Throwable t = srsp.getException();
             if(t instanceof SolrServerException) {
-              t = ((SolrServerException)t).getCause();
+              t = t.getCause();
             }
             nl.add("error", t.toString() );
             StringWriter trace = new StringWriter();
diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
index c9aded7..1a69511 100644
--- a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
+++ b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.util.Cancellable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.NamedList;
 
@@ -118,20 +119,20 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
     return super.request(request, collection);
   }
 
-  public NamedList<Object> request(SolrRequest request, String collection, OnComplete onComplete)
-      throws SolrServerException, IOException {
+  @Override
+  public Cancellable asyncRequest(SolrRequest request, String collection, OnComplete onComplete) {
     if (request instanceof UpdateRequest) {
       UpdateRequest ur = (UpdateRequest) request;
       // won't throw exception if request is DBQ
       if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
-        return super.request(request, collection, onComplete);
+        return super.asyncRequest(request, collection, onComplete);
       }
     }
 
     if (exp != null) {
       if (oneExpPerReq) {
         if (reqGotException.contains(request)) {
-          return super.request(request, collection, onComplete);
+          return super.asyncRequest(request, collection, onComplete);
         }
         else
           reqGotException.add(request);
@@ -140,17 +141,12 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
       Exception e = exception();
       if (e instanceof IOException) {
         if (LuceneTestCase.random().nextBoolean()) {
-          throw (IOException) e;
-        } else {
-          throw new SolrServerException(e);
+          e = new SolrServerException(e);
         }
-      } else if (e instanceof SolrServerException) {
-        throw (SolrServerException) e;
-      } else {
-        throw new SolrServerException(e);
       }
+      onComplete.onFailure(e);
     }
 
-    return super.request(request, collection, onComplete);
+    return super.asyncRequest(request, collection, onComplete);
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 6a08816..4fbcd9f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -34,27 +34,35 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpStatus;
 import org.apache.http.entity.ContentType;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.V2RequestSupport;
 import org.apache.solr.client.solrj.embedded.SSLConfig;
+import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.DelegationTokenResponse;
+import org.apache.solr.client.solrj.util.Cancellable;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.client.solrj.util.Constants;
 import org.apache.solr.common.SolrException;
@@ -133,6 +141,7 @@ public class Http2SolrClient extends SolrClient {
    */
   private String serverBaseUrl;
   private boolean closeClient;
+  private ExecutorService executor;
 
   protected Http2SolrClient(String serverBaseUrl, Builder builder) {
     if (serverBaseUrl != null)  {
@@ -176,8 +185,14 @@ public class Http2SolrClient extends SolrClient {
     HttpClient httpClient;
 
     BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
-    ThreadPoolExecutor httpClientExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
-        256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
+    Executor executor = builder.executor;
+    if (executor == null) {
+      this.executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
+          256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
+      executor = this.executor;
+    }
+    this.executor = Objects.requireNonNullElseGet(builder.executor, () -> new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
+        256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc")));
 
     SslContextFactory.Client sslContextFactory;
     boolean ssl;
@@ -208,7 +223,7 @@ public class Http2SolrClient extends SolrClient {
       httpClient.setMaxConnectionsPerDestination(4);
     }
 
-    httpClient.setExecutor(httpClientExecutor);
+    httpClient.setExecutor(this.executor);
     httpClient.setStrictEventOrdering(false);
     httpClient.setConnectBlocking(true);
     httpClient.setFollowRedirects(false);
@@ -230,7 +245,6 @@ public class Http2SolrClient extends SolrClient {
     asyncTracker.waitForComplete();
     if (closeClient) {
       try {
-        ExecutorService executor = (ExecutorService) httpClient.getExecutor();
         httpClient.setStopTimeout(1000);
         httpClient.stop();
         ExecutorUtil.shutdownAndAwaitTermination(executor);
@@ -359,65 +373,95 @@ public class Http2SolrClient extends SolrClient {
     outStream.flush();
   }
 
-  public NamedList<Object> request(SolrRequest solrRequest,
-                                      String collection,
-                                      OnComplete onComplete) throws IOException, SolrServerException {
-    Request req = makeRequest(solrRequest, collection);
+  private static final Exception CANCELLED_EXCEPTION = new Exception();
+
+  public Cancellable asyncRequest(SolrRequest solrRequest, String collection, OnComplete onComplete) {
+    Request req;
+    try {
+      req = makeRequest(solrRequest, collection);
+    } catch (SolrServerException | IOException e) {
+      onComplete.onFailure(e);
+      return () -> {};
+    }
     final ResponseParser parser = solrRequest.getResponseParser() == null
         ? this.parser: solrRequest.getResponseParser();
-
-    if (onComplete != null) {
-      // This async call only suitable for indexing since the response size is limited by 5MB
-      req.onRequestQueued(asyncTracker.queuedListener)
-          .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
-
-        @Override
-        public void onComplete(Result result) {
-          if (result.isFailed()) {
-            onComplete.onFailure(result.getFailure());
-            return;
+    req.onRequestQueued(asyncTracker.queuedListener)
+        .onComplete(asyncTracker.completeListener)
+        .send(new InputStreamResponseListener() {
+          @Override
+          public void onHeaders(Response response) {
+            super.onHeaders(response);
+            InputStreamResponseListener listener = this;
+            executor.execute(() -> {
+              InputStream is = listener.getInputStream();
+              assert ObjectReleaseTracker.track(is);
+              try {
+                NamedList<Object> body = processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
+                onComplete.onSuccess(body);
+              } catch (RemoteSolrException e) {
+                if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
+                  onComplete.onFailure(e);
+                }
+              } catch (SolrServerException e) {
+                onComplete.onFailure(e);
+              }
+            });
           }
 
-          NamedList<Object> rsp;
-          try {
-            InputStream is = getContentAsInputStream();
-            assert ObjectReleaseTracker.track(is);
-            rsp = processErrorsAndResponse(result.getResponse(),
-                parser, is, getEncoding(), isV2ApiRequest(solrRequest));
-            onComplete.onSuccess(rsp);
-          } catch (Exception e) {
-            onComplete.onFailure(e);
+          @Override
+          public void onFailure(Response response, Throwable failure) {
+            super.onFailure(response, failure);
+            if (failure != CANCELLED_EXCEPTION) {
+              onComplete.onFailure(createException(req, failure));
+            }
           }
-        }
-      });
-      return null;
-    } else {
-      try {
-        InputStreamResponseListener listener = new InputStreamResponseListener();
-        req.send(listener);
-        Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
-        InputStream is = listener.getInputStream();
-        assert ObjectReleaseTracker.track(is);
-        return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      } catch (TimeoutException e) {
-        throw new SolrServerException(
-            "Timeout occured while waiting response from server at: " + req.getURI(), e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        if (cause instanceof ConnectException) {
-          throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
-        }
-        if (cause instanceof SolrServerException) {
-          throw (SolrServerException) cause;
-        } else if (cause instanceof IOException) {
-          throw new SolrServerException(
-              "IOException occured when talking to server at: " + getBaseURL(), cause);
-        }
-        throw new SolrServerException(cause.getMessage(), cause);
+        });
+    return () -> req.abort(CANCELLED_EXCEPTION);
+  }
+
+  @Override
+  public NamedList<Object> request(SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
+    Request req = makeRequest(solrRequest, collection);
+    final ResponseParser parser = solrRequest.getResponseParser() == null
+        ? this.parser: solrRequest.getResponseParser();
+
+    try {
+      InputStreamResponseListener listener = new InputStreamResponseListener();
+      req.send(listener);
+      Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
+      InputStream is = listener.getInputStream();
+      assert ObjectReleaseTracker.track(is);
+      return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Throwable e) {
+      throw createException(req, e);
+    }
+  }
+
+  private SolrServerException createException(Request req, Throwable throwable) {
+    try {
+      throw throwable;
+    } catch (TimeoutException e) {
+      return new SolrServerException(
+          "Timeout occured while waiting response from server at: " + req.getURI(), e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof ConnectException) {
+        return new SolrServerException("Server refused connection at: " + req.getURI(), cause);
+      }
+      if (cause instanceof SolrServerException) {
+        return (SolrServerException) cause;
+      } else if (cause instanceof IOException) {
+        return new SolrServerException(
+            "IOException occured when talking to server at: " + getBaseURL(), cause);
       }
+      return new SolrServerException(cause.getMessage(), cause);
+    } catch (Throwable e) {
+      return new SolrServerException(e.getMessage(), e);
     }
   }
 
@@ -459,6 +503,7 @@ public class Http2SolrClient extends SolrClient {
 
   private void decorateRequest(Request req, SolrRequest solrRequest) {
     req.header(HttpHeader.ACCEPT_ENCODING, null);
+    req.timeout(idleTimeout, TimeUnit.MILLISECONDS);
     if (solrRequest.getUserPrincipal() != null) {
       req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
     }
@@ -748,11 +793,6 @@ public class Http2SolrClient extends SolrClient {
     }
   }
 
-  @Override
-  public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
-    return request(request, collection, null);
-  }
-
   public void setRequestWriter(RequestWriter requestWriter) {
     this.requestWriter = requestWriter;
   }
@@ -819,6 +859,7 @@ public class Http2SolrClient extends SolrClient {
     private Integer maxConnectionsPerHost;
     private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
     protected String baseSolrUrl;
+    private ExecutorService executor;
 
     public Builder() {
 
@@ -840,6 +881,11 @@ public class Http2SolrClient extends SolrClient {
       return this;
     }
 
+    public Builder withExecutor(ExecutorService executor) {
+      this.executor = executor;
+      return this;
+    }
+
     public Builder withSSLConfig(SSLConfig sslConfig) {
       this.sslConfig = sslConfig;
       return this;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
index 293a264..19280e0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
@@ -16,9 +16,23 @@
  */
 package org.apache.solr.client.solrj.impl;
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.util.Cancellable;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.slf4j.MDC;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
 
 /**
  * LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around
@@ -66,4 +80,132 @@ public class LBHttp2SolrClient extends LBSolrClient {
   protected SolrClient getClient(String baseUrl) {
     return httpClient;
   }
+
+  public interface OnComplete { // callbacks through which asyncReq reports the progress and outcome of one load-balanced request
+    void onStart(); // called by asyncReq once, before the first server is contacted
+    void onComplete(Rsp rsp); // called when one of the attempted servers answered successfully; rsp carries the response and the server used
+    void onError(Throwable throwable); // called when a failure is not retried or when no further server can be selected
+  }
+
+  /**
+   * Sends {@code req} asynchronously, trying the candidate servers one after another until one of
+   * them succeeds, a failure is classified as non-retryable, or no further server can be selected.
+   *
+   * <p>Update requests and requests to admin paths are flagged as non-retryable before being handed
+   * to {@code doRequest}, which decides per failure whether another server is tried.
+   *
+   * @param req        the request together with the servers it may be sent to
+   * @param onComplete receives {@code onStart()} before the first attempt, then
+   *                   {@code onComplete(Rsp)} on success or {@code onError(Throwable)} on a final
+   *                   failure; a retry that finds the request cancelled is dropped without
+   *                   invoking either
+   * @return a handle that prevents further retries and cancels the attempt currently in flight
+   */
+  public Cancellable asyncReq(Req req, OnComplete onComplete) {
+    Rsp rsp = new Rsp();
+    boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
+    ServerIterator it = new ServerIterator(req, zombieServers);
+    onComplete.onStart();
+    // Besides carrying the flag, this object is the monitor that serialises dispatching an attempt
+    // with cancel(), so that cancel() always sees the handle of the newest attempt.
+    final AtomicBoolean cancelled = new AtomicBoolean(false);
+    final AtomicReference<Cancellable> currentCancellable = new AtomicReference<>();
+    RetryListener retryListener = new RetryListener() {
+
+      @Override
+      public void onSuccess(Rsp rsp) {
+        onComplete.onComplete(rsp);
+      }
+
+      @Override
+      public void onFailure(Throwable e, boolean retryReq) {
+        if (!retryReq) {
+          onComplete.onError(e);
+          return;
+        }
+        String url;
+        try {
+          url = it.nextOrError();
+        } catch (SolrServerException ex) {
+          // The caller is told about the failure of the last attempt; the reason why no further
+          // attempt is possible (time budget, server budget, no servers) travels along with it.
+          e.addSuppressed(ex);
+          onComplete.onError(e);
+          return;
+        }
+        // isServingZombieServer() must be read after nextOrError(), exactly as for the first attempt.
+        dispatchAttempt(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this, cancelled, currentCancellable);
+      }
+    };
+    try {
+      String url = it.nextOrError();
+      // The first attempt goes through the same path as retries: same MDC key, same monitor.
+      dispatchAttempt(url, req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener, cancelled, currentCancellable);
+    } catch (SolrServerException e) {
+      onComplete.onError(e);
+    }
+    return () -> {
+      synchronized (cancelled) {
+        cancelled.set(true);
+        Cancellable inFlight = currentCancellable.get();
+        if (inFlight != null) {
+          inFlight.cancel();
+        }
+      }
+    };
+  }
+
+  /**
+   * Starts one attempt against {@code url} unless the request has been cancelled, and publishes
+   * the attempt's handle for {@code cancel()}.
+   *
+   * <p>The whole step runs under the {@code cancelled} monitor. Without it, the thread that started
+   * an attempt could store that attempt's handle after a callback thread had already started (and
+   * stored) the following attempt, leaving {@code cancel()} with a stale handle. The
+   * compare-and-set covers the remaining case of a callback that runs on the dispatching thread
+   * before {@code doRequest} returns: the handle stored by that nested dispatch is kept.
+   */
+  private void dispatchAttempt(String url, Req req, Rsp rsp, boolean isNonRetryable, boolean isZombie,
+                               RetryListener listener, AtomicBoolean cancelled,
+                               AtomicReference<Cancellable> currentCancellable) {
+    MDC.put("LBSolrClient.url", url);
+    try {
+      synchronized (cancelled) {
+        if (cancelled.get()) {
+          return;
+        }
+        Cancellable previous = currentCancellable.get();
+        Cancellable cancellable = doRequest(url, req, rsp, isNonRetryable, isZombie, listener);
+        currentCancellable.compareAndSet(previous, cancellable);
+      }
+    } finally {
+      MDC.remove("LBSolrClient.url");
+    }
+  }
+
+  private interface RetryListener { // outcome of a single attempt, reported by doRequest back to asyncReq
+    void onSuccess(Rsp rsp); // the attempt succeeded; rsp holds the server that answered and its response
+    void onFailure(Throwable e, boolean retryReq); // the attempt failed; retryReq is true when doRequest judged the failure worth trying another server
+  }
+
+  private Cancellable doRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable,
+                         boolean isZombie, RetryListener listener) {
+    rsp.server = baseUrl;
+    req.getRequest().setBasePath(baseUrl);
+    return ((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new Http2SolrClient.OnComplete() {
+      @Override
+      public void onSuccess(NamedList<Object> result) {
+        rsp.rsp = result;
+        if (isZombie) {
+          zombieServers.remove(baseUrl);
+        }
+        listener.onSuccess(rsp);
+      }
+
+      @Override
+      public void onFailure(Throwable oe) {
+        try {
+          throw (Exception) oe;
+        } catch (BaseHttpSolrClient.RemoteExecutionException e){
+          listener.onFailure(e, false);
+        } catch(SolrException e) {
+          // we retry on 404 or 403 or 503 or 500
+          // unless it's an update - then we only retry on connect exception
+          if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
+            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+          } else {
+            // Server is alive but the request was likely malformed or invalid
+            if (isZombie) {
+              zombieServers.remove(baseUrl);
+            }
+            listener.onFailure(e, false);
+          }
+        } catch (SocketException e) {
+          if (!isNonRetryable || e instanceof ConnectException) {
+            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+          } else {
+            listener.onFailure(e, false);
+          }
+        } catch (SocketTimeoutException e) {
+          if (!isNonRetryable) {
+            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+          } else {
+            listener.onFailure(e, false);
+          }
+        } catch (SolrServerException e) {
+          Throwable rootCause = e.getRootCause();
+          if (!isNonRetryable && rootCause instanceof IOException) {
+            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+          } else if (isNonRetryable && rootCause instanceof ConnectException) {
+            listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+          } else {
+            listener.onFailure(e, false);
+          }
+        } catch (Exception e) {
+          listener.onFailure(new SolrServerException(e), false);
+        }
+      }
+    });
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index 176f07d..6d5bf8c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -24,10 +24,12 @@ import java.net.MalformedURLException;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
+import java.rmi.Remote;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +39,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrClient;
@@ -60,7 +63,7 @@ import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
 public abstract class LBSolrClient extends SolrClient {
 
   // defaults
-  private static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
+  protected static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
   private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
   private static final int NONSTANDARD_PING_LIMIT = 5;  // number of times we'll ping dead servers not in the server list
 
@@ -69,7 +72,7 @@ public abstract class LBSolrClient extends SolrClient {
   private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
   // access to aliveServers should be synchronized on itself
 
-  private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
+  protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
 
   // changes to aliveServers are reflected in this array, no need to synchronize
   private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
@@ -136,6 +139,91 @@ public abstract class LBSolrClient extends SolrClient {
     }
   }
 
+  /**
+   * Hands out, one at a time, the servers a single {@link Req} should be tried against.
+   *
+   * <p>Servers of the request that are not currently listed in {@code zombieServers} come first,
+   * in request order. Servers that were skipped because they are zombies follow afterwards, at
+   * most {@code req.getNumDeadServersToTry()} of them. The next server is always looked up ahead
+   * of time, so a server's zombie status is evaluated when its predecessor is handed out.
+   *
+   * <p>{@link #nextOrError()} additionally enforces the request's time budget and
+   * {@code numServersToTry} limit.
+   */
+  protected static class ServerIterator {
+    // server that the next call to nextOrError() will return, or null when none is left
+    String serverStr;
+    // zombie servers that were passed over and may be tried once the live ones are used up
+    List<String> skipped;
+    int numServersTried;
+    Iterator<String> it;
+    // created when iteration moves on to the skipped servers; null before that
+    Iterator<String> skippedIt;
+    // set when look-ahead stops because of the server limit; not read inside this class
+    String exceptionMessage;
+    long timeAllowedNano;
+    long timeOutTime;
+    // whether the server most recently returned by nextOrError() came from the skipped list
+    boolean servingZombie;
+
+    final Map<String, ServerWrapper> zombieServers;
+    final Req req;
+
+    /**
+     * @param req           request whose server list is iterated
+     * @param zombieServers servers currently considered dead, keyed by normalized URL; consulted
+     *                      on every look-ahead, so later changes to the map are observed
+     */
+    public ServerIterator(Req req, Map<String, ServerWrapper> zombieServers) {
+      this.it = req.getServers().iterator();
+      this.req = req;
+      this.zombieServers = zombieServers;
+      this.timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
+      this.timeOutTime = System.nanoTime() + timeAllowedNano;
+      fetchNext();
+    }
+
+    /**
+     * @return true when a server is available and the {@code numServersToTry} limit, if any, has
+     *         not been used up yet. The time budget is not considered here; {@link #nextOrError()}
+     *         may still fail because of it.
+     */
+    public synchronized boolean hasNext() {
+      Integer limit = req.getNumServersToTry();
+      if (limit != null && numServersTried >= limit) {
+        // nextOrError() would throw, so do not advertise another server
+        return false;
+      }
+      return serverStr != null;
+    }
+
+    // Looks up the server for the following call: next live server first, then skipped zombies.
+    private void fetchNext() {
+      serverStr = null;
+      if (req.numServersToTry != null && numServersTried > req.numServersToTry) {
+        exceptionMessage = "No live SolrServers available to handle this request:"
+            + " numServersTried="+numServersTried
+            + " numServersToTry="+req.numServersToTry;
+        return;
+      }
+
+      while (it.hasNext()) {
+        String candidate = normalize(it.next());
+        ServerWrapper wrapper = zombieServers.get(candidate);
+        if (wrapper == null) {
+          serverStr = candidate;
+          return;
+        }
+        // The server is currently a zombie: remember it (up to the configured number) and look
+        // at the next one. serverStr deliberately stays null here, so that a zombie at the end of
+        // the list is not mistaken for a live server.
+        final int numDeadServersToTry = req.getNumDeadServersToTry();
+        if (numDeadServersToTry > 0) {
+          if (skipped == null) {
+            skipped = new ArrayList<>(numDeadServersToTry);
+            skipped.add(wrapper.getBaseUrl());
+          } else if (skipped.size() < numDeadServersToTry) {
+            skipped.add(wrapper.getBaseUrl());
+          }
+        }
+      }
+
+      // no live server left: fall back to the zombies that were skipped
+      if (skipped != null) {
+        if (skippedIt == null) {
+          skippedIt = skipped.iterator();
+        }
+        if (skippedIt.hasNext()) {
+          serverStr = skippedIt.next();
+        }
+      }
+    }
+
+    /**
+     * @return true when the server most recently returned by {@link #nextOrError()} was taken
+     *         from the skipped zombie servers; false before the first call
+     */
+    synchronized boolean isServingZombieServer() {
+      return servingZombie;
+    }
+
+    /**
+     * Returns the next server to try and looks up the one after it.
+     *
+     * @return normalized URL of the server to try
+     * @throws SolrServerException when the request's time budget is exceeded, when no server is
+     *                             left, or when more than {@code numServersToTry} servers would
+     *                             be tried
+     */
+    public synchronized String nextOrError() throws SolrServerException {
+      if (isTimeExceeded(timeAllowedNano, timeOutTime)) {
+        throw new SolrServerException("Time allowed to handle this request exceeded");
+      }
+      if (serverStr == null) {
+        throw new SolrServerException("No live SolrServers available to handle this request");
+      }
+      numServersTried++;
+      if (req.getNumServersToTry() != null && numServersTried > req.getNumServersToTry()) {
+        throw new SolrServerException("No live SolrServers available to handle this request:"
+            + " numServersTried="+numServersTried
+            + " numServersToTry="+req.getNumServersToTry());
+      }
+      String rs = serverStr;
+      // Record where rs came from before the look-ahead moves on: skippedIt exists only once the
+      // live servers are exhausted, i.e. exactly when rs was taken from the skipped list.
+      servingZombie = skippedIt != null;
+      fetchNext();
+      return rs;
+    }
+  }
 
   public static class Req {
     protected SolrRequest request;
@@ -349,13 +437,13 @@ public abstract class LBSolrClient extends SolrClient {
   /**
    * @return time allowed in nanos, returns -1 if no time_allowed is specified.
    */
-  private long getTimeAllowedInNanos(final SolrRequest req) {
+  private static long getTimeAllowedInNanos(final SolrRequest req) {
     SolrParams reqParams = req.getParams();
     return reqParams == null ? -1 :
         TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
   }
 
-  private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
+  private static boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
     return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
   }
 
@@ -413,7 +501,7 @@ public abstract class LBSolrClient extends SolrClient {
 
   protected abstract SolrClient getClient(String baseUrl);
 
-  private Exception addZombie(String serverStr, Exception e) {
+  protected Exception addZombie(String serverStr, Exception e) {
     ServerWrapper wrapper = createServerWrapper(serverStr);
     wrapper.standard = false;
     zombieServers.put(serverStr, wrapper);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
new file mode 100644
index 0000000..323916a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.util;
+
+public interface Cancellable { // handle for an operation that can be asked to stop, e.g. the value returned by LBHttp2SolrClient#asyncReq
+  void cancel(); // requests cancellation; NOTE(review): whether repeated calls are safe depends on the implementation — confirm
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java
new file mode 100644
index 0000000..1455294
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.RoaringDocIdSet;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@code LBSolrClient.ServerIterator}: iteration order, handling of zombie
+ * servers, the time-allowed budget and the limit on the number of servers tried.
+ */
+public class LBSolrClientTest {
+
+  /** Without zombies the servers are returned in request order, then the iterator is exhausted. */
+  @Test
+  public void testServerIterator() throws SolrServerException {
+    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
+    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
+    List<String> actualServers = new ArrayList<>();
+    while (serverIterator.hasNext()) {
+      actualServers.add(serverIterator.nextOrError());
+    }
+    assertEquals(Arrays.asList("1", "2", "3", "4"), actualServers);
+    assertFalse(serverIterator.hasNext());
+    LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
+  }
+
+  /** A zombie server is passed over at first and only tried after all live servers. */
+  @Test
+  public void testServerIteratorWithZombieServers() throws SolrServerException {
+    HashMap<String, LBSolrClient.ServerWrapper> zombieServers = new HashMap<>();
+    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
+    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, zombieServers);
+    // Registered after construction on purpose: the iterator consults the map on every look-ahead.
+    zombieServers.put("2", new LBSolrClient.ServerWrapper("2"));
+
+    assertTrue(serverIterator.hasNext());
+    assertEquals("1", serverIterator.nextOrError());
+    assertTrue(serverIterator.hasNext());
+    assertEquals("3", serverIterator.nextOrError());
+    assertTrue(serverIterator.hasNext());
+    assertEquals("4", serverIterator.nextOrError());
+    assertTrue(serverIterator.hasNext());
+    assertEquals("2", serverIterator.nextOrError());
+  }
+
+  /** Once the time allowed for the request has passed, asking for another server fails. */
+  @Test
+  public void testServerIteratorTimeAllowed() throws SolrServerException, InterruptedException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.TIME_ALLOWED, 300);
+    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(params), Arrays.asList("1", "2", "3", "4"), 2);
+    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
+    assertTrue(serverIterator.hasNext());
+    serverIterator.nextOrError();
+    // Sleep clearly longer than the allowed time so that timer granularity cannot make the test flaky.
+    Thread.sleep(400);
+    SolrServerException e = LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
+    // Only one of the two permitted servers has been used, so the failure must be the time budget.
+    assertTrue(e.getMessage(), e.getMessage().contains("Time allowed"));
+  }
+
+  /** With numServersToTry = 2 the third request for a server fails although servers remain. */
+  @Test
+  public void testServerIteratorMaxRetry() throws SolrServerException {
+    LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"), 2);
+    LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
+    assertTrue(serverIterator.hasNext());
+    serverIterator.nextOrError();
+    assertTrue(serverIterator.hasNext());
+    serverIterator.nextOrError();
+    SolrServerException e = LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
+    assertTrue(e.getMessage(), e.getMessage().contains("No live SolrServers available"));
+  }
+}