Posted to commits@lucene.apache.org by da...@apache.org on 2019/03/04 09:41:50 UTC

[lucene-solr] branch master updated: SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider

This is an automated email from the ASF dual-hosted git repository.

datcm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new e99934b  SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider
e99934b is described below

commit e99934b240cbb197331edc5e1889203e60b3d5d9
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Mon Mar 4 09:41:45 2019 +0000

    SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider
---
 solr/CHANGES.txt                                   |    2 +
 ...oudSolrClient.java => BaseCloudSolrClient.java} |  640 +++--------
 ...ider.java => BaseHttpClusterStateProvider.java} |   83 +-
 .../solr/client/solrj/impl/BaseHttpSolrClient.java |   80 ++
 .../client/solrj/impl/CloudHttp2SolrClient.java    |  237 ++++
 .../solr/client/solrj/impl/CloudSolrClient.java    | 1186 +-------------------
 .../solrj/impl/Http2ClusterStateProvider.java      |   46 +
 .../solr/client/solrj/impl/Http2SolrClient.java    |   83 +-
 .../solrj/impl/HttpClusterStateProvider.java       |  301 +----
 .../solr/client/solrj/impl/HttpSolrClient.java     |   52 +-
 .../solr/client/solrj/request/UpdateRequest.java   |   76 +-
 .../impl/CloudHttp2SolrClientBadInputTest.java     |   73 ++
 .../impl/CloudHttp2SolrClientBuilderTest.java      |   84 ++
 .../CloudHttp2SolrClientMultiConstructorTest.java  |   85 ++
 .../solrj/impl/CloudHttp2SolrClientRetryTest.java  |   83 ++
 .../solrj/impl/CloudHttp2SolrClientTest.java       |  978 ++++++++++++++++
 .../solrj/impl/CloudSolrClientCacheTest.java       |    2 +-
 .../src/java/org/apache/solr/SolrTestCaseJ4.java   |   56 +
 18 files changed, 2015 insertions(+), 2132 deletions(-)

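For quick orientation: this change extracts the transport-agnostic logic of
CloudSolrClient into an abstract BaseCloudSolrClient and adds
CloudHttp2SolrClient, an equivalent client built on the Jetty-based
Http2SolrClient. The sketch below shows how the new client is expected to be
used; the Builder arguments (a ZooKeeper host list plus an optional chroot)
are assumed from the new CloudHttp2SolrClient.java listed in the diffstat
above, so treat this as illustrative and check that file for the exact
signatures.

    import java.util.Collections;
    import java.util.Optional;

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;

    public class CloudHttp2Example {
      public static void main(String[] args) throws Exception {
        // Assumed Builder signature: ZK host list plus optional chroot,
        // mirroring CloudSolrClient.Builder. The client discovers live Solr
        // nodes from cluster state, as the Zk-based CloudSolrClient does.
        try (CloudHttp2SolrClient client = new CloudHttp2SolrClient.Builder(
                Collections.singletonList("zookeeper1:2181"), Optional.empty())
            .build()) {
          client.setDefaultCollection("gettingstarted");
          client.query(new SolrQuery("*:*"));
        }
      }
    }
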
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 04fecb9..bd04156 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -278,6 +278,8 @@ New Features
 
 * SOLR-13241: Add 'autoscaling' tool support to solr.cmd (Jason Gerlowski)
 
+* SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider (Cao Manh Dat)
+
 Bug Fixes
 ----------------------
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
similarity index 68%
copy from solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
copy to solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 37cdba7..7ae1e02 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
@@ -30,7 +31,6 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,10 +42,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
-import org.apache.http.NoHttpResponseException;
-import org.apache.http.client.HttpClient;
-import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -58,10 +56,9 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.ToleratedUpdateError;
-import org.apache.solr.common.cloud.ClusterState.CollectionRef;
+import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CollectionStatePredicate;
 import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
@@ -90,34 +87,18 @@ import org.slf4j.MDC;
 import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
 import static org.apache.solr.common.params.CommonParams.ID;
 
-/**
- * SolrJ client class to communicate with SolrCloud.
- * Instances of this class communicate with Zookeeper to discover
- * Solr endpoints for SolrCloud collections, and then use the 
- * {@link LBHttpSolrClient} to issue requests.
- * 
- * This class assumes the id field for your documents is called
- * 'id' - if this is not the case, you must set the right name
- * with {@link #setIdField(String)}.
- */
-@SuppressWarnings("serial")
-public class CloudSolrClient extends SolrClient {
+public abstract class BaseCloudSolrClient extends SolrClient {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final ClusterStateProvider stateProvider;
   private volatile String defaultCollection;
-  private final LBHttpSolrClient lbClient;
-  private final boolean shutdownLBHttpSolrServer;
-  private HttpClient myClient;
-  private final boolean clientIsInternal;
   //no of times collection state to be reloaded if stale state error is received
   private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
-  Random rand = new Random();
-  
+  private Random rand = new Random();
+
   private final boolean updatesToLeaders;
   private final boolean directUpdatesToLeadersOnly;
-  private boolean parallelUpdates; //TODO final
+  boolean parallelUpdates; //TODO final
   private ExecutorService threadPool = ExecutorUtil
       .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
           "CloudSolrClient ThreadPool"));
@@ -132,11 +113,11 @@ public class CloudSolrClient extends SolrClient {
     NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
     NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
     NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
-    
+
     NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
     NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
     NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
-    
+
     // Not supported via SolrCloud
     // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
 
@@ -148,7 +129,7 @@ public class CloudSolrClient extends SolrClient {
     final AtomicLong puts = new AtomicLong();
     final AtomicLong hits = new AtomicLong();
     final Lock evictLock = new ReentrantLock(true);
-    private volatile long timeToLive = 60 * 1000L;
+    protected volatile long timeToLive = 60 * 1000L;
 
     @Override
     public ExpiringCachedDocCollection get(Object key) {
@@ -198,14 +179,6 @@ public class CloudSolrClient extends SolrClient {
     this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
   }
 
-  /**
-   * @deprecated since 7.0  Use {@link Builder} methods instead. 
-   */
-  @Deprecated
-  public void setSoTimeout(int timeout) {
-    lbClient.setSoTimeout(timeout);
-  }
-
   protected final StateCache collectionStateCache = new StateCache();
 
   class ExpiringCachedDocCollection {
@@ -241,60 +214,11 @@ public class CloudSolrClient extends SolrClient {
       retriedAt = System.nanoTime();
     }
   }
-  
-  /**
-   * Create a new client object that connects to Zookeeper and is always aware
-   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
-   * SolrCloud has enough replicas for every shard in a collection, there is no
-   * single point of failure. Updates will be sent to shard leaders by default.
-   *
-   * @param builder a {@link CloudSolrClient.Builder} with the options used to create the client.
-   */
-  protected CloudSolrClient(Builder builder) {
-    if (builder.stateProvider == null) {
-      if (builder.zkHosts != null && builder.solrUrls != null) {
-        throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
-      }
-      if (builder.zkHosts != null) {
-        this.stateProvider = new ZkClientClusterStateProvider(builder.zkHosts, builder.zkChroot);
-      } else if (builder.solrUrls != null && !builder.solrUrls.isEmpty()) {
-        try {
-          this.stateProvider = new HttpClusterStateProvider(builder.solrUrls, builder.httpClient);
-        } catch (Exception e) {
-          throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
-              + "Solr server(s), "  + builder.solrUrls + ", down?)", e);
-        }
-      } else {
-        throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
-      }
-    } else {
-      this.stateProvider = builder.stateProvider;
-    }
-    this.clientIsInternal = builder.httpClient == null;
-    this.shutdownLBHttpSolrServer = builder.loadBalancedSolrClient == null;
-    if(builder.lbClientBuilder != null) {
-      propagateLBClientConfigOptions(builder);
-      builder.loadBalancedSolrClient = builder.lbClientBuilder.build();
-    }
-    if(builder.loadBalancedSolrClient != null) builder.httpClient = builder.loadBalancedSolrClient.getHttpClient();
-    this.myClient = (builder.httpClient == null) ? HttpClientUtil.createClient(null) : builder.httpClient;
-    if (builder.loadBalancedSolrClient == null) builder.loadBalancedSolrClient = createLBHttpSolrClient(builder, myClient);
-    this.lbClient = builder.loadBalancedSolrClient;
-    this.updatesToLeaders = builder.shardLeadersOnly;
-    this.parallelUpdates = builder.parallelUpdates;
-    this.directUpdatesToLeadersOnly = builder.directUpdatesToLeadersOnly;
-  }
-  
-  private void propagateLBClientConfigOptions(Builder builder) {
-    final LBHttpSolrClient.Builder lbBuilder = builder.lbClientBuilder;
-    
-    if (builder.connectionTimeoutMillis != null) {
-      lbBuilder.withConnectionTimeout(builder.connectionTimeoutMillis);
-    }
-    
-    if (builder.socketTimeoutMillis != null) {
-      lbBuilder.withSocketTimeout(builder.socketTimeoutMillis);
-    }
+
+  protected BaseCloudSolrClient(boolean updatesToLeaders, boolean parallelUpdates, boolean directUpdatesToLeadersOnly) {
+    this.updatesToLeaders = updatesToLeaders;
+    this.parallelUpdates = parallelUpdates;
+    this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
   }
 
   /** Sets the cache ttl for DocCollection Objects cached  . This is only applicable for collections which are persisted outside of clusterstate.json
@@ -305,28 +229,41 @@ public class CloudSolrClient extends SolrClient {
     this.collectionStateCache.timeToLive = seconds * 1000L;
   }
 
+  protected abstract LBSolrClient getLbClient();
+
+  public abstract ClusterStateProvider getClusterStateProvider();
+
+  protected abstract boolean wasCommError(Throwable t);
+
+  @Override
+  public void close() throws IOException {
+    if(this.threadPool != null && !this.threadPool.isShutdown()) {
+      this.threadPool.shutdown();
+    }
+  }
+
   public ResponseParser getParser() {
-    return lbClient.getParser();
+    return getLbClient().getParser();
   }
 
   /**
    * Note: This setter method is <b>not thread-safe</b>.
-   * 
+   *
    * @param processor
    *          Default Response Parser chosen to parse the response if the parser
    *          were not specified as part of the request.
    * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
    */
   public void setParser(ResponseParser processor) {
-    lbClient.setParser(processor);
+    getLbClient().setParser(processor);
   }
-  
+
   public RequestWriter getRequestWriter() {
-    return lbClient.getRequestWriter();
+    return getLbClient().getRequestWriter();
   }
-  
+
   public void setRequestWriter(RequestWriter requestWriter) {
-    lbClient.setRequestWriter(requestWriter);
+    getLbClient().setRequestWriter(requestWriter);
   }
 
   /**
@@ -337,9 +274,9 @@ public class CloudSolrClient extends SolrClient {
   }
 
   public ZkStateReader getZkStateReader() {
-    if (stateProvider instanceof ZkClientClusterStateProvider) {
-      ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) stateProvider;
-      stateProvider.connect();
+    if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
+      ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) getClusterStateProvider();
+      getClusterStateProvider().connect();
       return provider.zkStateReader;
     }
     throw new IllegalStateException("This has no Zk stateReader");
@@ -358,7 +295,7 @@ public class CloudSolrClient extends SolrClient {
   public String getIdField() {
     return idField;
   }
-  
+
   /** Sets the default collection for request */
   public void setDefaultCollection(String collection) {
     this.defaultCollection = collection;
@@ -384,19 +321,12 @@ public class CloudSolrClient extends SolrClient {
     return parallelUpdates;
   }
 
-  /** @deprecated since 7.2  Use {@link Builder} methods instead. */
-  @Deprecated
-  public void setParallelUpdates(boolean parallelUpdates) {
-    this.parallelUpdates = parallelUpdates;
-  }
-
   /**
    * Connect to the zookeeper ensemble.
    * This is an optional method that may be used to force a connect before any other requests are sent.
-   *
    */
   public void connect() {
-    stateProvider.connect();
+    getClusterStateProvider().connect();
   }
 
   /**
@@ -407,12 +337,12 @@ public class CloudSolrClient extends SolrClient {
    * @throws InterruptedException if the wait is interrupted
    */
   public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
-    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, stateProvider);
+    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, getClusterStateProvider());
     long timeout = System.nanoTime() + timeUnit.toNanos(duration);
     while (System.nanoTime() < timeout) {
       try {
         connect();
-        log.info("Cluster at {} ready", stateProvider);
+        log.info("Cluster at {} ready", getClusterStateProvider());
         return;
       }
       catch (RuntimeException e) {
@@ -424,8 +354,8 @@ public class CloudSolrClient extends SolrClient {
   }
 
   private ZkClientClusterStateProvider assertZKStateProvider() {
-    if (stateProvider instanceof ZkClientClusterStateProvider) {
-      return (ZkClientClusterStateProvider) stateProvider;
+    if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
+      return (ZkClientClusterStateProvider) getClusterStateProvider();
     }
     throw new IllegalArgumentException("This client does not use ZK");
 
@@ -446,7 +376,7 @@ public class CloudSolrClient extends SolrClient {
    */
   public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
       throws InterruptedException, TimeoutException {
-    stateProvider.connect();
+    getClusterStateProvider().connect();
     assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
   }
 
@@ -461,7 +391,7 @@ public class CloudSolrClient extends SolrClient {
    * @param watcher    a watcher that will be called when the state changes
    */
   public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
-    stateProvider.connect();
+    getClusterStateProvider().connect();
     assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
   }
 
@@ -484,13 +414,13 @@ public class CloudSolrClient extends SolrClient {
     }
 
     //Check to see if the collection is an alias.
-    List<String> aliasedCollections = stateProvider.resolveAlias(collection);
+    List<String> aliasedCollections = getClusterStateProvider().resolveAlias(collection);
     collection = aliasedCollections.get(0); // pick 1st (consistent with HttpSolrCall behavior)
 
     DocCollection col = getDocCollection(collection, null);
 
     DocRouter router = col.getRouter();
-    
+
     if (router instanceof ImplicitDocRouter) {
       // short circuit as optimization
       return null;
@@ -500,13 +430,13 @@ public class CloudSolrClient extends SolrClient {
     //The value is a list of URLs for each replica in the slice.
     //The first value in the list is the leader for the slice.
     final Map<String,List<String>> urlMap = buildUrlMap(col);
-    final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
+    final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
     if (routes == null) {
       if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
-          // we have info (documents with ids and/or ids to delete) with
-          // which to find the leaders but we could not find (all of) them
-          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-              "directUpdatesToLeadersOnly==true but could not find leader(s)");
+        // we have info (documents with ids and/or ids to delete) with
+        // which to find the leaders but we could not find (all of) them
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+            "directUpdatesToLeadersOnly==true but could not find leader(s)");
       } else {
         // we could not find a leader or routes yet - use unoptimized general path
         return null;
@@ -520,12 +450,14 @@ public class CloudSolrClient extends SolrClient {
 
     if (parallelUpdates) {
       final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
-      for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
         final String url = entry.getKey();
-        final LBHttpSolrClient.Req lbRequest = entry.getValue();
+        final LBSolrClient.Req lbRequest = entry.getValue();
         try {
           MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest).getResponse()));
+          responseFutures.put(url, threadPool.submit(() -> {
+            return getLbClient().request(lbRequest).getResponse();
+          }));
         } finally {
           MDC.remove("CloudSolrClient.url");
         }
@@ -548,17 +480,17 @@ public class CloudSolrClient extends SolrClient {
         Throwable firstException = exceptions.getVal(0);
         if(firstException instanceof SolrException) {
           SolrException e = (SolrException) firstException;
-          throw new RouteException(ErrorCode.getErrorCode(e.code()), exceptions, routes);
+          throw getRouteException(SolrException.ErrorCode.getErrorCode(e.code()), exceptions, routes);
         } else {
-          throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+          throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, exceptions, routes);
         }
       }
     } else {
-      for (Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+      for (Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
         String url = entry.getKey();
-        LBHttpSolrClient.Req lbRequest = entry.getValue();
+        LBSolrClient.Req lbRequest = entry.getValue();
         try {
-          NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
+          NamedList<Object> rsp = getLbClient().request(lbRequest).getResponse();
           shardResponses.add(url, rsp);
         } catch (Exception e) {
           if(e instanceof SolrException) {
@@ -577,12 +509,12 @@ public class CloudSolrClient extends SolrClient {
       deleteQueryRequest.setDeleteQuery(deleteQuery);
       nonRoutableRequest = deleteQueryRequest;
     }
-    
+
     Set<String> paramNames = nonRoutableParams.getParameterNames();
-    
+
     Set<String> intersection = new HashSet<>(paramNames);
     intersection.retainAll(NON_ROUTABLE_PARAMS);
-    
+
     if (nonRoutableRequest != null || intersection.size() > 0) {
       if (nonRoutableRequest == null) {
         nonRoutableRequest = new UpdateRequest();
@@ -592,12 +524,12 @@ public class CloudSolrClient extends SolrClient {
       List<String> urlList = new ArrayList<>();
       urlList.addAll(routes.keySet());
       Collections.shuffle(urlList, rand);
-      LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(nonRoutableRequest, urlList);
+      LBSolrClient.Req req = new LBSolrClient.Req(nonRoutableRequest, urlList);
       try {
-        LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+        LBSolrClient.Rsp rsp = getLbClient().request(req);
         shardResponses.add(urlList.get(0), rsp.getResponse());
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, urlList.get(0), e);
       }
     }
 
@@ -609,6 +541,16 @@ public class CloudSolrClient extends SolrClient {
     return rr;
   }
 
+  protected RouteException getRouteException(SolrException.ErrorCode serverError, NamedList<Throwable> exceptions, Map<String, ? extends LBSolrClient.Req> routes) {
+    return new RouteException(serverError, exceptions, routes);
+  }
+
+  protected Map<String, ? extends LBSolrClient.Req> createRoutes(UpdateRequest updateRequest, ModifiableSolrParams routableParams,
+                                                       DocCollection col, DocRouter router, Map<String, List<String>> urlMap,
+                                                       String idField) {
+    return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
+  }
+
   private Map<String,List<String>> buildUrlMap(DocCollection col) {
     Map<String, List<String>> urlMap = new HashMap<>();
     Slice[] slices = col.getActiveSlicesArr();
@@ -649,14 +591,14 @@ public class CloudSolrClient extends SolrClient {
     return urlMap;
   }
 
-  public RouteResponse condenseResponse(NamedList response, int timeMillis) {
-    RouteResponse condensed = new RouteResponse();
+  protected <T extends RouteResponse> T condenseResponse(NamedList response, int timeMillis, Supplier<T> supplier) {
+    T condensed = supplier.get();
     int status = 0;
     Integer rf = null;
     Integer minRf = null;
-    
+
     // TolerantUpdateProcessor
-    List<SimpleOrderedMap<String>> toleratedErrors = null; 
+    List<SimpleOrderedMap<String>> toleratedErrors = null;
     int maxToleratedErrors = Integer.MAX_VALUE;
 
     // For "adds", "deletes", "deleteByQuery" etc.
@@ -664,11 +606,11 @@ public class CloudSolrClient extends SolrClient {
 
     for(int i=0; i<response.size(); i++) {
       NamedList shardResponse = (NamedList)response.getVal(i);
-      NamedList header = (NamedList)shardResponse.get("responseHeader");      
+      NamedList header = (NamedList)shardResponse.get("responseHeader");
       Integer shardStatus = (Integer)header.get("status");
       int s = shardStatus.intValue();
       if(s > 0) {
-          status = s;
+        status = s;
       }
       Object rfObj = header.get(UpdateRequest.REPFACT);
       if (rfObj != null && rfObj instanceof Integer) {
@@ -678,16 +620,16 @@ public class CloudSolrClient extends SolrClient {
       }
       minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
 
-      List<SimpleOrderedMap<String>> shardTolerantErrors = 
-        (List<SimpleOrderedMap<String>>) header.get("errors");
+      List<SimpleOrderedMap<String>> shardTolerantErrors =
+          (List<SimpleOrderedMap<String>>) header.get("errors");
       if (null != shardTolerantErrors) {
         Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
         assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
         // if we get into some weird state where the nodes disagree about the effective maxErrors,
         // assume the min value seen to decide if we should fail.
         maxToleratedErrors = Math.min(maxToleratedErrors,
-                                      ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
-        
+            ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
+
         if (null == toleratedErrors) {
           toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
         }
@@ -723,17 +665,17 @@ public class CloudSolrClient extends SolrClient {
         // then at least one shard should have thrown a real error before this, so we don't worry
         // about having a more "singular" exception msg for that situation
         StringBuilder msgBuf =  new StringBuilder()
-          .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
-          
+            .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
+
         NamedList metadata = new NamedList<String>();
         for (SimpleOrderedMap<String> err : toleratedErrors) {
           ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
           metadata.add(te.getMetadataKey(), te.getMetadataValue());
-          
+
           msgBuf.append("\n").append(te.getMessage());
         }
-        
-        SolrException toThrow = new SolrException(ErrorCode.BAD_REQUEST, msgBuf.toString());
+
+        SolrException toThrow = new SolrException(SolrException.ErrorCode.BAD_REQUEST, msgBuf.toString());
         toThrow.setMetadata(metadata);
         throw toThrow;
       }
@@ -745,9 +687,13 @@ public class CloudSolrClient extends SolrClient {
     return condensed;
   }
 
-  public static class RouteResponse extends NamedList {
+  public RouteResponse condenseResponse(NamedList response, int timeMillis) {
+    return condenseResponse(response, timeMillis, RouteResponse::new);
+  }
+
+  public static class RouteResponse<T extends LBSolrClient.Req> extends NamedList {
     private NamedList routeResponses;
-    private Map<String, LBHttpSolrClient.Req> routes;
+    private Map<String, T> routes;
 
     public void setRouteResponses(NamedList routeResponses) {
       this.routeResponses = routeResponses;
@@ -757,11 +703,11 @@ public class CloudSolrClient extends SolrClient {
       return routeResponses;
     }
 
-    public void setRoutes(Map<String, LBHttpSolrClient.Req> routes) {
+    public void setRoutes(Map<String, T> routes) {
       this.routes = routes;
     }
 
-    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+    public Map<String, T> getRoutes() {
       return routes;
     }
 
@@ -770,9 +716,9 @@ public class CloudSolrClient extends SolrClient {
   public static class RouteException extends SolrException {
 
     private NamedList<Throwable> throwables;
-    private Map<String, LBHttpSolrClient.Req> routes;
+    private Map<String, ? extends LBSolrClient.Req> routes;
 
-    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, LBHttpSolrClient.Req> routes){
+    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, ? extends LBSolrClient.Req> routes){
       super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
       this.throwables = throwables;
       this.routes = routes;
@@ -798,7 +744,7 @@ public class CloudSolrClient extends SolrClient {
       return throwables;
     }
 
-    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+    public Map<String, ? extends LBSolrClient.Req> getRoutes() {
       return this.routes;
     }
   }
@@ -848,7 +794,7 @@ public class CloudSolrClient extends SolrClient {
         // track the version of state we're using on the client side using the _stateVer_ param
         DocCollection coll = getDocCollection(requestedCollection, null);
         if (coll == null) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
         }
         int collVer = coll.getZNodeVersion();
         if (coll.getStateFormat()>1) {
@@ -916,19 +862,18 @@ public class CloudSolrClient extends SolrClient {
       int errorCode = (rootCause instanceof SolrException) ?
           ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
 
-          boolean wasCommError =
-              (rootCause instanceof ConnectException ||
-                  rootCause instanceof ConnectTimeoutException ||
-                  rootCause instanceof NoHttpResponseException ||
-                  rootCause instanceof SocketException);
-          
+      boolean wasCommError =
+          (rootCause instanceof ConnectException ||
+              rootCause instanceof SocketException ||
+              wasCommError(rootCause));
+
       log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry={} commError={} errorCode={} ",
           inputCollections, rootCause.toString(), retryCount, wasCommError, errorCode);
 
       if (wasCommError
           || (exc instanceof RouteException && (errorCode == 503)) // 404 because the core does not exist 503 service unavailable
-          //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
-          ) {
+        //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
+      ) {
         // it was a communication error. it is likely that
         // the node to which the request to be sent is down . So , expire the state
         // so that the next attempt would fetch the fresh state
@@ -1023,13 +968,13 @@ public class CloudSolrClient extends SolrClient {
       }
       sendToLeaders = true;
     }
-    
+
     SolrParams reqParams = request.getParams();
     if (reqParams == null) { // TODO fix getParams to never return null!
       reqParams = new ModifiableSolrParams();
     }
 
-    final Set<String> liveNodes = stateProvider.getLiveNodes();
+    final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
 
     final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
 
@@ -1038,32 +983,32 @@ public class CloudSolrClient extends SolrClient {
         List<String> liveNodesList = new ArrayList<>(liveNodes);
         Collections.shuffle(liveNodesList, rand);
         theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
-            (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
+            getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
       }
 
     } else if (ADMIN_PATHS.contains(request.getPath())) {
       for (String liveNode : liveNodes) {
         theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
-            (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
+            getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
       }
 
     } else { // Typical...
       Set<String> collectionNames = resolveAliases(inputCollections);
       if (collectionNames.isEmpty()) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
             "No collection param specified on request and no default collection has been set: " + inputCollections);
       }
 
       // TODO: not a big deal because of the caching, but we could avoid looking
       //   at every shard when getting leaders if we tweaked some things
-      
+
       // Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
       Map<String,Slice> slices = new HashMap<>();
       String shardKeys = reqParams.get(ShardParams._ROUTE_);
       for (String collectionName : collectionNames) {
         DocCollection col = getDocCollection(collectionName, null);
         if (col == null) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
         }
         Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
         ClientUtils.addSlices(slices, collectionName, routeSlices, true);
@@ -1106,8 +1051,8 @@ public class CloudSolrClient extends SolrClient {
       }
     }
 
-    LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList);
-    LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+    LBSolrClient.Req req = new LBSolrClient.Req(request, theUrlList);
+    LBSolrClient.Rsp rsp = getLbClient().request(req);
     return rsp.getResponse();
   }
 
@@ -1115,12 +1060,12 @@ public class CloudSolrClient extends SolrClient {
   private LinkedHashSet<String> resolveAliases(List<String> inputCollections) {
     LinkedHashSet<String> collectionNames = new LinkedHashSet<>(); // consistent ordering
     for (String collectionName : inputCollections) {
-      if (stateProvider.getState(collectionName) == null) {
+      if (getClusterStateProvider().getState(collectionName) == null) {
         // perhaps it's an alias
-        List<String> aliasedCollections = stateProvider.resolveAlias(collectionName);
+        List<String> aliasedCollections = getClusterStateProvider().resolveAlias(collectionName);
         // one more level of alias indirection...  (dubious that we should support this)
         for (String aliasedCollection : aliasedCollections) {
-          collectionNames.addAll(stateProvider.resolveAlias(aliasedCollection));
+          collectionNames.addAll(getClusterStateProvider().resolveAlias(aliasedCollection));
         }
       } else {
         collectionNames.add(collectionName); // it's a collection
@@ -1129,31 +1074,6 @@ public class CloudSolrClient extends SolrClient {
     return collectionNames;
   }
 
-  @Override
-  public void close() throws IOException {
-    stateProvider.close();
-    
-    if (shutdownLBHttpSolrServer) {
-      lbClient.close();
-    }
-    
-    if (clientIsInternal && myClient!=null) {
-      HttpClientUtil.close(myClient);
-    }
-
-    if(this.threadPool != null && !this.threadPool.isShutdown()) {
-      this.threadPool.shutdown();
-    }
-  }
-
-  public LBHttpSolrClient getLbClient() {
-    return lbClient;
-  }
-
-  public HttpClient getHttpClient() {
-    return myClient;
-  }
-  
   public boolean isUpdatesToLeaders() {
     return updatesToLeaders;
   }
@@ -1170,7 +1090,7 @@ public class CloudSolrClient extends SolrClient {
    */
   public void setParallelCacheRefreshes(int n){ locks = objectList(n); }
 
-  private static ArrayList<Object> objectList(int n) {
+  protected static ArrayList<Object> objectList(int n) {
     ArrayList<Object> l =  new ArrayList<>(n);
     for(int i=0;i<n;i++) l.add(new Object());
     return l;
@@ -1187,7 +1107,7 @@ public class CloudSolrClient extends SolrClient {
           && !cacheEntry.shouldRetry()) return col;
     }
 
-    CollectionRef ref = getCollectionRef(collection);
+    ClusterState.CollectionRef ref = getCollectionRef(collection);
     if (ref == null) {
       //no such collection exists
       return null;
@@ -1222,14 +1142,14 @@ public class CloudSolrClient extends SolrClient {
     }
   }
 
-  CollectionRef getCollectionRef(String collection) {
-    return stateProvider.getState(collection);
+  ClusterState.CollectionRef getCollectionRef(String collection) {
+    return getClusterStateProvider().getState(collection);
   }
 
   /**
    * Useful for determining the minimum achieved replication factor across
    * all shards involved in processing an update request, typically useful
-   * for gauging the replication factor of a batch. 
+   * for gauging the replication factor of a batch.
    */
   @SuppressWarnings("rawtypes")
   public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
@@ -1245,22 +1165,22 @@ public class CloudSolrClient extends SolrClient {
       if (achRf == null || rf < achRf) {
         achRf = rf;
       }
-    }    
+    }
     return (achRf != null) ? achRf.intValue() : -1;
   }
-  
+
   /**
    * Walks the NamedList response after performing an update request looking for
    * the replication factor that was achieved in each shard involved in the request.
-   * For single doc updates, there will be only one shard in the return value. 
+   * For single doc updates, there will be only one shard in the return value.
    */
   @SuppressWarnings("rawtypes")
   public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
     connect();
-    
+
     Map<String,Integer> results = new HashMap<String,Integer>();
-    if (resp instanceof CloudSolrClient.RouteResponse) {
-      NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
+    if (resp instanceof RouteResponse) {
+      NamedList routes = ((RouteResponse)resp).getRouteResponses();
       DocCollection coll = getDocCollection(collection, null);
       Map<String,String> leaders = new HashMap<String,String>();
       for (Slice slice : coll.getActiveSlicesArr()) {
@@ -1273,7 +1193,7 @@ public class CloudSolrClient extends SolrClient {
           leaders.put(altLeaderUrl, slice.getName());
         }
       }
-      
+
       Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
       while (routeIter.hasNext()) {
         Map.Entry<String,Object> next = routeIter.next();
@@ -1292,21 +1212,9 @@ public class CloudSolrClient extends SolrClient {
           results.put(shard, rf);
         }
       }
-    }    
+    }
     return results;
   }
-  
-  /**
-   * @deprecated since 7.0  Use {@link Builder} methods instead. 
-   */
-  @Deprecated
-  public void setConnectionTimeout(int timeout) {
-    this.lbClient.setConnectionTimeout(timeout); 
-  }
-
-  public ClusterStateProvider getClusterStateProvider(){
-    return stateProvider;
-  }
 
   private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
     final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
@@ -1333,276 +1241,4 @@ public class CloudSolrClient extends SolrClient {
     return true;
   }
 
-  private static LBHttpSolrClient createLBHttpSolrClient(Builder cloudSolrClientBuilder, HttpClient httpClient) {
-    final LBHttpSolrClient.Builder lbBuilder = new LBHttpSolrClient.Builder();
-    lbBuilder.withHttpClient(httpClient);
-    if (cloudSolrClientBuilder.connectionTimeoutMillis != null) {
-      lbBuilder.withConnectionTimeout(cloudSolrClientBuilder.connectionTimeoutMillis);
-    }
-    if (cloudSolrClientBuilder.socketTimeoutMillis != null) {
-      lbBuilder.withSocketTimeout(cloudSolrClientBuilder.socketTimeoutMillis);
-    }
-    final LBHttpSolrClient lbClient = lbBuilder.build();
-    lbClient.setRequestWriter(new BinaryRequestWriter());
-    lbClient.setParser(new BinaryResponseParser());
-    
-    return lbClient;
-  }
-
-  /**
-   * Constructs {@link CloudSolrClient} instances from provided configuration.
-   */
-  public static class Builder extends SolrClientBuilder<Builder> {
-    protected Collection<String> zkHosts = new ArrayList<>();
-    protected List<String> solrUrls = new ArrayList<>();
-    protected String zkChroot;
-    protected LBHttpSolrClient loadBalancedSolrClient;
-    protected LBHttpSolrClient.Builder lbClientBuilder;
-    protected boolean shardLeadersOnly = true;
-    protected boolean directUpdatesToLeadersOnly = false;
-    protected boolean parallelUpdates = true;
-    protected ClusterStateProvider stateProvider;
-    
-    /**
-     * @deprecated use other constructors instead.  This constructor will be changing visibility in an upcoming release.
-     */
-    @Deprecated
-    public Builder() {}
-    
-    /**
-     * Provide a series of Solr URLs to be used when configuring {@link CloudSolrClient} instances.
-     * The solr client will use these urls to understand the cluster topology, which solr nodes are active etc.
-     * 
-     * Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not
-     * include any collections, cores, or other path components.
-     *
-     * Usage example:
-     *
-     * <pre>
-     *   final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
-     *   solrBaseUrls.add("http://solr1:8983/solr"); solrBaseUrls.add("http://solr2:8983/solr"); solrBaseUrls.add("http://solr3:8983/solr");
-     *   final SolrClient client = new CloudSolrClient.Builder(solrBaseUrls).build();
-     * </pre>
-     */
-    public Builder(List<String> solrUrls) {
-      this.solrUrls = solrUrls;
-    }
-    
-    /**
-     * Provide a series of ZK hosts which will be used when configuring {@link CloudSolrClient} instances.
-     *
-     * Usage example when Solr stores data at the ZooKeeper root ('/'):
-     *
-     * <pre>
-     *   final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
-     *   zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
-     *   final SolrClient client = new CloudSolrClient.Builder(zkServers, Optional.empty()).build();
-     * </pre>
-     *
-     * Usage example when Solr data is stored in a ZooKeeper chroot:
-     *
-     *  <pre>
-     *    final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
-     *    zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
-     *    final SolrClient client = new CloudSolrClient.Builder(zkServers, Optional.of("/solr")).build();
-     *  </pre>
-     *
-     * @param zkHosts a List of at least one ZooKeeper host and port (e.g. "zookeeper1:2181")
-     * @param zkChroot the path to the root ZooKeeper node containing Solr data.  Provide {@code java.util.Optional.empty()} if no ZK chroot is used.
-     */
-    public Builder(List<String> zkHosts, Optional<String> zkChroot) {
-      this.zkHosts = zkHosts;
-      if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
-    }
-
-    /**
-     * Provide a ZooKeeper client endpoint to be used when configuring {@link CloudSolrClient} instances.
-     * 
-     * Method may be called multiple times.  All provided values will be used.
-     * 
-     * @param zkHost
-     *          The client endpoint of the ZooKeeper quorum containing the cloud
-     *          state.
-     *          
-     * @deprecated use Zk-host constructor instead
-     */
-    @Deprecated
-    public Builder withZkHost(String zkHost) {
-      this.zkHosts.add(zkHost);
-      return this;
-    }
-
-    /**
-     * Provide a Solr URL to be used when configuring {@link CloudSolrClient} instances.
-     *
-     * Method may be called multiple times. One of the provided values will be used to fetch
-     * the list of live Solr nodes that the underlying {@link HttpClusterStateProvider} would be maintaining.
-     * 
-     * Provided Solr URL is expected to point to the root Solr path ("http://hostname:8983/solr"); it should not
-     * include any collections, cores, or other path components.
-     * 
-     * @deprecated use Solr-URL constructor instead
-     */
-    @Deprecated
-    public Builder withSolrUrl(String solrUrl) {
-      this.solrUrls.add(solrUrl);
-      return this;
-    }
-
-    /**
-     * Provide a list of Solr URL to be used when configuring {@link CloudSolrClient} instances.
-     * One of the provided values will be used to fetch the list of live Solr
-     * nodes that the underlying {@link HttpClusterStateProvider} would be maintaining.
-     * 
-     * Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not
-     * include any collections, cores, or other path components.
-     * 
-     * @deprecated use Solr URL constructors instead
-     */
-    @Deprecated
-    public Builder withSolrUrl(Collection<String> solrUrls) {
-      this.solrUrls.addAll(solrUrls);
-      return this;
-    }
-
-    /**
-     * Provides a {@link HttpClient} for the builder to use when creating clients.
-     */
-    public Builder withLBHttpSolrClientBuilder(LBHttpSolrClient.Builder lbHttpSolrClientBuilder) {
-      this.lbClientBuilder = lbHttpSolrClientBuilder;
-      return this;
-    }
-
-    /**
-     * Provide a series of ZooKeeper client endpoints for the builder to use when creating clients.
-     * 
-     * Method may be called multiple times.  All provided values will be used.
-     * 
-     * @param zkHosts
-     *          A Java Collection (List, Set, etc) of HOST:PORT strings, one for
-     *          each host in the ZooKeeper ensemble. Note that with certain
-     *          Collection types like HashSet, the order of hosts in the final
-     *          connect string may not be in the same order you added them.
-     *          
-     * @deprecated use Zk-host constructor instead
-     */
-    @Deprecated
-    public Builder withZkHost(Collection<String> zkHosts) {
-      this.zkHosts.addAll(zkHosts);
-      return this;
-    }
-
-    /**
-     * Provides a ZooKeeper chroot for the builder to use when creating clients.
-     * 
-     * @deprecated use Zk-host constructor instead
-     */
-    @Deprecated
-    public Builder withZkChroot(String zkChroot) {
-      this.zkChroot = zkChroot;
-      return this;
-    }
-    
-    /**
-     * Provides a {@link LBHttpSolrClient} for the builder to use when creating clients.
-     */
-    public Builder withLBHttpSolrClient(LBHttpSolrClient loadBalancedSolrClient) {
-      this.loadBalancedSolrClient = loadBalancedSolrClient;
-      return this;
-    }
-
-    /**
-     * Tells {@link Builder} that created clients should send updates only to shard leaders.
-     *
-     * WARNING: This method currently has no effect.  See SOLR-6312 for more information.
-     */
-    public Builder sendUpdatesOnlyToShardLeaders() {
-      shardLeadersOnly = true;
-      return this;
-    }
-    
-    /**
-     * Tells {@link Builder} that created clients should send updates to all replicas for a shard.
-     *
-     * WARNING: This method currently has no effect.  See SOLR-6312 for more information.
-     */
-    public Builder sendUpdatesToAllReplicasInShard() {
-      shardLeadersOnly = false;
-      return this;
-    }
-
-    /**
-     * Tells {@link Builder} that created clients should send direct updates to shard leaders only.
-     *
-     * UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a {@link SolrException}
-     */
-    public Builder sendDirectUpdatesToShardLeadersOnly() {
-      directUpdatesToLeadersOnly = true;
-      return this;
-    }
-
-    /**
-     * Tells {@link Builder} that created clients can send updates to any shard replica (shard leaders and non-leaders).
-     *
-     * Shard leaders are still preferred, but the created clients will fallback to using other replicas if a leader
-     * cannot be found.
-     */
-    public Builder sendDirectUpdatesToAnyShardReplica() {
-      directUpdatesToLeadersOnly = false;
-      return this;
-    }
-
-    /**
-     * Tells {@link Builder} whether created clients should send shard updates serially or in parallel
-     *
-     * When an {@link UpdateRequest} affects multiple shards, {@link CloudSolrClient} splits it up and sends a request
-     * to each affected shard.  This setting chooses whether those sub-requests are sent serially or in parallel.
-     * <p>
-     * If not set, this defaults to 'true' and sends sub-requests in parallel.
-     */
-    public Builder withParallelUpdates(boolean parallelUpdates) {
-      this.parallelUpdates = parallelUpdates;
-      return this;
-    }
-
-    /**
-     * Expert feature where you want to implement a custom cluster discovery mechanism of the solr nodes as part of the
-     * cluster.
-     *
-     * @deprecated since this is an expert feature we don't want to expose this to regular users. To use this feature
-     * extend CloudSolrClient.Builder and pass your custom ClusterStateProvider
-     */
-    @Deprecated
-    public Builder withClusterStateProvider(ClusterStateProvider stateProvider) {
-      this.stateProvider = stateProvider;
-      return this;
-    }
-    
-    /**
-     * Create a {@link CloudSolrClient} based on the provided configuration.
-     */
-    public CloudSolrClient build() {
-      if (stateProvider == null) {
-        if (!zkHosts.isEmpty()) {
-          stateProvider = new ZkClientClusterStateProvider(zkHosts, zkChroot);
-        }
-        else if (!this.solrUrls.isEmpty()) {
-          try {
-            stateProvider = new HttpClusterStateProvider(solrUrls, httpClient);
-          } catch (Exception e) {
-            throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
-                + "Solr server(s), "  + solrUrls + ", down?)", e);
-          }
-        } else {
-          throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
-        }
-      }
-      return new CloudSolrClient(this);
-    }
-
-    @Override
-    public Builder getThis() {
-      return this;
-    }
-  }
 }
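
The extracted BaseCloudSolrClient above keeps all of the routing, retry, and
collection-state-caching logic and defers transport concerns to three hooks:
getLbClient(), getClusterStateProvider(), and wasCommError(Throwable). A
minimal sketch of what a concrete subclass supplies follows; the class is
illustrative only (the real subclasses are the reworked CloudSolrClient and
the new CloudHttp2SolrClient), with the constructor flags taken from the
protected BaseCloudSolrClient constructor shown in the diff above.

    import org.apache.solr.client.solrj.impl.BaseCloudSolrClient;
    import org.apache.solr.client.solrj.impl.ClusterStateProvider;
    import org.apache.solr.client.solrj.impl.LBSolrClient;

    class SketchCloudSolrClient extends BaseCloudSolrClient {
      private final LBSolrClient lbClient;
      private final ClusterStateProvider stateProvider;

      SketchCloudSolrClient(LBSolrClient lbClient, ClusterStateProvider stateProvider) {
        // updatesToLeaders, parallelUpdates, directUpdatesToLeadersOnly
        super(true, true, false);
        this.lbClient = lbClient;
        this.stateProvider = stateProvider;
      }

      @Override
      protected LBSolrClient getLbClient() {
        return lbClient; // the base class routes every request through this
      }

      @Override
      public ClusterStateProvider getClusterStateProvider() {
        return stateProvider; // ZK- or HTTP-backed cluster state
      }

      @Override
      protected boolean wasCommError(Throwable t) {
        // Transport-specific check; ConnectException and SocketException are
        // already handled in the base class, so an Apache-HttpClient subclass
        // would test ConnectTimeoutException / NoHttpResponseException here.
        return false;
      }
    }
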
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
similarity index 78%
copy from solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
copy to solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index bbab3aa..042b6e4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
@@ -26,15 +27,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ClusterState.CollectionRef;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -43,7 +41,9 @@ import org.apache.solr.common.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HttpClusterStateProvider implements ClusterStateProvider {
+import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
+
+public abstract class BaseHttpClusterStateProvider implements ClusterStateProvider {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private String urlScheme;
@@ -53,21 +53,16 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
   long aliasesTimestamp = 0;
 
   private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
-  final HttpClient httpClient;
-  final boolean clientIsInternal;
 
-  public HttpClusterStateProvider(List<String> solrUrls, HttpClient httpClient) throws Exception {
-    this.httpClient = httpClient == null? HttpClientUtil.createClient(null): httpClient;
-    this.clientIsInternal = httpClient == null;
+  public void init(List<String> solrUrls) throws Exception {
     for (String solrUrl: solrUrls) {
       urlScheme = solrUrl.startsWith("https")? "https": "http";
-      try (SolrClient initialClient = new HttpSolrClient.Builder().withBaseSolrUrl(solrUrl).withHttpClient(httpClient).build()) {
-        Set<String> liveNodes = fetchLiveNodes(initialClient); // throws exception if unable to fetch
-        this.liveNodes = liveNodes;
+      try (SolrClient initialClient = getSolrClient(solrUrl)) {
+        this.liveNodes = fetchLiveNodes(initialClient);
         liveNodesTimestamp = System.nanoTime();
         break;
       } catch (IOException e) {
-        log.warn("Attempt to fetch live_nodes from " + solrUrl + " failed.", e);
+        log.warn("Attempt to fetch cluster state from {} failed.", solrUrl, e);
       }
     }
 
@@ -80,19 +75,13 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
     }
   }
 
-  @Override
-  public void close() throws IOException {
-    if (this.clientIsInternal && this.httpClient != null) {
-      HttpClientUtil.close(httpClient);
-    }
-  }
+  protected abstract SolrClient getSolrClient(String baseUrl);
 
   @Override
-  public CollectionRef getState(String collection) {
+  public ClusterState.CollectionRef getState(String collection) {
     for (String nodeName: liveNodes) {
-      try (HttpSolrClient client = new HttpSolrClient.Builder().
-          withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-          withHttpClient(httpClient).build()) {
+      String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+      try (SolrClient client = getSolrClient(baseUrl)) {
         ClusterState cs = fetchClusterState(client, collection, null);
         return cs.getCollectionRef(collection);
       } catch (SolrServerException | IOException e) {
@@ -102,8 +91,7 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
         if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
           return null;
         }
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+        log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
       } catch (NotACollectionException e) {
         // Cluster state for the given collection was not found, could be an alias.
         // Lets fetch/update our aliases:
@@ -170,16 +158,14 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
     }
     if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
       for (String nodeName: liveNodes) {
-        try (HttpSolrClient client = new HttpSolrClient.Builder().
-            withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-            withHttpClient(httpClient).build()) {
+        String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+        try (SolrClient client = getSolrClient(baseUrl)) {
           Set<String> liveNodes = fetchLiveNodes(client);
           this.liveNodes = (liveNodes);
           liveNodesTimestamp = System.nanoTime();
           return liveNodes;
         } catch (Exception e) {
-          log.warn("Attempt to fetch live_nodes from " +
-              Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+          log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
         }
       }
       throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
@@ -198,8 +184,7 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
     QueryRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
     NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
-    Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
-    return liveNodes;
+    return (Set<String>) new HashSet((List<String>)(cluster.get("live_nodes")));
   }
 
   @Override
@@ -219,12 +204,10 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
     if (forceFetch || this.aliases == null ||
         TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
       for (String nodeName: liveNodes) {
-        try (HttpSolrClient client = new HttpSolrClient.Builder().
-            withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-            withHttpClient(httpClient).build()) {
+        String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+        try (SolrClient client = getSolrClient(baseUrl)) {
 
-          Map<String, List<String>> aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
-          this.aliases = aliases;
+          this.aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
           this.aliasesTimestamp = System.nanoTime();
           return Collections.unmodifiableMap(this.aliases);
         } catch (SolrServerException | RemoteSolrException | IOException e) {
@@ -236,8 +219,7 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
             this.aliasesTimestamp = System.nanoTime();
             return aliases;
           }
-          log.warn("Attempt to fetch cluster state from " +
-              Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+          log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
         }
       }
 
@@ -254,14 +236,11 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
   @Override
   public ClusterState getClusterState() throws IOException {
     for (String nodeName: liveNodes) {
-      try (HttpSolrClient client = new HttpSolrClient.Builder().
-          withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-          withHttpClient(httpClient).build()) {
-        ClusterState cs = fetchClusterState(client, null, null);
-        return cs;
-      } catch (SolrServerException | RemoteSolrException | IOException e) {
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+      String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+      try (SolrClient client = getSolrClient(baseUrl)) {
+        return fetchClusterState(client, null, null);
+      } catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
+        log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
       } catch (NotACollectionException e) {
         // not possible! (we passed in null for collection so it can't be an alias)
         throw new RuntimeException("null should never cause NotACollectionException in " +
@@ -278,15 +257,13 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
   @Override
   public Map<String, Object> getClusterProperties() {
     for (String nodeName : liveNodes) {
-      try (HttpSolrClient client = new HttpSolrClient.Builder().
-          withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-          withHttpClient(httpClient).build()) {
+      String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+      try (SolrClient client = getSolrClient(baseUrl)) {
         Map<String, Object> clusterProperties = new HashMap<>();
         fetchClusterState(client, null, clusterProperties);
         return clusterProperties;
-      } catch (SolrServerException | RemoteSolrException | IOException e) {
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+      } catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
+        log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
       } catch (NotACollectionException e) {
         // not possible! (we passed in null for collection so it can't be an alias)
         throw new RuntimeException("null should never cause NotACollectionException in " +
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java
new file mode 100644
index 0000000..dceb13c
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.util.Collections;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+
+import static org.apache.solr.common.util.Utils.getObjectByPath;
+
+public abstract class BaseHttpSolrClient extends SolrClient {
+
+  /**
+   * Subclass of SolrException that allows us to capture an arbitrary HTTP
+   * status code that may have been returned by the remote server or a
+   * proxy along the way.
+   */
+  public static class RemoteSolrException extends SolrException {
+    /**
+     * @param remoteHost the host the error was received from
+     * @param code Arbitrary HTTP status code
+     * @param msg Exception Message
+     * @param th Throwable to wrap with this Exception
+     */
+    public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
+      super(code, "Error from server at " + remoteHost + ": " + msg, th);
+    }
+  }
+
+  /**
+   * This should be thrown when a server has an error in executing the request and
+   * it sends a proper payload back to the client
+   */
+  public static class RemoteExecutionException extends HttpSolrClient.RemoteSolrException {
+    private NamedList meta;
+
+    public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
+      super(remoteHost, code, msg, null);
+      this.meta = meta;
+    }
+
+    public static HttpSolrClient.RemoteExecutionException create(String host, NamedList errResponse) {
+      Object errObj = errResponse.get("error");
+      if (errObj != null) {
+        Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
+        String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
+        return new HttpSolrClient.RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
+            msg == null ? "Unknown Error" : msg, errResponse);
+      } else {
+        throw new RuntimeException("No 'error' entry found in the response");
+      }
+    }
+
+    public NamedList getMetaData() {
+      return meta;
+    }
+  }
+
+}
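
With both exception types hoisted into this transport-neutral base class, callers of either the HTTP/1.1 or the HTTP/2 client can handle server-side failures uniformly. A minimal sketch, assuming a local Solr node and a "techproducts" collection (both placeholders):

    import java.io.IOException;

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
    import org.apache.solr.client.solrj.impl.Http2SolrClient;

    public class ErrorHandlingSketch {
      public static void main(String[] args) {
        try (Http2SolrClient client = new Http2SolrClient.Builder("http://localhost:8983/solr").build()) {
          client.query("techproducts", new SolrQuery("*:*"));
        } catch (BaseHttpSolrClient.RemoteExecutionException e) {
          // The server sent back a structured error payload; the parsed body is in the metadata.
          System.err.println("Server error details: " + e.getMetaData());
        } catch (BaseHttpSolrClient.RemoteSolrException e) {
          // Any other non-success HTTP status, possibly set by a proxy along the way.
          System.err.println("HTTP " + e.code() + ": " + e.getMessage());
        } catch (SolrServerException | IOException e) {
          // Transport-level failure before any response arrived.
          e.printStackTrace();
        }
      }
    }
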
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
new file mode 100644
index 0000000..8e6ac82
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+
+/**
+ * SolrJ client class to communicate with SolrCloud using Http2SolrClient.
+ * Instances of this class communicate with Zookeeper to discover
+ * Solr endpoints for SolrCloud collections, and then use the
+ * {@link LBHttp2SolrClient} to issue requests.
+ *
+ * This class assumes the id field for your documents is called
+ * 'id' - if this is not the case, you must set the right name
+ * with {@link #setIdField(String)}.
+ *
+ * @lucene.experimental
+ * @since solr 8.0
+ */
+@SuppressWarnings("serial")
+public class CloudHttp2SolrClient extends BaseCloudSolrClient {
+
+  private final ClusterStateProvider stateProvider;
+  private final LBHttp2SolrClient lbClient;
+  private Http2SolrClient myClient;
+  private final boolean clientIsInternal;
+
+  /**
+   * Create a new client object that connects to Zookeeper and is always aware
+   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
+   * SolrCloud has enough replicas for every shard in a collection, there is no
+   * single point of failure. Updates will be sent to shard leaders by default.
+   *
+   * @param builder a {@link CloudHttp2SolrClient.Builder} with the options used to create the client.
+   */
+  protected CloudHttp2SolrClient(Builder builder) {
+    super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
+    this.clientIsInternal = builder.httpClient == null;
+    this.myClient = (builder.httpClient == null) ? new Http2SolrClient.Builder().build() : builder.httpClient;
+    if (builder.stateProvider == null) {
+      if (builder.zkHosts != null && builder.solrUrls != null) {
+        throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
+      }
+      if (builder.zkHosts != null) {
+        this.stateProvider = new ZkClientClusterStateProvider(builder.zkHosts, builder.zkChroot);
+      } else if (builder.solrUrls != null && !builder.solrUrls.isEmpty()) {
+        try {
+          this.stateProvider = new Http2ClusterStateProvider(builder.solrUrls, builder.httpClient);
+        } catch (Exception e) {
+          throw new RuntimeException("Couldn't initialize an Http2ClusterStateProvider (is/are the "
+              + "Solr server(s), " + builder.solrUrls + ", down?)", e);
+        }
+      } else {
+        throw new IllegalArgumentException("Either zkHosts or solrUrls must be specified.");
+      }
+    } else {
+      this.stateProvider = builder.stateProvider;
+    }
+    this.lbClient = new LBHttp2SolrClient(myClient);
+  }
+
+  @Override
+  public void close() throws IOException {
+    stateProvider.close();
+    lbClient.close();
+
+    if (clientIsInternal && myClient != null) {
+      myClient.close();
+    }
+
+    super.close();
+  }
+
+  public LBHttp2SolrClient getLbClient() {
+    return lbClient;
+  }
+
+  @Override
+  public ClusterStateProvider getClusterStateProvider() {
+    return stateProvider;
+  }
+
+  public Http2SolrClient getHttpClient() {
+    return myClient;
+  }
+
+  @Override
+  protected boolean wasCommError(Throwable rootCause) {
+    return false;
+  }
+
+  /**
+   * Constructs {@link CloudHttp2SolrClient} instances from provided configuration.
+   */
+  public static class Builder {
+    protected Collection<String> zkHosts = new ArrayList<>();
+    protected List<String> solrUrls = new ArrayList<>();
+    protected String zkChroot;
+    protected Http2SolrClient httpClient;
+    protected boolean shardLeadersOnly = true;
+    protected boolean directUpdatesToLeadersOnly = false;
+    protected boolean parallelUpdates = true;
+    protected ClusterStateProvider stateProvider;
+
+    /**
+     * Provide a series of Solr URLs to be used when configuring {@link CloudHttp2SolrClient} instances.
+     * The Solr client will use these URLs to learn the cluster topology, e.g. which Solr nodes are active.
+     *
+     * Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not
+     * include any collections, cores, or other path components.
+     *
+     * Usage example:
+     *
+     * <pre>
+     *   final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
+     *   solrBaseUrls.add("http://solr1:8983/solr"); solrBaseUrls.add("http://solr2:8983/solr"); solrBaseUrls.add("http://solr3:8983/solr");
+     *   final SolrClient client = new CloudHttp2SolrClient.Builder(solrBaseUrls).build();
+     * </pre>
+     */
+    public Builder(List<String> solrUrls) {
+      this.solrUrls = solrUrls;
+    }
+
+    /**
+     * Provide a series of ZK hosts which will be used when configuring {@link CloudHttp2SolrClient} instances.
+     *
+     * Usage example when Solr stores data at the ZooKeeper root ('/'):
+     *
+     * <pre>
+     *   final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     *   zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
+     *   final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.empty()).build();
+     * </pre>
+     *
+     * Usage example when Solr data is stored in a ZooKeeper chroot:
+     *
+     *  <pre>
+     *    final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     *    zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
+     *    final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.of("/solr")).build();
+     *  </pre>
+     *
+     * @param zkHosts a List of at least one ZooKeeper host and port (e.g. "zookeeper1:2181")
+     * @param zkChroot the path to the root ZooKeeper node containing Solr data.  Provide {@code java.util.Optional.empty()} if no ZK chroot is used.
+     */
+    public Builder(List<String> zkHosts, Optional<String> zkChroot) {
+      this.zkHosts = zkHosts;
+      if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
+    }
+
+    /**
+     * Tells {@link CloudHttp2SolrClient.Builder} that created clients should send direct updates to shard leaders only.
+     *
+     * UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a {@link SolrException}
+     */
+    public Builder sendDirectUpdatesToShardLeadersOnly() {
+      directUpdatesToLeadersOnly = true;
+      return this;
+    }
+
+    /**
+     * Tells {@link CloudHttp2SolrClient.Builder} that created clients can send updates to any shard replica (shard leaders and non-leaders).
+     *
+     * Shard leaders are still preferred, but the created clients will fall back to using other replicas if a leader
+     * cannot be found.
+     */
+    public Builder sendDirectUpdatesToAnyShardReplica() {
+      directUpdatesToLeadersOnly = false;
+      return this;
+    }
+
+    /**
+     * Tells {@link CloudHttp2SolrClient.Builder} whether created clients should send shard updates serially or in parallel.
+     *
+     * When an {@link UpdateRequest} affects multiple shards, {@link CloudHttp2SolrClient} splits it up and sends a request
+     * to each affected shard.  This setting chooses whether those sub-requests are sent serially or in parallel.
+     * <p>
+     * If not set, this defaults to 'true' and sends sub-requests in parallel.
+     */
+    public Builder withParallelUpdates(boolean parallelUpdates) {
+      this.parallelUpdates = parallelUpdates;
+      return this;
+    }
+
+    public Builder withHttpClient(Http2SolrClient httpClient) {
+      this.httpClient = httpClient;
+      return this;
+    }
+
+    /**
+     * Create a {@link CloudHttp2SolrClient} based on the provided configuration.
+     */
+    public CloudHttp2SolrClient build() {
+      if (stateProvider == null) {
+        if (!zkHosts.isEmpty()) {
+          stateProvider = new ZkClientClusterStateProvider(zkHosts, zkChroot);
+        } else if (!this.solrUrls.isEmpty()) {
+          try {
+            stateProvider = new Http2ClusterStateProvider(solrUrls, httpClient);
+          } catch (Exception e) {
+            throw new RuntimeException("Couldn't initialize an Http2ClusterStateProvider (is/are the "
+                + "Solr server(s), " + solrUrls + ", down?)", e);
+          }
+        } else {
+          throw new IllegalArgumentException("Either zkHosts or solrUrls must be specified.");
+        }
+      }
+      return new CloudHttp2SolrClient(this);
+    }
+
+  }
+}
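
End to end, the new builder reads like its HTTP/1.1 counterpart. A minimal indexing sketch, assuming a reachable ZooKeeper ensemble and a collection named "gettingstarted" (both placeholders):

    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;

    import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
    import org.apache.solr.common.SolrInputDocument;

    public class CloudHttp2Sketch {
      public static void main(String[] args) throws Exception {
        List<String> zkServers = Collections.singletonList("zookeeper1:2181");
        try (CloudHttp2SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.empty())
            .withParallelUpdates(true)              // fan multi-shard updates out concurrently
            .sendDirectUpdatesToShardLeadersOnly()  // fail fast if a shard leader cannot be found
            .build()) {
          client.setDefaultCollection("gettingstarted");
          SolrInputDocument doc = new SolrInputDocument();
          doc.addField("id", "1");
          client.add(doc);
          client.commit();
        }
      }
    }

setDefaultCollection, add and commit are inherited through the shared base class, so the call pattern is identical to the existing CloudSolrClient.
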
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 37cdba7..0b08780 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -17,78 +17,21 @@
 package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.net.SocketException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.NoHttpResponseException;
 import org.apache.http.client.HttpClient;
 import org.apache.http.conn.ConnectTimeoutException;
-import org.apache.solr.client.solrj.ResponseParser;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.V2RequestSupport;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.IsUpdateRequest;
-import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.ToleratedUpdateError;
-import org.apache.solr.common.cloud.ClusterState.CollectionRef;
-import org.apache.solr.common.cloud.CollectionStatePredicate;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
-import static org.apache.solr.common.params.CommonParams.ID;
 
 /**
  * SolrJ client class to communicate with SolrCloud.
@@ -101,102 +44,15 @@ import static org.apache.solr.common.params.CommonParams.ID;
  * with {@link #setIdField(String)}.
  */
 @SuppressWarnings("serial")
-public class CloudSolrClient extends SolrClient {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+public class CloudSolrClient extends BaseCloudSolrClient {
 
   private final ClusterStateProvider stateProvider;
-  private volatile String defaultCollection;
   private final LBHttpSolrClient lbClient;
   private final boolean shutdownLBHttpSolrServer;
   private HttpClient myClient;
   private final boolean clientIsInternal;
-  //no of times collection state to be reloaded if stale state error is received
-  private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
-  Random rand = new Random();
-  
-  private final boolean updatesToLeaders;
-  private final boolean directUpdatesToLeadersOnly;
-  private boolean parallelUpdates; //TODO final
-  private ExecutorService threadPool = ExecutorUtil
-      .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
-          "CloudSolrClient ThreadPool"));
-  private String idField = ID;
-  public static final String STATE_VERSION = "_stateVer_";
-  private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds or 3 million nanos
-  private final Set<String> NON_ROUTABLE_PARAMS;
-  {
-    NON_ROUTABLE_PARAMS = new HashSet<>();
-    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
-    
-    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
-    
-    // Not supported via SolrCloud
-    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
-
-  }
-  private volatile List<Object> locks = objectList(3);
-
-
-  static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
-    final AtomicLong puts = new AtomicLong();
-    final AtomicLong hits = new AtomicLong();
-    final Lock evictLock = new ReentrantLock(true);
-    private volatile long timeToLive = 60 * 1000L;
-
-    @Override
-    public ExpiringCachedDocCollection get(Object key) {
-      ExpiringCachedDocCollection val = super.get(key);
-      if(val == null) {
-        // a new collection is likely to be added now.
-        //check if there are stale items and remove them
-        evictStale();
-        return null;
-      }
-      if(val.isExpired(timeToLive)) {
-        super.remove(key);
-        return null;
-      }
-      hits.incrementAndGet();
-      return val;
-    }
-
-    @Override
-    public ExpiringCachedDocCollection put(String key, ExpiringCachedDocCollection value) {
-      puts.incrementAndGet();
-      return super.put(key, value);
-    }
-
-    void evictStale() {
-      if(!evictLock.tryLock()) return;
-      try {
-        for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
-          if(e.getValue().isExpired(timeToLive)){
-            super.remove(e.getKey());
-          }
-        }
-      } finally {
-        evictLock.unlock();
-      }
-    }
-
-  }
 
-  /**
-   * This is the time to wait to refetch the state
-   * after getting the same state version from ZK
-   * <p>
-   * secs
-   */
-  public void setRetryExpiryTime(int secs) {
-    this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
-  }
+  public static final String STATE_VERSION = BaseCloudSolrClient.STATE_VERSION;
 
   /**
    * @deprecated since 7.0  Use {@link Builder} methods instead. 
@@ -206,42 +62,6 @@ public class CloudSolrClient extends SolrClient {
     lbClient.setSoTimeout(timeout);
   }
 
-  protected final StateCache collectionStateCache = new StateCache();
-
-  class ExpiringCachedDocCollection {
-    final DocCollection cached;
-    final long cachedAt;
-    //This is the time at which the collection is retried and got the same old version
-    volatile long retriedAt = -1;
-    //flag that suggests that this is potentially to be rechecked
-    volatile boolean maybeStale = false;
-
-    ExpiringCachedDocCollection(DocCollection cached) {
-      this.cached = cached;
-      this.cachedAt = System.nanoTime();
-    }
-
-    boolean isExpired(long timeToLiveMs) {
-      return (System.nanoTime() - cachedAt)
-          > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
-    }
-
-    boolean shouldRetry() {
-      if (maybeStale) {// we are not sure if it is stale so check with retry time
-        if ((retriedAt == -1 ||
-            (System.nanoTime() - retriedAt) > retryExpiryTime)) {
-          return true;// we retried a while back. and we could not get anything new.
-          //it's likely that it is not going to be available now also.
-        }
-      }
-      return false;
-    }
-
-    void setRetriedAt() {
-      retriedAt = System.nanoTime();
-    }
-  }
-  
   /**
    * Create a new client object that connects to Zookeeper and is always aware
    * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
@@ -251,6 +71,7 @@ public class CloudSolrClient extends SolrClient {
    * @param builder a {@link CloudSolrClient.Builder} with the options used to create the client.
    */
   protected CloudSolrClient(Builder builder) {
+    super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
     if (builder.stateProvider == null) {
       if (builder.zkHosts != null && builder.solrUrls != null) {
         throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
@@ -280,9 +101,6 @@ public class CloudSolrClient extends SolrClient {
     this.myClient = (builder.httpClient == null) ? HttpClientUtil.createClient(null) : builder.httpClient;
     if (builder.loadBalancedSolrClient == null) builder.loadBalancedSolrClient = createLBHttpSolrClient(builder, myClient);
     this.lbClient = builder.loadBalancedSolrClient;
-    this.updatesToLeaders = builder.shardLeadersOnly;
-    this.parallelUpdates = builder.parallelUpdates;
-    this.directUpdatesToLeadersOnly = builder.directUpdatesToLeadersOnly;
   }
   
   private void propagateLBClientConfigOptions(Builder builder) {
@@ -297,91 +115,14 @@ public class CloudSolrClient extends SolrClient {
     }
   }
 
-  /** Sets the cache ttl for DocCollection Objects cached  . This is only applicable for collections which are persisted outside of clusterstate.json
-   * @param seconds ttl value in seconds
-   */
-  public void setCollectionCacheTTl(int seconds){
-    assert seconds > 0;
-    this.collectionStateCache.timeToLive = seconds * 1000L;
-  }
-
-  public ResponseParser getParser() {
-    return lbClient.getParser();
-  }
-
-  /**
-   * Note: This setter method is <b>not thread-safe</b>.
-   * 
-   * @param processor
-   *          Default Response Parser chosen to parse the response if the parser
-   *          were not specified as part of the request.
-   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
-   */
-  public void setParser(ResponseParser processor) {
-    lbClient.setParser(processor);
-  }
-  
-  public RequestWriter getRequestWriter() {
-    return lbClient.getRequestWriter();
-  }
-  
-  public void setRequestWriter(RequestWriter requestWriter) {
-    lbClient.setRequestWriter(requestWriter);
-  }
-
-  /**
-   * @return the zkHost value used to connect to zookeeper.
-   */
-  public String getZkHost() {
-    return assertZKStateProvider().zkHost;
-  }
-
-  public ZkStateReader getZkStateReader() {
-    if (stateProvider instanceof ZkClientClusterStateProvider) {
-      ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) stateProvider;
-      stateProvider.connect();
-      return provider.zkStateReader;
-    }
-    throw new IllegalStateException("This has no Zk stateReader");
-  }
-
-  /**
-   * @param idField the field to route documents on.
-   */
-  public void setIdField(String idField) {
-    this.idField = idField;
-  }
-
-  /**
-   * @return the field that updates are routed on.
-   */
-  public String getIdField() {
-    return idField;
-  }
-  
-  /** Sets the default collection for request */
-  public void setDefaultCollection(String collection) {
-    this.defaultCollection = collection;
-  }
-
-  /** Gets the default collection for request */
-  public String getDefaultCollection() {
-    return defaultCollection;
-  }
-
-  /** Set the connect timeout to the zookeeper ensemble in ms */
-  public void setZkConnectTimeout(int zkConnectTimeout) {
-    assertZKStateProvider().zkConnectTimeout = zkConnectTimeout;
-  }
-
-  /** Set the timeout to the zookeeper ensemble in ms */
-  public void setZkClientTimeout(int zkClientTimeout) {
-    assertZKStateProvider().zkClientTimeout = zkClientTimeout;
+  protected Map<String, LBHttpSolrClient.Req> createRoutes(UpdateRequest updateRequest, ModifiableSolrParams routableParams,
+                                                       DocCollection col, DocRouter router, Map<String, List<String>> urlMap,
+                                                       String idField) {
+    return urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, idField);
   }
 
-  /** Gets whether direct updates are sent in parallel */
-  public boolean isParallelUpdates() {
-    return parallelUpdates;
+  protected RouteException getRouteException(SolrException.ErrorCode serverError, NamedList<Throwable> exceptions, Map<String, ? extends LBSolrClient.Req> routes) {
+    return new RouteException(serverError, exceptions, routes);
   }
 
   /** @deprecated since 7.2  Use {@link Builder} methods instead. */
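
The createRoutes/getRouteException pair added above is what keeps the update fan-out in BaseCloudSolrClient transport-agnostic: the base class asks the concrete client to build its per-shard routes and to wrap multi-route failures (see the directUpdate code removed in the hunk that follows). From a caller's perspective nothing changes; a multi-shard failure still surfaces as a RouteException. A small handling sketch, where client and docs are placeholders and getThrowables() is assumed to remain available on the relocated exception:

    try {
      client.add(docs);   // may fan out to several shard leaders
      client.commit();
    } catch (CloudSolrClient.RouteException e) {
      // Metadata from every failed per-shard sub-request is merged into this one exception.
      NamedList<Throwable> failures = e.getThrowables();
      for (int i = 0; i < failures.size(); i++) {
        System.err.println("Update via " + failures.getName(i) + " failed: " + failures.getVal(i));
      }
    } catch (SolrServerException | IOException e) {
      // Non-routing failure; rethrow or handle as appropriate.
      throw new RuntimeException(e);
    }
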
@@ -391,742 +132,30 @@ public class CloudSolrClient extends SolrClient {
   }
 
   /**
-   * Connect to the zookeeper ensemble.
-   * This is an optional method that may be used to force a connect before any other requests are sent.
-   *
-   */
-  public void connect() {
-    stateProvider.connect();
-  }
-
-  /**
-   * Connect to a cluster.  If the cluster is not ready, retry connection up to a given timeout.
-   * @param duration the timeout
-   * @param timeUnit the units of the timeout
-   * @throws TimeoutException if the cluster is not ready after the timeout
-   * @throws InterruptedException if the wait is interrupted
-   */
-  public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
-    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, stateProvider);
-    long timeout = System.nanoTime() + timeUnit.toNanos(duration);
-    while (System.nanoTime() < timeout) {
-      try {
-        connect();
-        log.info("Cluster at {} ready", stateProvider);
-        return;
-      }
-      catch (RuntimeException e) {
-        // not ready yet, then...
-      }
-      TimeUnit.MILLISECONDS.sleep(250);
-    }
-    throw new TimeoutException("Timed out waiting for cluster");
-  }
-
-  private ZkClientClusterStateProvider assertZKStateProvider() {
-    if (stateProvider instanceof ZkClientClusterStateProvider) {
-      return (ZkClientClusterStateProvider) stateProvider;
-    }
-    throw new IllegalArgumentException("This client does not use ZK");
-
-  }
-
-  /**
-   * Block until a collection state matches a predicate, or a timeout
-   *
-   * Note that the predicate may be called again even after it has returned true, so
-   * implementors should avoid changing state within the predicate call itself.
-   *
-   * @param collection the collection to watch
-   * @param wait       how long to wait
-   * @param unit       the units of the wait parameter
-   * @param predicate  a {@link CollectionStatePredicate} to check the collection state
-   * @throws InterruptedException on interrupt
-   * @throws TimeoutException     on timeout
+   * @deprecated since Solr 8.0
    */
-  public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
-      throws InterruptedException, TimeoutException {
-    stateProvider.connect();
-    assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
+  @Deprecated
+  public RouteResponse condenseResponse(NamedList response, int timeMillis) {
+    return condenseResponse(response, timeMillis, RouteResponse::new);
   }
 
   /**
-   * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
-   *
-   * Note that the watcher is unregistered after it has been called once.  To make a watcher persistent,
-   * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
-   * call
-   *
-   * @param collection the collection to watch
-   * @param watcher    a watcher that will be called when the state changes
+   * @deprecated since Solr 8.0
    */
-  public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
-    stateProvider.connect();
-    assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
-  }
-
-  private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
-    UpdateRequest updateRequest = (UpdateRequest) request;
-    SolrParams params = request.getParams();
-    ModifiableSolrParams routableParams = new ModifiableSolrParams();
-    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
-
-    if(params != null) {
-      nonRoutableParams.add(params);
-      routableParams.add(params);
-      for(String param : NON_ROUTABLE_PARAMS) {
-        routableParams.remove(param);
-      }
-    }
-
-    if (collection == null) {
-      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
-    }
-
-    //Check to see if the collection is an alias.
-    List<String> aliasedCollections = stateProvider.resolveAlias(collection);
-    collection = aliasedCollections.get(0); // pick 1st (consistent with HttpSolrCall behavior)
-
-    DocCollection col = getDocCollection(collection, null);
-
-    DocRouter router = col.getRouter();
-    
-    if (router instanceof ImplicitDocRouter) {
-      // short circuit as optimization
-      return null;
-    }
-
-    //Create the URL map, which is keyed on slice name.
-    //The value is a list of URLs for each replica in the slice.
-    //The first value in the list is the leader for the slice.
-    final Map<String,List<String>> urlMap = buildUrlMap(col);
-    final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
-    if (routes == null) {
-      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
-          // we have info (documents with ids and/or ids to delete) with
-          // which to find the leaders but we could not find (all of) them
-          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-              "directUpdatesToLeadersOnly==true but could not find leader(s)");
-      } else {
-        // we could not find a leader or routes yet - use unoptimized general path
-        return null;
-      }
-    }
-
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
-
-    long start = System.nanoTime();
-
-    if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
-      for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBHttpSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest).getResponse()));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
-        }
-      }
-
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
-        }
-      }
-
-      if (exceptions.size() > 0) {
-        Throwable firstException = exceptions.getVal(0);
-        if(firstException instanceof SolrException) {
-          SolrException e = (SolrException) firstException;
-          throw new RouteException(ErrorCode.getErrorCode(e.code()), exceptions, routes);
-        } else {
-          throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
-        }
-      }
-    } else {
-      for (Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
-        String url = entry.getKey();
-        LBHttpSolrClient.Req lbRequest = entry.getValue();
-        try {
-          NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
-          shardResponses.add(url, rsp);
-        } catch (Exception e) {
-          if(e instanceof SolrException) {
-            throw (SolrException) e;
-          } else {
-            throw new SolrServerException(e);
-          }
-        }
-      }
-    }
-
-    UpdateRequest nonRoutableRequest = null;
-    List<String> deleteQuery = updateRequest.getDeleteQuery();
-    if (deleteQuery != null && deleteQuery.size() > 0) {
-      UpdateRequest deleteQueryRequest = new UpdateRequest();
-      deleteQueryRequest.setDeleteQuery(deleteQuery);
-      nonRoutableRequest = deleteQueryRequest;
-    }
-    
-    Set<String> paramNames = nonRoutableParams.getParameterNames();
-    
-    Set<String> intersection = new HashSet<>(paramNames);
-    intersection.retainAll(NON_ROUTABLE_PARAMS);
-    
-    if (nonRoutableRequest != null || intersection.size() > 0) {
-      if (nonRoutableRequest == null) {
-        nonRoutableRequest = new UpdateRequest();
-      }
-      nonRoutableRequest.setParams(nonRoutableParams);
-      nonRoutableRequest.setBasicAuthCredentials(request.getBasicAuthUser(), request.getBasicAuthPassword());
-      List<String> urlList = new ArrayList<>();
-      urlList.addAll(routes.keySet());
-      Collections.shuffle(urlList, rand);
-      LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(nonRoutableRequest, urlList);
-      try {
-        LBHttpSolrClient.Rsp rsp = lbClient.request(req);
-        shardResponses.add(urlList.get(0), rsp.getResponse());
-      } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
-      }
-    }
-
-    long end = System.nanoTime();
-
-    RouteResponse rr = condenseResponse(shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS));
-    rr.setRouteResponses(shardResponses);
-    rr.setRoutes(routes);
-    return rr;
-  }
-
-  private Map<String,List<String>> buildUrlMap(DocCollection col) {
-    Map<String, List<String>> urlMap = new HashMap<>();
-    Slice[] slices = col.getActiveSlicesArr();
-    for (Slice slice : slices) {
-      String name = slice.getName();
-      List<String> urls = new ArrayList<>();
-      Replica leader = slice.getLeader();
-      if (directUpdatesToLeadersOnly && leader == null) {
-        for (Replica replica : slice.getReplicas(
-            replica -> replica.isActive(getClusterStateProvider().getLiveNodes())
-                && replica.getType() == Replica.Type.NRT)) {
-          leader = replica;
-          break;
-        }
-      }
-      if (leader == null) {
-        if (directUpdatesToLeadersOnly) {
-          continue;
-        }
-        // take unoptimized general path - we cannot find a leader yet
-        return null;
-      }
-      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
-      String url = zkProps.getCoreUrl();
-      urls.add(url);
-      if (!directUpdatesToLeadersOnly) {
-        for (Replica replica : slice.getReplicas()) {
-          if (!replica.getNodeName().equals(leader.getNodeName()) &&
-              !replica.getName().equals(leader.getName())) {
-            ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
-            String url1 = zkProps1.getCoreUrl();
-            urls.add(url1);
-          }
-        }
-      }
-      urlMap.put(name, urls);
-    }
-    return urlMap;
-  }
-
-  public RouteResponse condenseResponse(NamedList response, int timeMillis) {
-    RouteResponse condensed = new RouteResponse();
-    int status = 0;
-    Integer rf = null;
-    Integer minRf = null;
-    
-    // TolerantUpdateProcessor
-    List<SimpleOrderedMap<String>> toleratedErrors = null; 
-    int maxToleratedErrors = Integer.MAX_VALUE;
-
-    // For "adds", "deletes", "deleteByQuery" etc.
-    Map<String, NamedList> versions = new HashMap<>();
-
-    for(int i=0; i<response.size(); i++) {
-      NamedList shardResponse = (NamedList)response.getVal(i);
-      NamedList header = (NamedList)shardResponse.get("responseHeader");      
-      Integer shardStatus = (Integer)header.get("status");
-      int s = shardStatus.intValue();
-      if(s > 0) {
-          status = s;
-      }
-      Object rfObj = header.get(UpdateRequest.REPFACT);
-      if (rfObj != null && rfObj instanceof Integer) {
-        Integer routeRf = (Integer)rfObj;
-        if (rf == null || routeRf < rf)
-          rf = routeRf;
-      }
-      minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
-
-      List<SimpleOrderedMap<String>> shardTolerantErrors = 
-        (List<SimpleOrderedMap<String>>) header.get("errors");
-      if (null != shardTolerantErrors) {
-        Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
-        assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
-        // if we get into some weird state where the nodes disagree about the effective maxErrors,
-        // assume the min value seen to decide if we should fail.
-        maxToleratedErrors = Math.min(maxToleratedErrors,
-                                      ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
-        
-        if (null == toleratedErrors) {
-          toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
-        }
-        for (SimpleOrderedMap<String> err : shardTolerantErrors) {
-          toleratedErrors.add(err);
-        }
-      }
-      for (String updateType: Arrays.asList("adds", "deletes", "deleteByQuery")) {
-        Object obj = shardResponse.get(updateType);
-        if (obj instanceof NamedList) {
-          NamedList versionsList = versions.containsKey(updateType) ?
-              versions.get(updateType): new NamedList();
-          versionsList.addAll((NamedList)obj);
-          versions.put(updateType, versionsList);
-        }
-      }
-    }
-
-    NamedList cheader = new NamedList();
-    cheader.add("status", status);
-    cheader.add("QTime", timeMillis);
-    if (rf != null)
-      cheader.add(UpdateRequest.REPFACT, rf);
-    if (minRf != null)
-      cheader.add(UpdateRequest.MIN_REPFACT, minRf);
-    if (null != toleratedErrors) {
-      cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
-      cheader.add("errors", toleratedErrors);
-      if (maxToleratedErrors < toleratedErrors.size()) {
-        // cumulative errors are too high, we need to throw a client exception w/correct metadata
-
-        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
-        // then at least one shard should have thrown a real error before this, so we don't worry
-        // about having a more "singular" exception msg for that situation
-        StringBuilder msgBuf =  new StringBuilder()
-          .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
-          
-        NamedList metadata = new NamedList<String>();
-        for (SimpleOrderedMap<String> err : toleratedErrors) {
-          ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
-          metadata.add(te.getMetadataKey(), te.getMetadataValue());
-          
-          msgBuf.append("\n").append(te.getMessage());
-        }
-        
-        SolrException toThrow = new SolrException(ErrorCode.BAD_REQUEST, msgBuf.toString());
-        toThrow.setMetadata(metadata);
-        throw toThrow;
-      }
-    }
-    for (String updateType: versions.keySet()) {
-      condensed.add(updateType, versions.get(updateType));
-    }
-    condensed.add("responseHeader", cheader);
-    return condensed;
-  }
-
-  public static class RouteResponse extends NamedList {
-    private NamedList routeResponses;
-    private Map<String, LBHttpSolrClient.Req> routes;
-
-    public void setRouteResponses(NamedList routeResponses) {
-      this.routeResponses = routeResponses;
-    }
-
-    public NamedList getRouteResponses() {
-      return routeResponses;
-    }
-
-    public void setRoutes(Map<String, LBHttpSolrClient.Req> routes) {
-      this.routes = routes;
-    }
-
-    public Map<String, LBHttpSolrClient.Req> getRoutes() {
-      return routes;
-    }
-
-  }
-
-  public static class RouteException extends SolrException {
-
-    private NamedList<Throwable> throwables;
-    private Map<String, LBHttpSolrClient.Req> routes;
-
-    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, LBHttpSolrClient.Req> routes){
-      super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
-      this.throwables = throwables;
-      this.routes = routes;
-
-      // create a merged copy of the metadata from all wrapped exceptions
-      NamedList<String> metadata = new NamedList<String>();
-      for (int i = 0; i < throwables.size(); i++) {
-        Throwable t = throwables.getVal(i);
-        if (t instanceof SolrException) {
-          SolrException e = (SolrException) t;
-          NamedList<String> eMeta = e.getMetadata();
-          if (null != eMeta) {
-            metadata.addAll(eMeta);
-          }
-        }
-      }
-      if (0 < metadata.size()) {
-        this.setMetadata(metadata);
-      }
-    }
-
-    public NamedList<Throwable> getThrowables() {
-      return throwables;
-    }
-
-    public Map<String, LBHttpSolrClient.Req> getRoutes() {
-      return this.routes;
-    }
-  }
+  @Deprecated
+  public static class RouteResponse extends BaseCloudSolrClient.RouteResponse<LBHttpSolrClient.Req> {
 
-  @Override
-  public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
-    // the collection parameter of the request overrides that of the parameter to this method
-    String requestCollection = request.getCollection();
-    if (requestCollection != null) {
-      collection = requestCollection;
-    } else if (collection == null) {
-      collection = defaultCollection;
-    }
-    List<String> inputCollections =
-        collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
-    return requestWithRetryOnStaleState(request, 0, inputCollections);
   }
 
   /**
-   * As this class doesn't watch external collections on the client side,
-   * there's a chance that the request will fail due to cached stale state,
-   * which means the state must be refreshed from ZK and retried.
+   * @deprecated since Solr 8.0
    */
-  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, List<String> inputCollections)
-      throws SolrServerException, IOException {
-    connect(); // important to call this before you start working with the ZkStateReader
-
-    // build up a _stateVer_ param to pass to the server containing all of the
-    // external collection state versions involved in this request, which allows
-    // the server to notify us that our cached state for one or more of the external
-    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
-    String stateVerParam = null;
-    List<DocCollection> requestedCollections = null;
-    boolean isCollectionRequestOfV2 = false;
-    if (request instanceof V2RequestSupport) {
-      request = ((V2RequestSupport) request).getV2Request();
-    }
-    if (request instanceof V2Request) {
-      isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
-    }
-    boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
-    if (!inputCollections.isEmpty() && !isAdmin && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
-      Set<String> requestedCollectionNames = resolveAliases(inputCollections);
-
-      StringBuilder stateVerParamBuilder = null;
-      for (String requestedCollection : requestedCollectionNames) {
-        // track the version of state we're using on the client side using the _stateVer_ param
-        DocCollection coll = getDocCollection(requestedCollection, null);
-        if (coll == null) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
-        }
-        int collVer = coll.getZNodeVersion();
-        if (coll.getStateFormat()>1) {
-          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
-          requestedCollections.add(coll);
-
-          if (stateVerParamBuilder == null) {
-            stateVerParamBuilder = new StringBuilder();
-          } else {
-            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
-          }
-
-          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
-        }
-      }
-
-      if (stateVerParamBuilder != null) {
-        stateVerParam = stateVerParamBuilder.toString();
-      }
-    }
-
-    if (request.getParams() instanceof ModifiableSolrParams) {
-      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
-      if (stateVerParam != null) {
-        params.set(STATE_VERSION, stateVerParam);
-      } else {
-        params.remove(STATE_VERSION);
-      }
-    } // else: ??? how to set this ???
-
-    NamedList<Object> resp = null;
-    try {
-      resp = sendRequest(request, inputCollections);
-      //to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there
-      Object o = resp == null || resp.size() == 0 ? null : resp.get(STATE_VERSION, resp.size() - 1);
-      if(o != null && o instanceof Map) {
-        //remove this because no one else needs this and tests would fail if they are comparing responses
-        resp.remove(resp.size()-1);
-        Map invalidStates = (Map) o;
-        for (Object invalidEntries : invalidStates.entrySet()) {
-          Map.Entry e = (Map.Entry) invalidEntries;
-          getDocCollection((String) e.getKey(), (Integer) e.getValue());
-        }
-
-      }
-    } catch (Exception exc) {
-
-      Throwable rootCause = SolrException.getRootCause(exc);
-      // don't do retry support for admin requests
-      // or if the request doesn't have a collection specified
-      // or request is v2 api and its method is not GET
-      if (inputCollections.isEmpty() || isAdmin || (request instanceof V2Request && request.getMethod() != SolrRequest.METHOD.GET)) {
-        if (exc instanceof SolrServerException) {
-          throw (SolrServerException)exc;
-        } else if (exc instanceof IOException) {
-          throw (IOException)exc;
-        }else if (exc instanceof RuntimeException) {
-          throw (RuntimeException) exc;
-        }
-        else {
-          throw new SolrServerException(rootCause);
-        }
-      }
-
-      int errorCode = (rootCause instanceof SolrException) ?
-          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
-
-          boolean wasCommError =
-              (rootCause instanceof ConnectException ||
-                  rootCause instanceof ConnectTimeoutException ||
-                  rootCause instanceof NoHttpResponseException ||
-                  rootCause instanceof SocketException);
-          
-      log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry={} commError={} errorCode={} ",
-          inputCollections, rootCause.toString(), retryCount, wasCommError, errorCode);
-
-      if (wasCommError
-          || (exc instanceof RouteException && (errorCode == 503)) // 404 because the core does not exist 503 service unavailable
-          //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
-          ) {
-        // it was a communication error. it is likely that
-        // the node to which the request to be sent is down . So , expire the state
-        // so that the next attempt would fetch the fresh state
-        // just re-read state for all of them, if it has not been retried
-        // in retryExpiryTime time
-        if (requestedCollections != null) {
-          for (DocCollection ext : requestedCollections) {
-            ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
-            if (cacheEntry == null) continue;
-            cacheEntry.maybeStale = true;
-          }
-        }
-        if (retryCount < MAX_STALE_RETRIES) {//if it is a communication error , we must try again
-          //may be, we have a stale version of the collection state
-          // and we could not get any information from the server
-          //it is probably not worth trying again and again because
-          // the state would not have been updated
-          log.info("trying request again");
-          return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
-        }
-      } else {
-        log.info("request was not communication error it seems");
-      }
-
-      boolean stateWasStale = false;
-      if (retryCount < MAX_STALE_RETRIES  &&
-          requestedCollections != null    &&
-          !requestedCollections.isEmpty() &&
-          (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
-      {
-        // cached state for one or more external collections was stale
-        // re-issue request using updated state
-        stateWasStale = true;
-
-        // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
-        for (DocCollection ext : requestedCollections) {
-          collectionStateCache.remove(ext.getName());
-        }
-      }
-
-      // if we experienced a communication error, it's worth checking the state
-      // with ZK just to make sure the node we're trying to hit is still part of the collection
-      if (retryCount < MAX_STALE_RETRIES &&
-          !stateWasStale &&
-          requestedCollections != null &&
-          !requestedCollections.isEmpty() &&
-          wasCommError) {
-        for (DocCollection ext : requestedCollections) {
-          DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
-          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
-            // looks like we couldn't reach the server because the state was stale == retry
-            stateWasStale = true;
-            // we just pulled state from ZK, so update the cache so that the retry uses it
-            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
-          }
-        }
-      }
-
-      if (requestedCollections != null) {
-        requestedCollections.clear(); // done with this
-      }
-
-      // if the state was stale, then we retry the request once with new state pulled from Zk
-      if (stateWasStale) {
-        log.warn("Re-trying request to collection(s) "+inputCollections+" after stale state error from server.");
-        resp = requestWithRetryOnStaleState(request, retryCount+1, inputCollections);
-      } else {
-        if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
-          throw exc;
-        } else {
-          throw new SolrServerException(rootCause);
-        }
-      }
-    }
-
-    return resp;
-  }
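
For readers tracing the refactor: the method above (hoisted into BaseCloudSolrClient by this commit) bounds its recursion with MAX_STALE_RETRIES and treats communication errors and stale-state errors differently. A simplified sketch of that pattern, with hypothetical helper names, not the actual implementation:

    // Bounded retry-on-stale-state, simplified. sendRequest,
    // isCommunicationError and invalidateCachedState are hypothetical stand-ins.
    NamedList<Object> requestWithRetry(SolrRequest req, int retryCount) throws Exception {
      try {
        return sendRequest(req);
      } catch (Exception e) {
        if (retryCount < MAX_STALE_RETRIES && isCommunicationError(e)) {
          invalidateCachedState(req); // mark cached collection state as maybe stale
          return requestWithRetry(req, retryCount + 1);
        }
        throw e; // out of retries, or not a retriable error
      }
    }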
-
-  protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputCollections)
-      throws SolrServerException, IOException {
-    connect();
-
-    boolean sendToLeaders = false;
-
-    if (request instanceof IsUpdateRequest) {
-      if (request instanceof UpdateRequest) {
-        String collection = inputCollections.isEmpty() ? null : inputCollections.get(0); // getting first mimics HttpSolrCall
-        NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
-        if (response != null) {
-          return response;
-        }
-      }
-      sendToLeaders = true;
-    }
-    
-    SolrParams reqParams = request.getParams();
-    if (reqParams == null) { // TODO fix getParams to never return null!
-      reqParams = new ModifiableSolrParams();
-    }
-
-    final Set<String> liveNodes = stateProvider.getLiveNodes();
-
-    final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
-
-    if (request instanceof V2Request) {
-      if (!liveNodes.isEmpty()) {
-        List<String> liveNodesList = new ArrayList<>(liveNodes);
-        Collections.shuffle(liveNodesList, rand);
-        theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
-            (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
-      }
-
-    } else if (ADMIN_PATHS.contains(request.getPath())) {
-      for (String liveNode : liveNodes) {
-        theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
-            (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
-      }
-
-    } else { // Typical...
-      Set<String> collectionNames = resolveAliases(inputCollections);
-      if (collectionNames.isEmpty()) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,
-            "No collection param specified on request and no default collection has been set: " + inputCollections);
-      }
-
-      // TODO: not a big deal because of the caching, but we could avoid looking
-      //   at every shard when getting leaders if we tweaked some things
-      
-      // Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
-      Map<String,Slice> slices = new HashMap<>();
-      String shardKeys = reqParams.get(ShardParams._ROUTE_);
-      for (String collectionName : collectionNames) {
-        DocCollection col = getDocCollection(collectionName, null);
-        if (col == null) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
-        }
-        Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
-        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
-      }
-
-      // Gather URLs, grouped by leader or replica
-      // TODO: allow filtering by group, role, etc
-      Set<String> seenNodes = new HashSet<>();
-      List<String> replicas = new ArrayList<>();
-      String joinedInputCollections = StrUtils.join(inputCollections, ',');
-      for (Slice slice : slices.values()) {
-        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
-          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
-          String node = coreNodeProps.getNodeName();
-          if (!liveNodes.contains(node) // Must be a live node to continue
-              || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
-            continue;
-          if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
-            String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
-            if (sendToLeaders && coreNodeProps.isLeader()) {
-              theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
-            } else {
-              replicas.add(url); // replicas here
-            }
-          }
-        }
-      }
-
-      // Shuffle the leaders, if any    (none if !sendToLeaders)
-      Collections.shuffle(theUrlList, rand);
-
-      // Shuffle the replicas, if any, and append to our list
-      Collections.shuffle(replicas, rand);
-      theUrlList.addAll(replicas);
+  @Deprecated
+  public static class RouteException extends BaseCloudSolrClient.RouteException {
 
-      if (theUrlList.isEmpty()) {
-        collectionStateCache.keySet().removeAll(collectionNames);
-        throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
-            "Could not find a healthy node to handle the request.");
-      }
+    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, ? extends LBSolrClient.Req> routes) {
+      super(errorCode, throwables, routes);
     }
-
-    LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList);
-    LBHttpSolrClient.Rsp rsp = lbClient.request(req);
-    return rsp.getResponse();
-  }
-
-  /** Resolves the input collections to their possible aliased collections. Doesn't validate collection existence. */
-  private LinkedHashSet<String> resolveAliases(List<String> inputCollections) {
-    LinkedHashSet<String> collectionNames = new LinkedHashSet<>(); // consistent ordering
-    for (String collectionName : inputCollections) {
-      if (stateProvider.getState(collectionName) == null) {
-        // perhaps it's an alias
-        List<String> aliasedCollections = stateProvider.resolveAlias(collectionName);
-        // one more level of alias indirection...  (dubious that we should support this)
-        for (String aliasedCollection : aliasedCollections) {
-          collectionNames.addAll(stateProvider.resolveAlias(aliasedCollection));
-        }
-      } else {
-        collectionNames.add(collectionName); // it's a collection
-      }
-    }
-    return collectionNames;
   }
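
A usage sketch for the routing path above, assuming a placeholder ZooKeeper address and an alias named "logs"; alias resolution happens transparently inside sendRequest():

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.response.QueryResponse;

    try (CloudSolrClient client = new CloudSolrClient.Builder(
        Collections.singletonList("localhost:2181"), Optional.empty()).build()) {
      client.setDefaultCollection("logs"); // may be a collection or an alias
      QueryResponse rsp = client.query(new SolrQuery("*:*"));
      System.out.println("hits: " + rsp.getResults().getNumFound());
    }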
 
   @Override
@@ -1141,9 +170,7 @@ public class CloudSolrClient extends SolrClient {
       HttpClientUtil.close(myClient);
     }
 
-    if(this.threadPool != null && !this.threadPool.isShutdown()) {
-      this.threadPool.shutdown();
-    }
+    super.close();
   }
 
   public LBHttpSolrClient getLbClient() {
@@ -1153,150 +180,8 @@ public class CloudSolrClient extends SolrClient {
   public HttpClient getHttpClient() {
     return myClient;
   }
-  
-  public boolean isUpdatesToLeaders() {
-    return updatesToLeaders;
-  }
 
   /**
-   * @return true if direct updates are sent to shard leaders only
-   */
-  public boolean isDirectUpdatesToLeadersOnly() {
-    return directUpdatesToLeadersOnly;
-  }
-
-  /** If caches are expired they are refreshed after acquiring a lock;
-   * use this to set the number of locks.
-   */
-  public void setParallelCacheRefreshes(int n){ locks = objectList(n); }
-
-  private static ArrayList<Object> objectList(int n) {
-    ArrayList<Object> l =  new ArrayList<>(n);
-    for(int i=0;i<n;i++) l.add(new Object());
-    return l;
-  }
-
-
-  protected DocCollection getDocCollection(String collection, Integer expectedVersion) throws SolrException {
-    if (expectedVersion == null) expectedVersion = -1;
-    if (collection == null) return null;
-    ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
-    DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
-    if (col != null) {
-      if (expectedVersion <= col.getZNodeVersion()
-          && !cacheEntry.shouldRetry()) return col;
-    }
-
-    CollectionRef ref = getCollectionRef(collection);
-    if (ref == null) {
-      //no such collection exists
-      return null;
-    }
-    if (!ref.isLazilyLoaded()) {
-      //it is readily available just return it
-      return ref.get();
-    }
-    List locks = this.locks;
-    final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
-    DocCollection fetchedCol = null;
-    synchronized (lock) {
-      /* we have waited for some time; just check once again */
-      cacheEntry = collectionStateCache.get(collection);
-      col = cacheEntry == null ? null : cacheEntry.cached;
-      if (col != null) {
-        if (expectedVersion <= col.getZNodeVersion()
-            && !cacheEntry.shouldRetry()) return col;
-      }
-      // We are going to fetch a new version;
-      // we MUST try to get a new version
-      fetchedCol = ref.get(); // this is a call to ZK
-      if (fetchedCol == null) return null; // this collection no longer exists
-      if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
-        cacheEntry.setRetriedAt();//we retried and found that it is the same version
-        cacheEntry.maybeStale = false;
-      } else {
-        if (fetchedCol.getStateFormat() > 1)
-          collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
-      }
-      return fetchedCol;
-    }
-  }
-
-  CollectionRef getCollectionRef(String collection) {
-    return stateProvider.getState(collection);
-  }
-
-  /**
-   * Useful for determining the minimum achieved replication factor across
-   * all shards involved in processing an update request, typically useful
-   * for gauging the replication factor of a batch. 
-   */
-  @SuppressWarnings("rawtypes")
-  public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
-    // it's probably already on the top-level header set by condense
-    NamedList header = (NamedList)resp.get("responseHeader");
-    Integer achRf = (Integer)header.get(UpdateRequest.REPFACT);
-    if (achRf != null)
-      return achRf.intValue();
-
-    // not on the top-level header, walk the shard route tree
-    Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp);
-    for (Integer rf : shardRf.values()) {
-      if (achRf == null || rf < achRf) {
-        achRf = rf;
-      }
-    }    
-    return (achRf != null) ? achRf.intValue() : -1;
-  }
-  
-  /**
-   * Walks the NamedList response after performing an update request looking for
-   * the replication factor that was achieved in each shard involved in the request.
-   * For single doc updates, there will be only one shard in the return value. 
-   */
-  @SuppressWarnings("rawtypes")
-  public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
-    connect();
-    
-    Map<String,Integer> results = new HashMap<String,Integer>();
-    if (resp instanceof CloudSolrClient.RouteResponse) {
-      NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
-      DocCollection coll = getDocCollection(collection, null);
-      Map<String,String> leaders = new HashMap<String,String>();
-      for (Slice slice : coll.getActiveSlicesArr()) {
-        Replica leader = slice.getLeader();
-        if (leader != null) {
-          ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
-          String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
-          leaders.put(leaderUrl, slice.getName());
-          String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
-          leaders.put(altLeaderUrl, slice.getName());
-        }
-      }
-      
-      Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
-      while (routeIter.hasNext()) {
-        Map.Entry<String,Object> next = routeIter.next();
-        String host = next.getKey();
-        NamedList hostResp = (NamedList)next.getValue();
-        Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
-        if (rf != null) {
-          String shard = leaders.get(host);
-          if (shard == null) {
-            if (host.endsWith("/"))
-              shard = leaders.get(host.substring(0,host.length()-1));
-            if (shard == null) {
-              shard = host;
-            }
-          }
-          results.put(shard, rf);
-        }
-      }
-    }    
-    return results;
-  }
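
The two replication-factor helpers above are typically used together after a batch update; a minimal sketch, where client is a connected CloudSolrClient and the collection name, document id, and threshold are placeholders:

    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.common.util.NamedList;

    UpdateRequest update = new UpdateRequest();
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "doc-1");
    update.add(doc);
    NamedList<Object> rsp = client.request(update, "collection1");
    int achievedRf = client.getMinAchievedReplicationFactor("collection1", rsp);
    if (achievedRf < 2) {
      // fewer replicas acknowledged the batch than desired; the caller may retry
    }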
-  
-  /**
    * @deprecated since 7.0  Use {@link Builder} methods instead. 
    */
   @Deprecated
@@ -1308,29 +193,10 @@ public class CloudSolrClient extends SolrClient {
     return stateProvider;
   }
 
-  private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
-    final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
-    final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
-
-    final boolean hasNoDocuments = (documents == null || documents.isEmpty());
-    final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
-    if (hasNoDocuments && hasNoDeleteById) {
-      // no documents and no delete-by-id, so no info to find leader(s)
-      return false;
-    }
-
-    if (documents != null) {
-      for (final Map.Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
-        final SolrInputDocument doc = entry.getKey();
-        final Object fieldValue = doc.getFieldValue(idField);
-        if (fieldValue == null) {
-          // a document with no id field value, so can't find leader for it
-          return false;
-        }
-      }
-    }
-
-    return true;
+  @Override
+  protected boolean wasCommError(Throwable rootCause) {
+    return rootCause instanceof ConnectTimeoutException ||
+        rootCause instanceof NoHttpResponseException;
   }
 
   private static LBHttpSolrClient createLBHttpSolrClient(Builder cloudSolrClientBuilder, HttpClient httpClient) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
new file mode 100644
index 0000000..335684a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.solr.client.solrj.SolrClient;
+
+public class Http2ClusterStateProvider extends BaseHttpClusterStateProvider {
+  final Http2SolrClient httpClient;
+  final boolean closeClient;
+
+  public Http2ClusterStateProvider(List<String> solrUrls, Http2SolrClient httpClient) throws Exception {
+    this.httpClient = httpClient == null? new Http2SolrClient.Builder().build(): httpClient;
+    this.closeClient = httpClient == null;
+    init(solrUrls);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.closeClient && this.httpClient != null) {
+      httpClient.close();
+    }
+  }
+
+  @Override
+  protected SolrClient getSolrClient(String baseUrl) {
+    return new Http2SolrClient.Builder(baseUrl).withHttpClient(httpClient).build();
+  }
+}
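
A short sketch of using the new provider standalone; the URL is a placeholder, and passing null for the client makes the provider create and own its own Http2SolrClient (closeClient == true above). getLiveNodes() is inherited from BaseHttpClusterStateProvider:

    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    List<String> solrUrls = Collections.singletonList("http://localhost:8983/solr");
    try (Http2ClusterStateProvider provider = new Http2ClusterStateProvider(solrUrls, null)) {
      Set<String> liveNodes = provider.getLiveNodes();
      System.out.println("live nodes: " + liveNodes);
    }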
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 9b60460..580a849 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -93,10 +93,18 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
 import static org.apache.solr.common.util.Utils.getObjectByPath;
 
 // TODO: error handling, small Http2SolrClient features, security, ssl
 /**
+ * Differences between this {@link Http2SolrClient} and {@link HttpSolrClient}:
+ * <ul>
+ *  <li>{@link Http2SolrClient} sends requests over HTTP/2</li>
+ *  <li>{@link Http2SolrClient} can point to multiple URLs</li>
+ *  <li>{@link Http2SolrClient} does not expose its internal httpClient like {@link HttpSolrClient#getHttpClient()} does;
+ * to share connection pools, use {@link Http2SolrClient.Builder#withHttpClient(Http2SolrClient)}</li>
+ * </ul>
  * @lucene.experimental
  */
 public class Http2SolrClient extends SolrClient {
@@ -139,20 +147,12 @@ public class Http2SolrClient extends SolrClient {
     if (builder.idleTimeout != null) idleTimeout = builder.idleTimeout;
     else idleTimeout = HttpClientUtil.DEFAULT_SO_TIMEOUT;
 
-    if (builder.httpClient == null) {
+    if (builder.http2SolrClient == null) {
       httpClient = createHttpClient(builder);
       closeClient = true;
     } else {
-      httpClient = builder.httpClient;
+      httpClient = builder.http2SolrClient.httpClient;
     }
-    if (!httpClient.isStarted()) {
-      try {
-        httpClient.start();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
     assert ObjectReleaseTracker.track(this);
   }
 
@@ -160,10 +160,12 @@ public class Http2SolrClient extends SolrClient {
     this.listenerFactory.add(factory);
   }
 
+  // internal usage only
   HttpClient getHttpClient() {
     return httpClient;
   }
 
+  // internal usage only
   ProtocolHandlers getProtocolHandlers() {
     return httpClient.getProtocolHandlers();
   }
@@ -213,6 +215,11 @@ public class Http2SolrClient extends SolrClient {
 
     if (builder.idleTimeout != null) httpClient.setIdleTimeout(builder.idleTimeout);
     if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
+    try {
+      httpClient.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
     return httpClient;
   }
 
@@ -445,6 +452,7 @@ public class Http2SolrClient extends SolrClient {
   private Request makeRequest(SolrRequest solrRequest, String collection)
       throws SolrServerException, IOException {
     Request req = createRequest(solrRequest, collection);
+    req.header(HttpHeader.ACCEPT_ENCODING, null);
     setListeners(solrRequest, req);
     if (solrRequest.getUserPrincipal() != null) {
       req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
@@ -790,7 +798,7 @@ public class Http2SolrClient extends SolrClient {
 
   public static class Builder {
 
-    private HttpClient httpClient;
+    private Http2SolrClient http2SolrClient;
     private SSLConfig sslConfig = defaultSSLConfig;
     private Integer idleTimeout;
     private Integer connectionTimeout;
@@ -810,8 +818,11 @@ public class Http2SolrClient extends SolrClient {
       return new Http2SolrClient(baseSolrUrl, this);
     }
 
-    public Builder withHttpClient(HttpClient httpClient) {
-      this.httpClient = httpClient;
+    /**
+     * Reuse {@code httpClient}'s connection pool
+     */
+    public Builder withHttpClient(Http2SolrClient httpClient) {
+      this.http2SolrClient = httpClient;
       return this;
     }
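
Since the internal Jetty HttpClient is no longer exposed, connection pools are now shared through the builder, as in this sketch (both URLs are placeholders):

    Http2SolrClient searchClient = new Http2SolrClient.Builder("http://host1:8983/solr").build();
    // indexClient reuses searchClient's connection pool instead of starting its own
    Http2SolrClient indexClient = new Http2SolrClient.Builder("http://host2:8983/solr")
        .withHttpClient(searchClient)
        .build();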
 
@@ -845,52 +856,6 @@ public class Http2SolrClient extends SolrClient {
 
   }
 
-  /**
-   * Subclass of SolrException that allows us to capture an arbitrary HTTP status code that may have been returned by
-   * the remote server or a proxy along the way.
-   */
-  public static class RemoteSolrException extends SolrException {
-    /**
-     * @param remoteHost the host the error was received from
-     * @param code       Arbitrary HTTP status code
-     * @param msg        Exception Message
-     * @param th         Throwable to wrap with this Exception
-     */
-    public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
-      super(code, "Error from server at " + remoteHost + ": " + msg, th);
-    }
-  }
-
-  /**
-   * This should be thrown when a server has an error in executing the request and it sends a proper payload back to the
-   * client
-   */
-  public static class RemoteExecutionException extends RemoteSolrException {
-    private NamedList meta;
-
-    public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
-      super(remoteHost, code, msg, null);
-      this.meta = meta;
-    }
-
-    public static RemoteExecutionException create(String host, NamedList errResponse) {
-      Object errObj = errResponse.get("error");
-      if (errObj != null) {
-        Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
-        String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
-        return new RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
-            msg == null ? "Unknown Error" : msg, errResponse);
-
-      } else {
-        throw new RuntimeException("No error");
-      }
-    }
-
-    public NamedList getMetaData() {
-      return meta;
-    }
-  }
-
   public Set<String> getQueryParams() {
     return queryParams;
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
index bbab3aa..07fd8f8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
@@ -17,67 +17,25 @@
 package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ClusterState.CollectionRef;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class HttpClusterStateProvider implements ClusterStateProvider {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+public class HttpClusterStateProvider extends BaseHttpClusterStateProvider {
 
-  private String urlScheme;
-  volatile Set<String> liveNodes;
-  long liveNodesTimestamp = 0;
-  volatile Map<String, List<String>> aliases;
-  long aliasesTimestamp = 0;
-
-  private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
-  final HttpClient httpClient;
-  final boolean clientIsInternal;
+  private final HttpClient httpClient;
+  private final boolean clientIsInternal;
 
   public HttpClusterStateProvider(List<String> solrUrls, HttpClient httpClient) throws Exception {
     this.httpClient = httpClient == null? HttpClientUtil.createClient(null): httpClient;
     this.clientIsInternal = httpClient == null;
-    for (String solrUrl: solrUrls) {
-      urlScheme = solrUrl.startsWith("https")? "https": "http";
-      try (SolrClient initialClient = new HttpSolrClient.Builder().withBaseSolrUrl(solrUrl).withHttpClient(httpClient).build()) {
-        Set<String> liveNodes = fetchLiveNodes(initialClient); // throws exception if unable to fetch
-        this.liveNodes = liveNodes;
-        liveNodesTimestamp = System.nanoTime();
-        break;
-      } catch (IOException e) {
-        log.warn("Attempt to fetch live_nodes from " + solrUrl + " failed.", e);
-      }
-    }
+    init(solrUrls);
+  }
 
-    if (this.liveNodes == null || this.liveNodes.isEmpty()) {
-      throw new RuntimeException("Tried fetching live_nodes using Solr URLs provided, i.e. " + solrUrls + ". However, "
-          + "succeeded in obtaining the cluster state from none of them."
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    }
+  @Override
+  protected SolrClient getSolrClient(String baseUrl) {
+    return new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build();
   }
 
   @Override
@@ -86,247 +44,4 @@ public class HttpClusterStateProvider implements ClusterStateProvider {
       HttpClientUtil.close(httpClient);
     }
   }
-
-  @Override
-  public CollectionRef getState(String collection) {
-    for (String nodeName: liveNodes) {
-      try (HttpSolrClient client = new HttpSolrClient.Builder().
-          withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-          withHttpClient(httpClient).build()) {
-        ClusterState cs = fetchClusterState(client, collection, null);
-        return cs.getCollectionRef(collection);
-      } catch (SolrServerException | IOException e) {
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
-      } catch (RemoteSolrException e) {
-        if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
-          return null;
-        }
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
-      } catch (NotACollectionException e) {
-        // Cluster state for the given collection was not found, could be an alias.
-        // Lets fetch/update our aliases:
-        getAliases(true);
-        return null;
-      }
-    }
-    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
-        + "succeeded in obtaining the cluster state from none of them."
-        + "If you think your Solr cluster is up and is accessible,"
-        + " you could try re-creating a new CloudSolrClient using working"
-        + " solrUrl(s) or zkHost(s).");
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private ClusterState fetchClusterState(SolrClient client, String collection, Map<String, Object> clusterProperties) throws SolrServerException, IOException, NotACollectionException {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    if (collection != null) {
-      params.set("collection", collection);
-    }
-    params.set("action", "CLUSTERSTATUS");
-    QueryRequest request = new QueryRequest(params);
-    request.setPath("/admin/collections");
-    NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
-    Map<String, Object> collectionsMap;
-    if (collection != null) {
-      collectionsMap = Collections.singletonMap(collection,
-          ((NamedList) cluster.get("collections")).get(collection));
-    } else {
-      collectionsMap = ((NamedList)cluster.get("collections")).asMap(10);
-    }
-    int znodeVersion;
-    Map<String, Object> collFromStatus = (Map<String, Object>) (collectionsMap).get(collection);
-    if (collection != null && collFromStatus == null) {
-      throw new NotACollectionException(); // probably an alias
-    }
-    if (collection != null) { // can be null if alias
-      znodeVersion =  (int) collFromStatus.get("znodeVersion");
-    } else {
-      znodeVersion = -1;
-    }
-    Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
-    this.liveNodes = liveNodes;
-    liveNodesTimestamp = System.nanoTime();
-    //TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
-    ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
-    if (clusterProperties != null) {
-      Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
-      if (properties != null) {
-        clusterProperties.putAll(properties);
-      }
-    }
-    return cs;
-  }
-
-  @Override
-  public Set<String> getLiveNodes() {
-    if (liveNodes == null) {
-      throw new RuntimeException("We don't know of any live_nodes to fetch the"
-          + " latest live_nodes information from. "
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    }
-    if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
-      for (String nodeName: liveNodes) {
-        try (HttpSolrClient client = new HttpSolrClient.Builder().
-            withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-            withHttpClient(httpClient).build()) {
-          Set<String> liveNodes = fetchLiveNodes(client);
-          this.liveNodes = (liveNodes);
-          liveNodesTimestamp = System.nanoTime();
-          return liveNodes;
-        } catch (Exception e) {
-          log.warn("Attempt to fetch live_nodes from " +
-              Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
-        }
-      }
-      throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
-          + "succeeded in obtaining the cluster state from none of them."
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    } else {
-      return liveNodes; // cached copy is fresh enough
-    }
-  }
-
-  private static Set<String> fetchLiveNodes(SolrClient client) throws Exception {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set("action", "CLUSTERSTATUS");
-    QueryRequest request = new QueryRequest(params);
-    request.setPath("/admin/collections");
-    NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
-    Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
-    return liveNodes;
-  }
-
-  @Override
-  public List<String> resolveAlias(String aliasName) {
-    return Aliases.resolveAliasesGivenAliasMap(getAliases(false), aliasName);
-  }
-
-  private Map<String, List<String>> getAliases(boolean forceFetch) {
-    if (this.liveNodes == null) {
-      throw new RuntimeException("We don't know of any live_nodes to fetch the"
-          + " latest aliases information from. "
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    }
-
-    if (forceFetch || this.aliases == null ||
-        TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
-      for (String nodeName: liveNodes) {
-        try (HttpSolrClient client = new HttpSolrClient.Builder().
-            withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-            withHttpClient(httpClient).build()) {
-
-          Map<String, List<String>> aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
-          this.aliases = aliases;
-          this.aliasesTimestamp = System.nanoTime();
-          return Collections.unmodifiableMap(this.aliases);
-        } catch (SolrServerException | RemoteSolrException | IOException e) {
-          // Situation where we're hitting an older Solr which doesn't have LISTALIASES
-          if (e instanceof RemoteSolrException && ((RemoteSolrException)e).code()==400) {
-            log.warn("LISTALIASES not found, possibly using older Solr server. Aliases won't work"
-                + " unless you re-create the CloudSolrClient using zkHost(s) or upgrade Solr server", e);
-            this.aliases = Collections.emptyMap();
-            this.aliasesTimestamp = System.nanoTime();
-            return aliases;
-          }
-          log.warn("Attempt to fetch cluster state from " +
-              Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
-        }
-      }
-
-      throw new RuntimeException("Tried fetching aliases using all the node names we knew of, i.e. " + liveNodes +". However, "
-          + "succeeded in obtaining the cluster state from none of them."
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using a working"
-          + " solrUrl or zkHost.");
-    } else {
-      return Collections.unmodifiableMap(this.aliases); // cached copy is fresh enough
-    }
-  }
-
-  @Override
-  public ClusterState getClusterState() throws IOException {
-    for (String nodeName: liveNodes) {
-      try (HttpSolrClient client = new HttpSolrClient.Builder().
-          withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-          withHttpClient(httpClient).build()) {
-        ClusterState cs = fetchClusterState(client, null, null);
-        return cs;
-      } catch (SolrServerException | RemoteSolrException | IOException e) {
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
-      } catch (NotACollectionException e) {
-        // not possible! (we passed in null for collection so it can't be an alias)
-        throw new RuntimeException("null should never cause NotACollectionException in " +
-            "fetchClusterState() Please report this as a bug!");
-      }
-    }
-    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
-        + "succeeded in obtaining the cluster state from none of them."
-        + "If you think your Solr cluster is up and is accessible,"
-        + " you could try re-creating a new CloudSolrClient using working"
-        + " solrUrl(s) or zkHost(s).");
-  }
-
-  @Override
-  public Map<String, Object> getClusterProperties() {
-    for (String nodeName : liveNodes) {
-      try (HttpSolrClient client = new HttpSolrClient.Builder().
-          withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
-          withHttpClient(httpClient).build()) {
-        Map<String, Object> clusterProperties = new HashMap<>();
-        fetchClusterState(client, null, clusterProperties);
-        return clusterProperties;
-      } catch (SolrServerException | RemoteSolrException | IOException e) {
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
-      } catch (NotACollectionException e) {
-        // not possible! (we passed in null for collection so it can't be an alias)
-        throw new RuntimeException("null should never cause NotACollectionException in " +
-            "fetchClusterState() Please report this as a bug!");
-      }
-    }
-    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes + ". However, "
-        + "succeeded in obtaining the cluster state from none of them."
-        + "If you think your Solr cluster is up and is accessible,"
-        + " you could try re-creating a new CloudSolrClient using working"
-        + " solrUrl(s) or zkHost(s).");
-  }
-
-  @Override
-  public String getPolicyNameByCollection(String coll) {
-    throw new UnsupportedOperationException("Fetching cluster properties not supported"
-        + " using the HttpClusterStateProvider. "
-        + "ZkClientClusterStateProvider can be used for this."); // TODO
-  }
-
-  @Override
-  public Object getClusterProperty(String propertyName) {
-    if (propertyName.equals(ZkStateReader.URL_SCHEME)) {
-      return this.urlScheme;
-    }
-    return getClusterProperties().get(propertyName);
-  }
-
-  @Override
-  public void connect() {}
-
-  public int getCacheTimeout() {
-    return cacheTimeout;
-  }
-
-  public void setCacheTimeout(int cacheTimeout) {
-    this.cacheTimeout = cacheTimeout;
-  }
-
-  // This exception is not meant to escape this class it should be caught and wrapped.
-  private class NotACollectionException extends Exception {
-  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 3cdcdd4..d12093b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -66,7 +66,6 @@ import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.solr.client.solrj.ResponseParser;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.V2RequestSupport;
@@ -91,7 +90,7 @@ import static org.apache.solr.common.util.Utils.getObjectByPath;
 /**
  * A SolrClient implementation that talks directly to a Solr server via HTTP
  */
-public class HttpSolrClient extends SolrClient {
+public class HttpSolrClient extends BaseHttpSolrClient {
 
   private static final String UTF_8 = StandardCharsets.UTF_8.name();
   private static final String DEFAULT_PATH = "/select";
@@ -560,7 +559,7 @@ public class HttpSolrClient extends SolrClient {
       } else {
         contentType = "";
       }
-      
+
       // handle some http level checks before trying to parse the response
       switch (httpStatus) {
         case HttpStatus.SC_OK:
@@ -798,53 +797,26 @@ s   * @deprecated since 7.0  Use {@link Builder} methods instead.
     this.useMultiPartPost = useMultiPartPost;
   }
 
+
   /**
-   * Subclass of SolrException that allows us to capture an arbitrary HTTP
-   * status code that may have been returned by the remote server or a 
-   * proxy along the way.
+   * @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteSolrException} instead
    */
-  public static class RemoteSolrException extends SolrException {
-    /**
-     * @param remoteHost the host the error was received from
-     * @param code Arbitrary HTTP status code
-     * @param msg Exception Message
-     * @param th Throwable to wrap with this Exception
-     */
+  @Deprecated
+  public static class RemoteSolrException extends BaseHttpSolrClient.RemoteSolrException {
+
     public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
-      super(code, "Error from server at " + remoteHost + ": " + msg, th);
+      super(remoteHost, code, msg, th);
     }
   }
 
   /**
-   * This should be thrown when a server has an error in executing the request and
-   * it sends a proper payload back to the client
+   * @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteExecutionException} instead
    */
-  public static class RemoteExecutionException extends RemoteSolrException {
-    private NamedList meta;
+  @Deprecated
+  public static class RemoteExecutionException extends BaseHttpSolrClient.RemoteExecutionException {
 
     public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
-      super(remoteHost, code, msg, null);
-      this.meta = meta;
-    }
-
-
-    public static RemoteExecutionException create(String host, NamedList errResponse) {
-      Object errObj = errResponse.get("error");
-      if (errObj != null) {
-        Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
-        String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
-        return new RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
-            msg == null ? "Unknown Error" : msg, errResponse);
-
-      } else {
-        throw new RuntimeException("No error");
-      }
-
-    }
-
-    public NamedList getMetaData() {
-
-      return meta;
+      super(remoteHost, code, msg, meta);
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
index 8a142bf..3eb20eb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
@@ -31,8 +31,10 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrInputDocument;
@@ -236,25 +238,21 @@ public class UpdateRequest extends AbstractUpdateRequest {
     params.set(UpdateParams.COMMIT, "true");
     return process(client, collection);
   }
-  
-  /**
-   * @param router to route updates with
-   * @param col DocCollection for the updates
-   * @param urlMap of the cluster
-   * @param params params to use
-   * @param idField the id field
-   * @return a Map of urls to requests
-   */
-  public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
-      DocCollection col, Map<String,List<String>> urlMap,
-      ModifiableSolrParams params, String idField) {
-    
+
+  private interface ReqSupplier<T extends LBSolrClient.Req> {
+    T get(SolrRequest solrRequest, List<String> servers);
+  }
+
+  private <T extends LBSolrClient.Req> Map<String, T> getRoutes(DocRouter router,
+      DocCollection col, Map<String,List<String>> urlMap,
+      ModifiableSolrParams params, String idField,
+      ReqSupplier<T> reqSupplier) {
     if ((documents == null || documents.size() == 0)
         && (deleteById == null || deleteById.size() == 0)) {
       return null;
     }
-    
-    Map<String,LBHttpSolrClient.Req> routes = new HashMap<>();
+
+    Map<String,T> routes = new HashMap<>();
     if (documents != null) {
       Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
       for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
@@ -273,7 +271,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           return null;
         }
         String leaderUrl = urls.get(0);
-        LBHttpSolrClient.Req request = routes
+        T request = routes
             .get(leaderUrl);
         if (request == null) {
           UpdateRequest updateRequest = new UpdateRequest();
@@ -283,7 +281,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           updateRequest.setPath(getPath());
           updateRequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
           updateRequest.setResponseParser(getResponseParser());
-          request = new LBHttpSolrClient.Req(updateRequest, urls);
+          request = reqSupplier.get(updateRequest, urls);
           routes.put(leaderUrl, request);
         }
         UpdateRequest urequest = (UpdateRequest) request.getRequest();
@@ -299,17 +297,17 @@ public class UpdateRequest extends AbstractUpdateRequest {
         }
       }
     }
-    
+
     // Route the deleteById's
-    
+
     if (deleteById != null) {
-      
+
       Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
           .iterator();
       while (entries.hasNext()) {
-        
+
         Map.Entry<String,Map<String,Object>> entry = entries.next();
-        
+
         String deleteId = entry.getKey();
         Map<String,Object> map = entry.getValue();
         Long version = null;
@@ -325,7 +323,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           return null;
         }
         String leaderUrl = urls.get(0);
-        LBHttpSolrClient.Req request = routes.get(leaderUrl);
+        T request = routes.get(leaderUrl);
         if (request != null) {
           UpdateRequest urequest = (UpdateRequest) request.getRequest();
           urequest.deleteById(deleteId, version);
@@ -335,7 +333,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           urequest.deleteById(deleteId, version);
           urequest.setCommitWithin(getCommitWithin());
           urequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
-          request = new LBHttpSolrClient.Req(urequest, urls);
+          request = reqSupplier.get(urequest, urls);
           routes.put(leaderUrl, request);
         }
       }
@@ -344,6 +342,36 @@ public class UpdateRequest extends AbstractUpdateRequest {
     return routes;
   }
   
+  /**
+   * @param router to route updates with
+   * @param col DocCollection for the updates
+   * @param urlMap of the cluster
+   * @param params params to use
+   * @param idField the id field
+   * @return a Map of urls to requests
+   */
+  public Map<String, LBSolrClient.Req> getRoutesToCollection(DocRouter router,
+                                                             DocCollection col, Map<String,List<String>> urlMap,
+                                                             ModifiableSolrParams params, String idField) {
+    return getRoutes(router, col, urlMap, params, idField, LBSolrClient.Req::new);
+  }
+  
+  /**
+   * @param router to route updates with
+   * @param col DocCollection for the updates
+   * @param urlMap of the cluster
+   * @param params params to use
+   * @param idField the id field
+   * @return a Map of urls to requests
+   * @deprecated since 8.0, use {@link #getRoutesToCollection(DocRouter, DocCollection, Map, ModifiableSolrParams, String)} instead
+   */
+  @Deprecated
+  public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
+      DocCollection col, Map<String,List<String>> urlMap,
+      ModifiableSolrParams params, String idField) {
+    return getRoutes(router, col, urlMap, params, idField, LBHttpSolrClient.Req::new);
+  }
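
A sketch of calling the new entry point; the DocCollection, URL map, and params here are placeholders that, inside CloudSolrClient, come from the cached cluster state:

    Map<String, LBSolrClient.Req> routes = updateRequest.getRoutesToCollection(
        docCollection.getRouter(), // DocRouter of the target collection
        docCollection,             // collection state
        urlMap,                    // shard name -> replica URLs, leader first (assumed shape)
        params,                    // ModifiableSolrParams for the request
        "id");                     // unique key field used to route documents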
+  
   public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
     this.docIterator = docIterator;
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java
new file mode 100644
index 0000000..6206d4d
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.core.StringContains.containsString;
+
+public class CloudHttp2SolrClientBadInputTest extends SolrCloudTestCase {
+  private static final List<String> NULL_STR_LIST = null;
+  private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
+  private static final String ANY_COLLECTION = "ANY_COLLECTION";
+  private static final int ANY_COMMIT_WITHIN_TIME = -1;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .configure();
+
+    final List<String> solrUrls = new ArrayList<>();
+  }
+
+  @Test
+  public void testDeleteByIdReportsInvalidIdLists() throws Exception {
+    try (SolrClient client = getCloudHttp2SolrClient(cluster)) {
+      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
+        client.deleteById(ANY_COLLECTION, NULL_STR_LIST);
+      });
+      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
+        client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST);
+      });
+      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
+        client.deleteById(ANY_COLLECTION, NULL_STR_LIST, ANY_COMMIT_WITHIN_TIME);
+      });
+      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
+        client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST, ANY_COMMIT_WITHIN_TIME);
+      });
+    }
+  }
+
+  private void assertExceptionThrownWithMessageContaining(Class expectedType, List<String> expectedStrings, LuceneTestCase.ThrowingRunnable runnable) {
+    Throwable thrown = expectThrows(expectedType, runnable);
+
+    if (expectedStrings != null) {
+      for (String expectedString : expectedStrings) {
+        assertThat(thrown.getMessage(), containsString(expectedString));
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java
new file mode 100644
index 0000000..72d5e8c
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.junit.Test;
+
+public class CloudHttp2SolrClientBuilderTest extends LuceneTestCase {
+  private static final String ANY_CHROOT = "/ANY_CHROOT";
+  private static final String ANY_ZK_HOST = "ANY_ZK_HOST";
+  private static final String ANY_OTHER_ZK_HOST = "ANY_OTHER_ZK_HOST";
+
+  @Test
+  public void testSingleZkHostSpecified() throws IOException {
+    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT))
+        .build()) {
+      final String clientZkHost = createdClient.getZkHost();
+
+      assertTrue(clientZkHost.contains(ANY_ZK_HOST));
+    }
+  }
+
+  @Test
+  public void testSeveralZkHostsSpecifiedSingly() throws IOException {
+    final List<String> zkHostList = new ArrayList<>();
+    zkHostList.add(ANY_ZK_HOST); zkHostList.add(ANY_OTHER_ZK_HOST);
+    try (CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHostList, Optional.of(ANY_CHROOT))
+        .build()) {
+      final String clientZkHost = createdClient.getZkHost();
+
+      assertTrue(clientZkHost.contains(ANY_ZK_HOST));
+      assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
+    }
+  }
+
+  @Test
+  public void testSeveralZkHostsSpecifiedTogether() throws IOException {
+    final ArrayList<String> zkHosts = new ArrayList<String>();
+    zkHosts.add(ANY_ZK_HOST);
+    zkHosts.add(ANY_OTHER_ZK_HOST);
+    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHosts, Optional.of(ANY_CHROOT)).build()) {
+      final String clientZkHost = createdClient.getZkHost();
+
+      assertTrue(clientZkHost.contains(ANY_ZK_HOST));
+      assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
+    }
+  }
+
+  @Test
+  public void testByDefaultConfiguresClientToSendUpdatesOnlyToShardLeaders() throws IOException {
+    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
+      assertTrue(createdClient.isUpdatesToLeaders());
+    }
+  }
+
+  @Test
+  public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
+    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
+      assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
+    }
+  }
+
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java
new file mode 100644
index 0000000..fa3425b
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.junit.Test;
+
+public class CloudHttp2SolrClientMultiConstructorTest extends LuceneTestCase {
+  
+  /*
+   * NOTE: the Builder accepts a List of ZK hosts rather than a single
+   * connection string; these tests verify that multiple hosts (and an
+   * optional chroot) are joined into the expected connection string.
+   */
+  Collection<String> hosts;
+
+  @Test
+  // commented out on: 24-Dec-2018   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
+  public void testZkConnectionStringConstructorWithValidChroot() throws IOException {
+    boolean setOrList = random().nextBoolean();
+    int numOfZKServers = TestUtil.nextInt(random(), 1, 5);
+    boolean withChroot = random().nextBoolean();
+
+    final String chroot = "/mychroot";
+
+    StringBuilder sb = new StringBuilder();
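+    // 'sb' accumulates the expected ZK connection string, e.g. "host0:2181,host1:2181/mychroot"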
+
+    if(setOrList) {
+      /*
+        A LinkedHashSet is required here for testing, or we can't guarantee
+        the order of entries in the final string.
+       */
+      hosts = new LinkedHashSet<>();
+    } else {
+      hosts = new ArrayList<>();
+    }
+
+    for(int i=0; i<numOfZKServers; i++) {
+      String ZKString = "host" + i + ":2181";
+      hosts.add(ZKString);
+      sb.append(ZKString);
+      if(i<numOfZKServers -1) sb.append(",");
+    }
+
+    String clientChroot = null;
+    if (withChroot) {
+      sb.append(chroot);
+      clientChroot = "/mychroot";
+    }
+
+    try (CloudHttp2SolrClient client = new CloudHttp2SolrClient.Builder(new ArrayList<>(hosts), Optional.ofNullable(clientChroot)).build()) {
+      assertEquals(sb.toString(), client.getZkHost());
+    }
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  // commented out on: 24-Dec-2018   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
+  public void testBadChroot() {
+    final List<String> zkHosts = new ArrayList<>();
+    zkHosts.add("host1:2181");
+    new CloudHttp2SolrClient.Builder(zkHosts, Optional.of("foo")).build();
+  }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java
new file mode 100644
index 0000000..52a4b84
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.TestInjection;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CloudHttp2SolrClientRetryTest extends SolrCloudTestCase {
+  private static final int NODE_COUNT = 1;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NODE_COUNT)
+        .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+        .configure();
+  }
+
+  @Test
+  public void testRetry() throws Exception {
+    String collectionName = "testRetry";
+    try (CloudHttp2SolrClient solrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build()) {
+      CollectionAdminRequest.createCollection(collectionName, 1, 1)
+          .process(solrClient);
+
+      solrClient.add(collectionName, new SolrInputDocument("id", "1"));
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CommonParams.QT, "/admin/metrics");
+      String updateRequestCountKey = "solr.core.testRetry.shard1.replica_n1:UPDATE./update.requestTimes:count";
+      params.set("key", updateRequestCountKey);
+      params.set("indent", "true");
+
+      QueryResponse response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
+      NamedList<Object> namedList = response.getResponse();
+      System.out.println(namedList);
+      NamedList metrics = (NamedList) namedList.get("metrics");
+      assertEquals(1L, metrics.get(updateRequestCountKey));
+
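+      // "true:100" tells TestInjection to fail update requests 100% of the time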
+      TestInjection.failUpdateRequests = "true:100";
+      try {
+        expectThrows(BaseCloudSolrClient.RouteException.class,
+            "Expected an exception on the client when failure is injected during updates", () -> {
+              solrClient.add(collectionName, new SolrInputDocument("id", "2"));
+            });
+      } finally {
+        TestInjection.reset();
+      }
+
+      response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
+      namedList = response.getResponse();
+      System.out.println(namedList);
+      metrics = (NamedList) namedList.get("metrics");
+      assertEquals(2L, metrics.get(updateRequestCountKey));
+    }
+  }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
new file mode 100644
index 0000000..de8c311
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -0,0 +1,978 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.handler.admin.ConfigSetsHandler;
+import org.apache.solr.handler.admin.CoreAdminHandler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.client.solrj.impl.BaseCloudSolrClient.*;
+
+
+/**
+ * This test would be faster if we simulated the zk state instead.
+ */
+@Slow
+public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  private static final String COLLECTION = "collection1";
+  private static final String COLLECTION2 = "2nd_collection";
+
+  private static final String id = "id";
+
+  private static final int TIMEOUT = 30;
+  private static final int NODE_COUNT = 3;
+
+  private static CloudHttp2SolrClient httpBasedCloudSolrClient = null;
+  private static CloudHttp2SolrClient zkBasedCloudSolrClient = null;
+
+  @Before
+  public void setupCluster() throws Exception {
+    configureCluster(NODE_COUNT)
+        .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+        .configure();
+
+    final List<String> solrUrls = new ArrayList<>();
+    solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
+    httpBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(solrUrls).build();
+    zkBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build();
+  }
+
+  
+  @After 
+  public void tearDown() throws Exception {
+    if (httpBasedCloudSolrClient != null) {
+      try {
+        httpBasedCloudSolrClient.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    if (zkBasedCloudSolrClient != null) {
+      try {
+        zkBasedCloudSolrClient.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    shutdownCluster();
+    super.tearDown();
+  }
+
+  /**
+   * Randomly return the cluster's ZK based CSC, or HttpClusterProvider based CSC.
+   * (Randomization is currently disabled: this always returns the HTTP-based client.)
+   */
+  private CloudHttp2SolrClient getRandomClient() {
+//    return random().nextBoolean()? zkBasedCloudSolrClient : httpBasedCloudSolrClient;
+    return httpBasedCloudSolrClient;
+  }
+
+  @Test
+  public void testParallelUpdateQTime() throws Exception {
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(COLLECTION, 2, 2);
+    UpdateRequest req = new UpdateRequest();
+    for (int i=0; i<10; i++)  {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", String.valueOf(TestUtil.nextInt(random(), 1000, 1100)));
+      req.add(doc);
+    }
+    UpdateResponse response = req.process(getRandomClient(), COLLECTION);
+    // See SOLR-6547, we just need to ensure that no exception is thrown here
+    assertTrue(response.getQTime() >= 0);
+  }
+
+  @Test
+  public void testOverwriteOption() throws Exception {
+
+    CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
+        .processAndWait(cluster.getSolrClient(), TIMEOUT);
+    cluster.waitForActiveCollection("overwrite", 1, 1);
+    
+    new UpdateRequest()
+        .add("id", "0", "a_t", "hello1")
+        .add("id", "0", "a_t", "hello2")
+        .commit(cluster.getSolrClient(), "overwrite");
+
+    QueryResponse resp = cluster.getSolrClient().query("overwrite", new SolrQuery("*:*"));
+    assertEquals("There should be one document because overwrite=true", 1, resp.getResults().getNumFound());
+
+    new UpdateRequest()
+        .add(new SolrInputDocument(id, "1", "a_t", "hello1"), /* overwrite = */ false)
+        .add(new SolrInputDocument(id, "1", "a_t", "hello2"), false)
+        .commit(cluster.getSolrClient(), "overwrite");
+      
+    resp = getRandomClient().query("overwrite", new SolrQuery("*:*"));
+    assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
+
+  }
+
+  @Test
+  public void testAliasHandling() throws Exception {
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(COLLECTION, 2, 2);
+
+    CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(COLLECTION2, 2, 2);
+
+    CloudHttp2SolrClient client = getRandomClient();
+    SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
+    client.add(COLLECTION, doc);
+    client.commit(COLLECTION);
+    CollectionAdminRequest.createAlias("testalias", COLLECTION).process(cluster.getSolrClient());
+
+    SolrInputDocument doc2 = new SolrInputDocument("id", "2", "title_s", "my doc too");
+    client.add(COLLECTION2, doc2);
+    client.commit(COLLECTION2);
+    CollectionAdminRequest.createAlias("testalias2", COLLECTION2).process(cluster.getSolrClient());
+
+    CollectionAdminRequest.createAlias("testaliascombined", COLLECTION + "," + COLLECTION2).process(cluster.getSolrClient());
+
+    // ensure that the aliases have been registered
+    Map<String, String> aliases = new CollectionAdminRequest.ListAliases().process(cluster.getSolrClient()).getAliases();
+    assertEquals(COLLECTION, aliases.get("testalias"));
+    assertEquals(COLLECTION2, aliases.get("testalias2"));
+    assertEquals(COLLECTION + "," + COLLECTION2, aliases.get("testaliascombined"));
+
+    assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
+    assertEquals(1, client.query("testalias", params("q", "*:*")).getResults().getNumFound());
+
+    assertEquals(1, client.query(COLLECTION2, params("q", "*:*")).getResults().getNumFound());
+    assertEquals(1, client.query("testalias2", params("q", "*:*")).getResults().getNumFound());
+
+    assertEquals(2, client.query("testaliascombined", params("q", "*:*")).getResults().getNumFound());
+
+    ModifiableSolrParams paramsWithBothCollections = params("q", "*:*", "collection", COLLECTION + "," + COLLECTION2);
+    assertEquals(2, client.query(null, paramsWithBothCollections).getResults().getNumFound());
+
+    ModifiableSolrParams paramsWithBothAliases = params("q", "*:*", "collection", "testalias,testalias2");
+    assertEquals(2, client.query(null, paramsWithBothAliases).getResults().getNumFound());
+
+    ModifiableSolrParams paramsWithCombinedAlias = params("q", "*:*", "collection", "testaliascombined");
+    assertEquals(2, client.query(null, paramsWithCombinedAlias).getResults().getNumFound());
+
+    ModifiableSolrParams paramsWithMixedCollectionAndAlias = params("q", "*:*", "collection", "testalias," + COLLECTION2);
+    assertEquals(2, client.query(null, paramsWithMixedCollectionAndAlias).getResults().getNumFound());
+  }
+
+  @Test
+  public void testRouting() throws Exception {
+    CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection("routing_collection", 2, 2);
+    
+    AbstractUpdateRequest request = new UpdateRequest()
+        .add(id, "0", "a_t", "hello1")
+        .add(id, "2", "a_t", "hello2")
+        .setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+    
+    // Test single threaded routed updates for UpdateRequest
+    NamedList<Object> response = getRandomClient().request(request, "routing_collection");
+    if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
+      checkSingleServer(response);
+    }
+    RouteResponse rr = (RouteResponse) response;
+    Map<String,LBSolrClient.Req> routes = rr.getRoutes();
+    Iterator<Map.Entry<String,LBSolrClient.Req>> it = routes.entrySet()
+        .iterator();
+    while (it.hasNext()) {
+      Map.Entry<String,LBSolrClient.Req> entry = it.next();
+      String url = entry.getKey();
+      UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
+          .getRequest();
+      SolrInputDocument doc = updateRequest.getDocuments().get(0);
+      String id = doc.getField("id").getValue().toString();
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add("q", "id:" + id);
+      params.add("distrib", "false");
+      QueryRequest queryRequest = new QueryRequest(params);
+      try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
+        QueryResponse queryResponse = queryRequest.process(solrClient);
+        SolrDocumentList docList = queryResponse.getResults();
+        assertEquals(1, docList.getNumFound());
+      }
+    }
+    
+    // Test the deleteById routing for UpdateRequest
+    
+    final UpdateResponse uResponse = new UpdateRequest()
+        .deleteById("0")
+        .deleteById("2")
+        .commit(cluster.getSolrClient(), "routing_collection");
+    if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
+      checkSingleServer(uResponse.getResponse());
+    }
+
+    QueryResponse qResponse = getRandomClient().query("routing_collection", new SolrQuery("*:*"));
+    SolrDocumentList docs = qResponse.getResults();
+    assertEquals(0, docs.getNumFound());
+    
+    // Test Multi-Threaded routed updates for UpdateRequest
+    try (CloudSolrClient threadedClient = new CloudSolrClientBuilder
+        (Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+        .withParallelUpdates(true)
+        .build()) {
+      threadedClient.setDefaultCollection("routing_collection");
+      response = threadedClient.request(request);
+      if (threadedClient.isDirectUpdatesToLeadersOnly()) {
+        checkSingleServer(response);
+      }
+      rr = (RouteResponse) response;
+      routes = rr.getRoutes();
+      it = routes.entrySet()
+          .iterator();
+      while (it.hasNext()) {
+        Map.Entry<String,LBSolrClient.Req> entry = it.next();
+        String url = entry.getKey();
+        UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
+            .getRequest();
+        SolrInputDocument doc = updateRequest.getDocuments().get(0);
+        String id = doc.getField("id").getValue().toString();
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.add("q", "id:" + id);
+        params.add("distrib", "false");
+        QueryRequest queryRequest = new QueryRequest(params);
+        try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
+          QueryResponse queryResponse = queryRequest.process(solrClient);
+          SolrDocumentList docList = queryResponse.getResults();
+          assertEquals(1, docList.getNumFound());
+        }
+      }
+    }
+
+    // Test that queries with _route_ params are routed by the client
+
+    // Track request counts on each node before query calls
+    ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
+    DocCollection col = clusterState.getCollection("routing_collection");
+    Map<String, Long> requestCountsMap = Maps.newHashMap();
+    for (Slice slice : col.getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
+        requestCountsMap.put(baseURL, getNumRequests(baseURL, "routing_collection"));
+      }
+    }
+
+    // Collect the base URLs of the replicas of shard that's expected to be hit
+    DocRouter router = col.getRouter();
+    Collection<Slice> expectedSlices = router.getSearchSlicesSingle("0", null, col);
+    Set<String> expectedBaseURLs = Sets.newHashSet();
+    for (Slice expectedSlice : expectedSlices) {
+      for (Replica replica : expectedSlice.getReplicas()) {
+        String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
+        expectedBaseURLs.add(baseURL);
+      }
+    }
+
+    assertTrue("expected urls is not fewer than all urls! expected=" + expectedBaseURLs
+        + "; all=" + requestCountsMap.keySet(),
+        expectedBaseURLs.size() < requestCountsMap.size());
+
+    // Calculate a number of shard keys that route to the same shard.
+    int n;
+    if (TEST_NIGHTLY) {
+      n = random().nextInt(999) + 2;
+    } else {
+      n = random().nextInt(9) + 2;
+    }
+    
+    List<String> sameShardRoutes = Lists.newArrayList();
+    sameShardRoutes.add("0");
+    for (int i = 1; i < n; i++) {
+      String shardKey = Integer.toString(i);
+      Collection<Slice> slices = router.getSearchSlicesSingle(shardKey, null, col);
+      log.info("Expected Slices {}", slices);
+      if (expectedSlices.equals(slices)) {
+        sameShardRoutes.add(shardKey);
+      }
+    }
+
+    assertTrue(sameShardRoutes.size() > 1);
+
+    // Do N queries with _route_ parameter to the same shard
+    for (int i = 0; i < n; i++) {
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.set(CommonParams.Q, "*:*");
+      solrParams.set(ShardParams._ROUTE_, sameShardRoutes.get(random().nextInt(sameShardRoutes.size())));
+      log.info("output: {}", getRandomClient().query("routing_collection", solrParams));
+    }
+
+    // Request count increases from the expected nodes should aggregate to exactly n,
+    // while there should be no increase on unexpected nodes.
+    int increaseFromExpectedUrls = 0;
+    int increaseFromUnexpectedUrls = 0;
+    Map<String, Long> numRequestsToUnexpectedUrls = Maps.newHashMap();
+    for (Slice slice : col.getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
+
+        Long prevNumRequests = requestCountsMap.get(baseURL);
+        Long curNumRequests = getNumRequests(baseURL, "routing_collection");
+
+        long delta = curNumRequests - prevNumRequests;
+        if (expectedBaseURLs.contains(baseURL)) {
+          increaseFromExpectedUrls += delta;
+        } else {
+          increaseFromUnexpectedUrls += delta;
+          numRequestsToUnexpectedUrls.put(baseURL, delta);
+        }
+      }
+    }
+
+    assertEquals("Unexpected number of requests to expected URLs", n, increaseFromExpectedUrls);
+    assertEquals("Unexpected number of requests to unexpected URLs: " + numRequestsToUnexpectedUrls,
+        0, increaseFromUnexpectedUrls);
+
+  }
+
+  /**
+   * Tests if the specification of 'preferLocalShards' in the query-params
+   * limits the distributed query to locally hosted shards only
+   */
+  @Test
+  // commented 4-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
+  public void preferLocalShardsTest() throws Exception {
+
+    String collectionName = "localShardsTestColl";
+
+    int liveNodes = cluster.getJettySolrRunners().size();
+
+    // For preferLocalShards to succeed in a test, every shard should have
+    // all its cores on the same node.
+    // Hence the below configuration for our collection
+    CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
+        .setMaxShardsPerNode(liveNodes * liveNodes)
+        .processAndWait(cluster.getSolrClient(), TIMEOUT);
+    cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * liveNodes);
+    // Add some new documents
+    new UpdateRequest()
+        .add(id, "0", "a_t", "hello1")
+        .add(id, "2", "a_t", "hello2")
+        .add(id, "3", "a_t", "hello2")
+        .commit(getRandomClient(), collectionName);
+
+    // Run the actual test for 'preferLocalShards'
+    queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
+    queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void queryWithShardsPreferenceRules(CloudHttp2SolrClient cloudClient,
+                                          boolean useShardsPreference,
+                                          String collectionName)
+      throws Exception
+  {
+    SolrQuery qRequest = new SolrQuery("*:*");
+
+    ModifiableSolrParams qParams = new ModifiableSolrParams();
+    if (useShardsPreference) {
+      qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
+    } else {
+      qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
+    }
+    qParams.add(ShardParams.SHARDS_INFO, "true");
+    qRequest.add(qParams);
+
+    // CloudSolrClient sends the request to some node.
+    // And since all the nodes are hosting cores from all shards, the
+    // distributed query formed by this node will select cores from the
+    // local shards only
+    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
+
+    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
+
+    // Iterate over shards-info and check what cores responded
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
+    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
+    List<String> shardAddresses = new ArrayList<String>();
+    while (itr.hasNext()) {
+      Map.Entry<String, ?> e = itr.next();
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
+      shardAddresses.add(shardAddress);
+    }
+    log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
+
+    // Make sure the distributed queries were directed to a single node only
+    Set<Integer> ports = new HashSet<Integer>();
+    for (String shardAddr: shardAddresses) {
+      URL url = new URL (shardAddr);
+      ports.add(url.getPort());
+    }
+
+    // This assertion would hold true as long as every shard has a core on each node
+    assertTrue ("Response was not received from shards on a single node",
+        shardAddresses.size() > 1 && ports.size()==1);
+  }
+
+  private Long getNumRequests(String baseUrl, String collectionName) throws
+      SolrServerException, IOException {
+    return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
+  }
+
+  private Long getNumRequests(String baseUrl, String collectionName, String category, String key, String scope, boolean returnNumErrors) throws
+      SolrServerException, IOException {
+
+    NamedList<Object> resp;
+    try (HttpSolrClient client = getHttpSolrClient(baseUrl + "/"+ collectionName, 15000, 60000)) {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set("qt", "/admin/mbeans");
+      params.set("stats", "true");
+      params.set("key", key);
+      params.set("cat", category);
+      // use generic request to avoid extra processing of queries
+      QueryRequest req = new QueryRequest(params);
+      resp = client.request(req);
+    }
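+    // stat names follow the pattern "<category>.<scope-or-key>.requests" (or ".errors" for error counts)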
+    String name;
+    if (returnNumErrors) {
+      name = category + "." + (scope != null ? scope : key) + ".errors";
+    } else {
+      name = category + "." + (scope != null ? scope : key) + ".requests";
+    }
+    Map<String,Object> map = (Map<String,Object>)resp.findRecursive("solr-mbeans", category, key, "stats");
+    if (map == null) {
+      return null;
+    }
+    if (scope != null) { // admin handler uses a meter instead of counter here
+      return (Long)map.get(name + ".count");
+    } else {
+      return (Long) map.get(name);
+    }
+  }
+
+  @Test
+  public void testNonRetryableRequests() throws Exception {
+    try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
+      // important to have one replica on each node
+      RequestStatusState state = CollectionAdminRequest.createCollection("foo", "conf", 1, NODE_COUNT).processAndWait(client, 60);
+      if (state == RequestStatusState.COMPLETED) {
+        cluster.waitForActiveCollection("foo", 1, NODE_COUNT);
+        client.setDefaultCollection("foo");
+
+        Map<String, String> adminPathToMbean = new HashMap<>(CommonParams.ADMIN_PATHS.size());
+        adminPathToMbean.put(CommonParams.COLLECTIONS_HANDLER_PATH, CollectionsHandler.class.getName());
+        adminPathToMbean.put(CommonParams.CORES_HANDLER_PATH, CoreAdminHandler.class.getName());
+        adminPathToMbean.put(CommonParams.CONFIGSETS_HANDLER_PATH, ConfigSetsHandler.class.getName());
+        // we do not add the authc/authz handlers because they do not currently expose any mbeans
+
+        for (String adminPath : adminPathToMbean.keySet()) {
+          long errorsBefore = 0;
+          for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
+            Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
+            errorsBefore += numRequests;
+            log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
+          }
+
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set("qt", adminPath);
+          params.set("action", "foobar"); // this should cause an error
+          QueryRequest req = new QueryRequest(params);
+          try {
+            NamedList<Object> resp = client.request(req);
+            fail("call to foo for admin path " + adminPath + " should have failed");
+          } catch (Exception e) {
+            // expected
+          }
+          long errorsAfter = 0;
+          for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
+            Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
+            errorsAfter += numRequests;
+            log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
+          }
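+          // admin-path requests are not retried, so exactly one node should have recorded the error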
+          assertEquals(errorsBefore + 1, errorsAfter);
+        }
+      } else {
+        fail("Collection could not be created within 60 seconds");
+      }
+    }
+  }
+
+  @Test
+  public void checkCollectionParameters() throws Exception {
+
+    try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
+
+      String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
+          .processAsync(client);
+      String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
+          .processAsync(client);
+
+      CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
+      CollectionAdminRequest.waitForAsyncRequest(async2, client, TIMEOUT);
+      cluster.waitForActiveCollection("multicollection1", 2, 2);
+      cluster.waitForActiveCollection("multicollection2", 2, 2);
+      client.setDefaultCollection("multicollection1");
+
+      List<SolrInputDocument> docs = new ArrayList<>(3);
+      for (int i = 0; i < 3; i++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField(id, Integer.toString(i));
+        doc.addField("a_t", "hello");
+        docs.add(doc);
+      }
+
+      client.add(docs);     // default - will add them to multicollection1
+      client.commit();
+
+      ModifiableSolrParams queryParams = new ModifiableSolrParams();
+      queryParams.add("q", "*:*");
+      assertEquals(3, client.query(queryParams).getResults().size());
+      assertEquals(0, client.query("multicollection2", queryParams).getResults().size());
+
+      SolrQuery query = new SolrQuery("*:*");
+      query.set("collection", "multicollection2");
+      assertEquals(0, client.query(query).getResults().size());
+
+      client.add("multicollection2", docs);
+      client.commit("multicollection2");
+
+      assertEquals(3, client.query("multicollection2", queryParams).getResults().size());
+
+    }
+
+  }
+
+  @Test
+  public void stateVersionParamTest() throws Exception {
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(COLLECTION, 2, 2);
+
+    DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
+    Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
+
+    SolrQuery q = new SolrQuery().setQuery("*:*");
+    HttpSolrClient.RemoteSolrException sse = null;
+
+    final String url = r.getStr(ZkStateReader.BASE_URL_PROP) + "/" + COLLECTION;
+    try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
+
+      log.info("should work query, result {}", solrClient.query(q));
+      //no problem
+      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + coll.getZNodeVersion());
+      log.info("2nd query , result {}", solrClient.query(q));
+      //no error yet good
+
+      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
+
+      QueryResponse rsp = solrClient.query(q);
+      Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
+      assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
+      assertNotNull(m.get(COLLECTION));
+    }
+
+    //now send the request to another node that does not serve the collection
+
+    Set<String> allNodesOfColl = new HashSet<>();
+    for (Slice slice : coll.getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
+      }
+    }
+    String theNode = null;
+    Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
+    for (String s : liveNodes) {
+      String n = cluster.getSolrClient().getZkStateReader().getBaseUrlForNodeName(s);
+      if(!allNodesOfColl.contains(n)){
+        theNode = n;
+        break;
+      }
+    }
+    log.info("the node which does not serve this collection{} ",theNode);
+    assertNotNull(theNode);
+
+
+    final String solrClientUrl = theNode + "/" + COLLECTION;
+    try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
+
+      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion()-1));
+      try {
+        QueryResponse rsp = solrClient.query(q);
+        log.info("expected an exception but the query succeeded: {}", rsp);
+      } catch (HttpSolrClient.RemoteSolrException e) {
+        sse = e;
+      }
+      assertNotNull(sse);
+      assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
+    }
+
+  }
+
+  @Test
+  public void testShutdown() throws IOException {
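+    // [ff01::114] is a non-routable (interface-local multicast) address, so the ZK connect attempt is expected to time out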
+    try (CloudSolrClient client = getCloudSolrClient("[ff01::114]:33332")) {
+      client.setZkConnectTimeout(100);
+      client.connect();
+      fail("Expected exception");
+    } catch (SolrException e) {
+      assertTrue(e.getCause() instanceof TimeoutException);
+    }
+  }
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testWrongZkChrootTest() throws IOException {
+
+    exception.expect(SolrException.class);
+    exception.expectMessage("cluster not found/not ready");
+
+    try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress() + "/xyz/foo")) {
+      client.setZkClientTimeout(1000 * 60);
+      client.connect();
+      fail("Expected exception");
+    }
+  }
+
+  @Test
+  public void customHttpClientTest() throws IOException {
+    CloseableHttpClient client = HttpClientUtil.createClient(null);
+    try (CloudSolrClient solrClient = getCloudSolrClient(cluster.getZkServer().getZkAddress(), client)) {
+
+      assertSame(client, solrClient.getLbClient().getHttpClient());
+
+    } finally {
+      HttpClientUtil.close(client);
+    }
+  }
+
+  @Test
+  public void testVersionsAreReturned() throws Exception {
+    CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection("versions_collection", 2, 2);
+    
+    // assert that "adds" are returned
+    UpdateRequest updateRequest = new UpdateRequest()
+        .add("id", "1", "a_t", "hello1")
+        .add("id", "2", "a_t", "hello2");
+    updateRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
+
+    NamedList<Object> response = updateRequest.commit(getRandomClient(), "versions_collection").getResponse();
+    Object addsObject = response.get("adds");
+    
+    assertNotNull("There must be a adds parameter", addsObject);
+    assertTrue(addsObject instanceof NamedList<?>);
+    NamedList<?> adds = (NamedList<?>) addsObject;
+    assertEquals("There must be 2 versions (one per doc)", 2, adds.size());
+
+    Map<String, Long> versions = new HashMap<>();
+    Object object = adds.get("1");
+    assertNotNull("There must be a version for id 1", object);
+    assertTrue("Version for id 1 must be a long", object instanceof Long);
+    versions.put("1", (Long) object);
+
+    object = adds.get("2");
+    assertNotNull("There must be a version for id 2", object);
+    assertTrue("Version for id 2 must be a long", object instanceof Long);
+    versions.put("2", (Long) object);
+
+    QueryResponse resp = getRandomClient().query("versions_collection", new SolrQuery("*:*"));
+    assertEquals("There should be one document because overwrite=true", 2, resp.getResults().getNumFound());
+
+    for (SolrDocument doc : resp.getResults()) {
+      Long version = versions.get(doc.getFieldValue("id"));
+      assertEquals("Version on add must match _version_ field", version, doc.getFieldValue("_version_"));
+    }
+
+    // assert that "deletes" are returned
+    UpdateRequest deleteRequest = new UpdateRequest().deleteById("1");
+    deleteRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
+    response = deleteRequest.commit(getRandomClient(), "versions_collection").getResponse();
+    Object deletesObject = response.get("deletes");
+    assertNotNull("There must be a deletes parameter", deletesObject);
+    NamedList deletes = (NamedList) deletesObject;
+    assertEquals("There must be 1 version", 1, deletes.size());
+  }
+  
+  @Test
+  public void testInitializationWithSolrUrls() throws Exception {
+    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(COLLECTION, 2, 2);
+    CloudHttp2SolrClient client = httpBasedCloudSolrClient;
+    SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
+    client.add(COLLECTION, doc);
+    client.commit(COLLECTION);
+    assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
+  }
+
+  @Test
+  public void testCollectionDoesntExist() throws Exception {
+    CloudHttp2SolrClient client = getRandomClient();
+    SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
+    try {
+      client.add("boguscollectionname", doc);
+      fail();
+    } catch (SolrException ex) {
+      if (ex.getMessage().equals("Collection not found: boguscollectionname")) {
+        // pass
+      } else {
+        throw ex;
+      }
+    }
+  }
+
+  @Test
+  public void testRetryUpdatesWhenClusterStateIsStale() throws Exception {
+    final String COL = "stale_state_test_col";
+    assert cluster.getJettySolrRunners().size() >= 2;
+
+    final JettySolrRunner old_leader_node = cluster.getJettySolrRunners().get(0);
+    final JettySolrRunner new_leader_node = cluster.getJettySolrRunners().get(1);
+    
+    // start with exactly 1 shard/replica...
+    assertEquals("Couldn't create collection", 0,
+                 CollectionAdminRequest.createCollection(COL, "conf", 1, 1)
+                 .setCreateNodeSet(old_leader_node.getNodeName())
+                 .process(cluster.getSolrClient()).getStatus());
+    cluster.waitForActiveCollection(COL, 1, 1);
+
+    // determine the coreNodeName of only current replica
+    Collection<Slice> slices = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COL).getSlices();
+    assertEquals(1, slices.size()); // sanity check
+    Slice slice = slices.iterator().next();
+    assertEquals(1, slice.getReplicas().size()); // sanity check
+    final String old_leader_core_node_name = slice.getLeader().getName();
+
+    // NOTE: creating our own CloudSolrClient whose settings we can muck with...
+    try (CloudSolrClient stale_client = new CloudSolrClientBuilder
+        (Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
+        .sendDirectUpdatesToAnyShardReplica()
+        .withParallelUpdates(true)
+        .build()) {
+      // don't let collection cache entries get expired, even on a slow machine...
+      stale_client.setCollectionCacheTTl(Integer.MAX_VALUE);
+      stale_client.setDefaultCollection(COL);
+     
+      // do a query to populate stale_client's cache...
+      assertEquals(0, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
+      
+      // add 1 replica on a diff node...
+      assertEquals("Couldn't create collection", 0,
+                   CollectionAdminRequest.addReplicaToShard(COL, "shard1")
+                   .setNode(new_leader_node.getNodeName())
+                   // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
+                   .process(cluster.getSolrClient()).getStatus());
+      AbstractDistribZkTestBase.waitForRecoveriesToFinish
+        (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
+      
+      // ...and delete our original leader.
+      assertEquals("Couldn't create collection", 0,
+                   CollectionAdminRequest.deleteReplica(COL, "shard1", old_leader_core_node_name)
+                   // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
+                   .process(cluster.getSolrClient()).getStatus());
+      AbstractDistribZkTestBase.waitForRecoveriesToFinish
+        (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
+
+      // stale_client's collection state cache should now only point at a leader that no longer exists.
+      
+      // attempt a (direct) update that should succeed in spite of cached cluster state
+      // pointing solely to a node that's no longer part of our collection...
+      assertEquals(0, (new UpdateRequest().add("id", "1").commit(stale_client, COL)).getStatus());
+      assertEquals(1, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
+      
+    }
+  }
+  
+
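+  // Used when directUpdatesToLeadersOnly is enabled: each route in the response should target exactly one server (the shard leader).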
+  private static void checkSingleServer(NamedList<Object> response) {
+    final RouteResponse rr = (RouteResponse) response;
+    final Map<String,LBSolrClient.Req> routes = rr.getRoutes();
+    final Iterator<Map.Entry<String,LBSolrClient.Req>> it =
+        routes.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String,LBSolrClient.Req> entry = it.next();
+        assertEquals("wrong number of servers: "+entry.getValue().getServers(),
+            1, entry.getValue().getServers().size());
+    }
+  }
+
+  /**
+   * Tests if the specification of 'preferReplicaTypes` in the query-params
+   * limits the distributed query to locally hosted shards only
+   */
+  @Test
+  // commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
+  public void preferReplicaTypesTest() throws Exception {
+
+    String collectionName = "replicaTypesTestColl";
+
+    int liveNodes = cluster.getJettySolrRunners().size();
+
+    // For these tests we need to have multiple replica types.
+    // Hence the below configuration for our collection
+    int pullReplicas = Math.max(1, liveNodes - 2);
+    CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, pullReplicas)
+        .setMaxShardsPerNode(liveNodes)
+        .processAndWait(cluster.getSolrClient(), TIMEOUT);
+    cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * (2 + pullReplicas));
+    
+    // Add some new documents
+    new UpdateRequest()
+        .add(id, "0", "a_t", "hello1")
+        .add(id, "2", "a_t", "hello2")
+        .add(id, "3", "a_t", "hello2")
+        .commit(getRandomClient(), collectionName);
+
+    // Run the actual tests for 'shards.preference=replica.type:*'
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);
+    // Test to verify that preferLocalShards=true doesn't break this
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
+  }
+
+  private void queryWithPreferReplicaTypes(CloudHttp2SolrClient cloudClient,
+                                           String preferReplicaTypes,
+                                           boolean preferLocalShards,
+                                           String collectionName)
+      throws Exception
+  {
+    SolrQuery qRequest = new SolrQuery("*:*");
+    ModifiableSolrParams qParams = new ModifiableSolrParams();
+
+    final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
+    StringBuilder rule = new StringBuilder();
+    preferredTypes.forEach(type -> {
+      if (rule.length() != 0) {
+        rule.append(',');
+      }
+      rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
+      rule.append(':');
+      rule.append(type);
+    });
+    if (preferLocalShards) {
+      if (rule.length() != 0) {
+        rule.append(',');
+      }
+      rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
+      rule.append(":local");
+    }
+    qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());  
+    qParams.add(ShardParams.SHARDS_INFO, "true");
+    qRequest.add(qParams);
+
+    // CloudSolrClient sends the request to some node.
+    // And since all the nodes are hosting cores from all shards, the
+    // distributed query formed by this node will select cores from the
+    // local shards only
+    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
+
+    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
+
+    Map<String, String> replicaTypeMap = new HashMap<>();
+    DocCollection collection = getCollectionState(collectionName);
+    for (Slice slice : collection.getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        String coreUrl = replica.getCoreUrl();
+        // It seems replica reports its core URL with a trailing slash while shard
+        // info returned from the query doesn't. Oh well.
+        if (coreUrl.endsWith("/")) {
+          coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
+        }
+        replicaTypeMap.put(coreUrl, replica.getType().toString());
+      }
+    }
+
+    // Iterate over shards-info and check that replicas of correct type responded
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
+    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
+    List<String> shardAddresses = new ArrayList<String>();
+    while (itr.hasNext()) {
+      Map.Entry<String, ?> e = itr.next();
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
+      assertTrue(replicaTypeMap.containsKey(shardAddress));
+      assertEquals(preferredTypes.get(0), replicaTypeMap.get(shardAddress));
+      shardAddresses.add(shardAddress);
+    }
+    assertTrue("No responses", shardAddresses.size() > 0);
+    log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
+  }
+
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index f3920b0..92c5c62 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -108,7 +108,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
   private LBHttpSolrClient getMockLbHttpSolrClient(Map<String, Function> responses) throws Exception {
     LBHttpSolrClient mockLbclient = mock(LBHttpSolrClient.class);
 
-    when(mockLbclient.request(any(LBHttpSolrClient.Req.class))).then(invocationOnMock -> {
+    when(mockLbclient.request(any(LBSolrClient.Req.class))).then(invocationOnMock -> {
       LBHttpSolrClient.Req req = invocationOnMock.getArgument(0);
       Function f = responses.get("request");
       if (f == null) return null;
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 8b05c38..07cde2f 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -86,6 +86,7 @@ import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.embedded.JettyConfig;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
@@ -2280,6 +2281,61 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
   }
 
   /**
+   * A variant of {@link  org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} that will randomize
+   * some internal settings.
+   */
+  public static class CloudHttp2SolrClientBuilder extends CloudHttp2SolrClient.Builder {
+
+    public CloudHttp2SolrClientBuilder(List<String> zkHosts, Optional<String> zkChroot) {
+      super(zkHosts, zkChroot);
+      randomizeCloudSolrClient();
+    }
+
+    public CloudHttp2SolrClientBuilder(ClusterStateProvider stateProvider) {
+      super(new ArrayList<>());
+      this.stateProvider = stateProvider;
+      randomizeCloudSolrClient();
+    }
+
+    public CloudHttp2SolrClientBuilder(MiniSolrCloudCluster cluster) {
+      super(new ArrayList<>());
+      if (random().nextBoolean()) {
+        this.zkHosts.add(cluster.getZkServer().getZkAddress());
+      } else {
+        populateSolrUrls(cluster);
+      }
+
+      randomizeCloudSolrClient();
+    }
+
+    private void populateSolrUrls(MiniSolrCloudCluster cluster) {
+      if (random().nextBoolean()) {
+        final List<JettySolrRunner> solrNodes = cluster.getJettySolrRunners();
+        for (JettySolrRunner node : solrNodes) {
+          this.solrUrls.add(node.getBaseUrl().toString());
+        }
+      } else {
+        this.solrUrls.add(cluster.getRandomJetty(random()).getBaseUrl().toString());
+      }
+    }
+
+    private void randomizeCloudSolrClient() {
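+      // randomize the update-routing settings so tests exercise different client code paths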
+      this.directUpdatesToLeadersOnly = random().nextBoolean();
+      this.shardLeadersOnly = random().nextBoolean();
+      this.parallelUpdates = random().nextBoolean();
+    }
+  }
+
+  /**
+   * This method <i>may</i> randomize unspecified aspects of the resulting SolrClient.
+   * Tests that do not wish to have any randomized behavior should use the
+   * {@link org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} class directly
+   */
+  public static CloudHttp2SolrClient getCloudHttp2SolrClient(MiniSolrCloudCluster cluster) {
+    return new CloudHttp2SolrClientBuilder(cluster).build();
+  }
+
+  /**
    * A variant of {@link  org.apache.solr.client.solrj.impl.CloudSolrClient.Builder} that will randomize
    * some internal settings.
    */