You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2019/03/04 15:49:31 UTC

[lucene-solr] branch branch_8_0 updated: SOLR-13276: push back to 8.1

This is an automated email from the ASF dual-hosted git repository.

datcm pushed a commit to branch branch_8_0
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8_0 by this push:
     new 2f7dc0d  SOLR-13276: push back to 8.1
2f7dc0d is described below

commit 2f7dc0d7566afa062cc6cf47ceb007fa7ab13c2c
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Mon Mar 4 15:49:22 2019 +0000

    SOLR-13276: push back to 8.1
---
 solr/CHANGES.txt                                   |    2 -
 .../client/solrj/impl/BaseCloudSolrClient.java     | 1244 --------------------
 .../solrj/impl/BaseHttpClusterStateProvider.java   |  309 -----
 .../solr/client/solrj/impl/BaseHttpSolrClient.java |   80 --
 .../client/solrj/impl/CloudHttp2SolrClient.java    |  237 ----
 .../solr/client/solrj/impl/CloudSolrClient.java    | 1186 ++++++++++++++++++-
 .../solrj/impl/Http2ClusterStateProvider.java      |   46 -
 .../solr/client/solrj/impl/Http2SolrClient.java    |   83 +-
 .../solrj/impl/HttpClusterStateProvider.java       |  301 ++++-
 .../solr/client/solrj/impl/HttpSolrClient.java     |   52 +-
 .../solr/client/solrj/request/UpdateRequest.java   |   76 +-
 .../impl/CloudHttp2SolrClientBadInputTest.java     |   73 --
 .../impl/CloudHttp2SolrClientBuilderTest.java      |   84 --
 .../CloudHttp2SolrClientMultiConstructorTest.java  |   85 --
 .../solrj/impl/CloudHttp2SolrClientRetryTest.java  |   83 --
 .../solrj/impl/CloudHttp2SolrClientTest.java       |  978 ---------------
 .../solrj/impl/CloudSolrClientCacheTest.java       |    2 +-
 .../src/java/org/apache/solr/SolrTestCaseJ4.java   |   56 -
 18 files changed, 1577 insertions(+), 3400 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0bc0479..6d46019 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -173,8 +173,6 @@ New Features
 
 * SOLR-13241: Add 'autoscaling' tool support to solr.cmd (Jason Gerlowski)
 
-* SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider (Cao Manh Dat)
-
 Bug Fixes
 ----------------------
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
deleted file mode 100644
index 7ae1e02..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ /dev/null
@@ -1,1244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
-
-import org.apache.solr.client.solrj.ResponseParser;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.V2RequestSupport;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.IsUpdateRequest;
-import org.apache.solr.client.solrj.request.RequestWriter;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.ToleratedUpdateError;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStatePredicate;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.Hash;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
-import static org.apache.solr.common.params.CommonParams.ID;
-
-public abstract class BaseCloudSolrClient extends SolrClient {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private volatile String defaultCollection;
-  //no of times collection state to be reloaded if stale state error is received
-  private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
-  private Random rand = new Random();
-
-  private final boolean updatesToLeaders;
-  private final boolean directUpdatesToLeadersOnly;
-  boolean parallelUpdates; //TODO final
-  private ExecutorService threadPool = ExecutorUtil
-      .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
-          "CloudSolrClient ThreadPool"));
-  private String idField = ID;
-  public static final String STATE_VERSION = "_stateVer_";
-  private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds or 3 billion nanos
-  private final Set<String> NON_ROUTABLE_PARAMS;
-  {
-    NON_ROUTABLE_PARAMS = new HashSet<>();
-    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
-
-    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
-    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
-
-    // Not supported via SolrCloud
-    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
-
-  }
-  private volatile List<Object> locks = objectList(3);
-
-
-  static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
-    final AtomicLong puts = new AtomicLong();
-    final AtomicLong hits = new AtomicLong();
-    final Lock evictLock = new ReentrantLock(true);
-    protected volatile long timeToLive = 60 * 1000L;
-
-    @Override
-    public ExpiringCachedDocCollection get(Object key) {
-      ExpiringCachedDocCollection val = super.get(key);
-      if(val == null) {
-        // a new collection is likely to be added now.
-        //check if there are stale items and remove them
-        evictStale();
-        return null;
-      }
-      if(val.isExpired(timeToLive)) {
-        super.remove(key);
-        return null;
-      }
-      hits.incrementAndGet();
-      return val;
-    }
-
-    @Override
-    public ExpiringCachedDocCollection put(String key, ExpiringCachedDocCollection value) {
-      puts.incrementAndGet();
-      return super.put(key, value);
-    }
-
-    void evictStale() {
-      if(!evictLock.tryLock()) return;
-      try {
-        for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
-          if(e.getValue().isExpired(timeToLive)){
-            super.remove(e.getKey());
-          }
-        }
-      } finally {
-        evictLock.unlock();
-      }
-    }
-
-  }
-
-  /**
-   * This is the time to wait before refetching the state
-   * after getting the same state version from ZK.
-   *
-   * @param secs the time to wait, in seconds
-   */
-  public void setRetryExpiryTime(int secs) {
-    this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
-  }
-
-  protected final StateCache collectionStateCache = new StateCache();
-
-  class ExpiringCachedDocCollection {
-    final DocCollection cached;
-    final long cachedAt;
-    //This is the time at which the collection is retried and got the same old version
-    volatile long retriedAt = -1;
-    //flag that suggests that this is potentially to be rechecked
-    volatile boolean maybeStale = false;
-
-    ExpiringCachedDocCollection(DocCollection cached) {
-      this.cached = cached;
-      this.cachedAt = System.nanoTime();
-    }
-
-    boolean isExpired(long timeToLiveMs) {
-      return (System.nanoTime() - cachedAt)
-          > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
-    }
-
-    boolean shouldRetry() {
-      if (maybeStale) {// we are not sure if it is stale so check with retry time
-        if ((retriedAt == -1 ||
-            (System.nanoTime() - retriedAt) > retryExpiryTime)) {
-          return true;// we retried a while back. and we could not get anything new.
-          //it's likely that it is not going to be available now also.
-        }
-      }
-      return false;
-    }
-
-    void setRetriedAt() {
-      retriedAt = System.nanoTime();
-    }
-  }
-
-  protected BaseCloudSolrClient(boolean updatesToLeaders, boolean parallelUpdates, boolean directUpdatesToLeadersOnly) {
-    this.updatesToLeaders = updatesToLeaders;
-    this.parallelUpdates = parallelUpdates;
-    this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
-  }
-
-  /** Sets the cache TTL for cached DocCollection objects. This is only applicable for collections which are persisted outside of clusterstate.json
-   * @param seconds ttl value in seconds
-   */
-  public void setCollectionCacheTTl(int seconds){
-    assert seconds > 0;
-    this.collectionStateCache.timeToLive = seconds * 1000L;
-  }
-
-  protected abstract LBSolrClient getLbClient();
-
-  public abstract ClusterStateProvider getClusterStateProvider();
-
-  protected abstract boolean wasCommError(Throwable t);
-
-  @Override
-  public void close() throws IOException {
-    if(this.threadPool != null && !this.threadPool.isShutdown()) {
-      this.threadPool.shutdown();
-    }
-  }
-
-  public ResponseParser getParser() {
-    return getLbClient().getParser();
-  }
-
-  /**
-   * Note: This setter method is <b>not thread-safe</b>.
-   *
-   * @param processor
-   *          Default Response Parser chosen to parse the response if the parser
-   *          were not specified as part of the request.
-   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
-   */
-  public void setParser(ResponseParser processor) {
-    getLbClient().setParser(processor);
-  }
-
-  public RequestWriter getRequestWriter() {
-    return getLbClient().getRequestWriter();
-  }
-
-  public void setRequestWriter(RequestWriter requestWriter) {
-    getLbClient().setRequestWriter(requestWriter);
-  }
-
-  /**
-   * @return the zkHost value used to connect to zookeeper.
-   */
-  public String getZkHost() {
-    return assertZKStateProvider().zkHost;
-  }
-
-  public ZkStateReader getZkStateReader() {
-    if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
-      ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) getClusterStateProvider();
-      getClusterStateProvider().connect();
-      return provider.zkStateReader;
-    }
-    throw new IllegalStateException("This has no Zk stateReader");
-  }
-
-  /**
-   * @param idField the field to route documents on.
-   */
-  public void setIdField(String idField) {
-    this.idField = idField;
-  }
-
-  /**
-   * @return the field that updates are routed on.
-   */
-  public String getIdField() {
-    return idField;
-  }
-
-  /** Sets the default collection for request */
-  public void setDefaultCollection(String collection) {
-    this.defaultCollection = collection;
-  }
-
-  /** Gets the default collection for request */
-  public String getDefaultCollection() {
-    return defaultCollection;
-  }
-
-  /** Set the connect timeout to the zookeeper ensemble in ms */
-  public void setZkConnectTimeout(int zkConnectTimeout) {
-    assertZKStateProvider().zkConnectTimeout = zkConnectTimeout;
-  }
-
-  /** Set the timeout to the zookeeper ensemble in ms */
-  public void setZkClientTimeout(int zkClientTimeout) {
-    assertZKStateProvider().zkClientTimeout = zkClientTimeout;
-  }
-
-  /** Gets whether direct updates are sent in parallel */
-  public boolean isParallelUpdates() {
-    return parallelUpdates;
-  }
-
-  /**
-   * Connect to the zookeeper ensemble.
-   * This is an optional method that may be used to force a connect before any other requests are sent.
-   */
-  public void connect() {
-    getClusterStateProvider().connect();
-  }
-
-  /**
-   * Connect to a cluster.  If the cluster is not ready, retry connection up to a given timeout.
-   * @param duration the timeout
-   * @param timeUnit the units of the timeout
-   * @throws TimeoutException if the cluster is not ready after the timeout
-   * @throws InterruptedException if the wait is interrupted
-   */
-  public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
-    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, getClusterStateProvider());
-    long timeout = System.nanoTime() + timeUnit.toNanos(duration);
-    while (System.nanoTime() < timeout) {
-      try {
-        connect();
-        log.info("Cluster at {} ready", getClusterStateProvider());
-        return;
-      }
-      catch (RuntimeException e) {
-        // not ready yet, then...
-      }
-      TimeUnit.MILLISECONDS.sleep(250);
-    }
-    throw new TimeoutException("Timed out waiting for cluster");
-  }
-
-  private ZkClientClusterStateProvider assertZKStateProvider() {
-    if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
-      return (ZkClientClusterStateProvider) getClusterStateProvider();
-    }
-    throw new IllegalArgumentException("This client does not use ZK");
-
-  }
-
-  /**
-   * Block until a collection state matches a predicate, or a timeout
-   *
-   * Note that the predicate may be called again even after it has returned true, so
-   * implementors should avoid changing state within the predicate call itself.
-   *
-   * @param collection the collection to watch
-   * @param wait       how long to wait
-   * @param unit       the units of the wait parameter
-   * @param predicate  a {@link CollectionStatePredicate} to check the collection state
-   * @throws InterruptedException on interrupt
-   * @throws TimeoutException     on timeout
-   */
-  public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
-      throws InterruptedException, TimeoutException {
-    getClusterStateProvider().connect();
-    assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
-  }
-
-  /**
-   * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
-   *
-   * Note that the watcher is unregistered after it has been called once.  To make a watcher persistent,
-   * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
-   * call
-   *
-   * @param collection the collection to watch
-   * @param watcher    a watcher that will be called when the state changes
-   */
-  public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
-    getClusterStateProvider().connect();
-    assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
-  }
-
-  private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
-    UpdateRequest updateRequest = (UpdateRequest) request;
-    SolrParams params = request.getParams();
-    ModifiableSolrParams routableParams = new ModifiableSolrParams();
-    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
-
-    if(params != null) {
-      nonRoutableParams.add(params);
-      routableParams.add(params);
-      for(String param : NON_ROUTABLE_PARAMS) {
-        routableParams.remove(param);
-      }
-    }
-
-    if (collection == null) {
-      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
-    }
-
-    //Check to see if the collection is an alias.
-    List<String> aliasedCollections = getClusterStateProvider().resolveAlias(collection);
-    collection = aliasedCollections.get(0); // pick 1st (consistent with HttpSolrCall behavior)
-
-    DocCollection col = getDocCollection(collection, null);
-
-    DocRouter router = col.getRouter();
-
-    if (router instanceof ImplicitDocRouter) {
-      // short circuit as optimization
-      return null;
-    }
-
-    //Create the URL map, which is keyed on slice name.
-    //The value is a list of URLs for each replica in the slice.
-    //The first value in the list is the leader for the slice.
-    final Map<String,List<String>> urlMap = buildUrlMap(col);
-    final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
-    if (routes == null) {
-      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
-        // we have info (documents with ids and/or ids to delete) with
-        // which to find the leaders but we could not find (all of) them
-        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-            "directUpdatesToLeadersOnly==true but could not find leader(s)");
-      } else {
-        // we could not find a leader or routes yet - use unoptimized general path
-        return null;
-      }
-    }
-
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
-
-    long start = System.nanoTime();
-
-    if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
-        }
-      }
-
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
-        }
-      }
-
-      if (exceptions.size() > 0) {
-        Throwable firstException = exceptions.getVal(0);
-        if(firstException instanceof SolrException) {
-          SolrException e = (SolrException) firstException;
-          throw getRouteException(SolrException.ErrorCode.getErrorCode(e.code()), exceptions, routes);
-        } else {
-          throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, exceptions, routes);
-        }
-      }
-    } else {
-      for (Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
-        String url = entry.getKey();
-        LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          NamedList<Object> rsp = getLbClient().request(lbRequest).getResponse();
-          shardResponses.add(url, rsp);
-        } catch (Exception e) {
-          if(e instanceof SolrException) {
-            throw (SolrException) e;
-          } else {
-            throw new SolrServerException(e);
-          }
-        }
-      }
-    }
-
-    UpdateRequest nonRoutableRequest = null;
-    List<String> deleteQuery = updateRequest.getDeleteQuery();
-    if (deleteQuery != null && deleteQuery.size() > 0) {
-      UpdateRequest deleteQueryRequest = new UpdateRequest();
-      deleteQueryRequest.setDeleteQuery(deleteQuery);
-      nonRoutableRequest = deleteQueryRequest;
-    }
-
-    Set<String> paramNames = nonRoutableParams.getParameterNames();
-
-    Set<String> intersection = new HashSet<>(paramNames);
-    intersection.retainAll(NON_ROUTABLE_PARAMS);
-
-    if (nonRoutableRequest != null || intersection.size() > 0) {
-      if (nonRoutableRequest == null) {
-        nonRoutableRequest = new UpdateRequest();
-      }
-      nonRoutableRequest.setParams(nonRoutableParams);
-      nonRoutableRequest.setBasicAuthCredentials(request.getBasicAuthUser(), request.getBasicAuthPassword());
-      List<String> urlList = new ArrayList<>();
-      urlList.addAll(routes.keySet());
-      Collections.shuffle(urlList, rand);
-      LBSolrClient.Req req = new LBSolrClient.Req(nonRoutableRequest, urlList);
-      try {
-        LBSolrClient.Rsp rsp = getLbClient().request(req);
-        shardResponses.add(urlList.get(0), rsp.getResponse());
-      } catch (Exception e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, urlList.get(0), e);
-      }
-    }
-
-    long end = System.nanoTime();
-
-    RouteResponse rr = condenseResponse(shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS));
-    rr.setRouteResponses(shardResponses);
-    rr.setRoutes(routes);
-    return rr;
-  }
-
-  protected RouteException getRouteException(SolrException.ErrorCode serverError, NamedList<Throwable> exceptions, Map<String, ? extends LBSolrClient.Req> routes) {
-    return new RouteException(serverError, exceptions, routes);
-  }
-
-  protected Map<String, ? extends LBSolrClient.Req> createRoutes(UpdateRequest updateRequest, ModifiableSolrParams routableParams,
-                                                       DocCollection col, DocRouter router, Map<String, List<String>> urlMap,
-                                                       String idField) {
-    return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
-  }
-
-  private Map<String,List<String>> buildUrlMap(DocCollection col) {
-    Map<String, List<String>> urlMap = new HashMap<>();
-    Slice[] slices = col.getActiveSlicesArr();
-    for (Slice slice : slices) {
-      String name = slice.getName();
-      List<String> urls = new ArrayList<>();
-      Replica leader = slice.getLeader();
-      if (directUpdatesToLeadersOnly && leader == null) {
-        for (Replica replica : slice.getReplicas(
-            replica -> replica.isActive(getClusterStateProvider().getLiveNodes())
-                && replica.getType() == Replica.Type.NRT)) {
-          leader = replica;
-          break;
-        }
-      }
-      if (leader == null) {
-        if (directUpdatesToLeadersOnly) {
-          continue;
-        }
-        // take unoptimized general path - we cannot find a leader yet
-        return null;
-      }
-      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
-      String url = zkProps.getCoreUrl();
-      urls.add(url);
-      if (!directUpdatesToLeadersOnly) {
-        for (Replica replica : slice.getReplicas()) {
-          if (!replica.getNodeName().equals(leader.getNodeName()) &&
-              !replica.getName().equals(leader.getName())) {
-            ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
-            String url1 = zkProps1.getCoreUrl();
-            urls.add(url1);
-          }
-        }
-      }
-      urlMap.put(name, urls);
-    }
-    return urlMap;
-  }
-
-  protected <T extends RouteResponse> T condenseResponse(NamedList response, int timeMillis, Supplier<T> supplier) {
-    T condensed = supplier.get();
-    int status = 0;
-    Integer rf = null;
-    Integer minRf = null;
-
-    // TolerantUpdateProcessor
-    List<SimpleOrderedMap<String>> toleratedErrors = null;
-    int maxToleratedErrors = Integer.MAX_VALUE;
-
-    // For "adds", "deletes", "deleteByQuery" etc.
-    Map<String, NamedList> versions = new HashMap<>();
-
-    for(int i=0; i<response.size(); i++) {
-      NamedList shardResponse = (NamedList)response.getVal(i);
-      NamedList header = (NamedList)shardResponse.get("responseHeader");
-      Integer shardStatus = (Integer)header.get("status");
-      int s = shardStatus.intValue();
-      if(s > 0) {
-        status = s;
-      }
-      Object rfObj = header.get(UpdateRequest.REPFACT);
-      if (rfObj != null && rfObj instanceof Integer) {
-        Integer routeRf = (Integer)rfObj;
-        if (rf == null || routeRf < rf)
-          rf = routeRf;
-      }
-      minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
-
-      List<SimpleOrderedMap<String>> shardTolerantErrors =
-          (List<SimpleOrderedMap<String>>) header.get("errors");
-      if (null != shardTolerantErrors) {
-        Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
-        assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
-        // if we get into some weird state where the nodes disagree about the effective maxErrors,
-        // assume the min value seen to decide if we should fail.
-        maxToleratedErrors = Math.min(maxToleratedErrors,
-            ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
-
-        if (null == toleratedErrors) {
-          toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
-        }
-        for (SimpleOrderedMap<String> err : shardTolerantErrors) {
-          toleratedErrors.add(err);
-        }
-      }
-      for (String updateType: Arrays.asList("adds", "deletes", "deleteByQuery")) {
-        Object obj = shardResponse.get(updateType);
-        if (obj instanceof NamedList) {
-          NamedList versionsList = versions.containsKey(updateType) ?
-              versions.get(updateType): new NamedList();
-          versionsList.addAll((NamedList)obj);
-          versions.put(updateType, versionsList);
-        }
-      }
-    }
-
-    NamedList cheader = new NamedList();
-    cheader.add("status", status);
-    cheader.add("QTime", timeMillis);
-    if (rf != null)
-      cheader.add(UpdateRequest.REPFACT, rf);
-    if (minRf != null)
-      cheader.add(UpdateRequest.MIN_REPFACT, minRf);
-    if (null != toleratedErrors) {
-      cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
-      cheader.add("errors", toleratedErrors);
-      if (maxToleratedErrors < toleratedErrors.size()) {
-        // cumulative errors are too high, we need to throw a client exception w/correct metadata
-
-        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
-        // then at least one shard should have thrown a real error before this, so we don't worry
-        // about having a more "singular" exception msg for that situation
-        StringBuilder msgBuf =  new StringBuilder()
-            .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
-
-        NamedList metadata = new NamedList<String>();
-        for (SimpleOrderedMap<String> err : toleratedErrors) {
-          ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
-          metadata.add(te.getMetadataKey(), te.getMetadataValue());
-
-          msgBuf.append("\n").append(te.getMessage());
-        }
-
-        SolrException toThrow = new SolrException(SolrException.ErrorCode.BAD_REQUEST, msgBuf.toString());
-        toThrow.setMetadata(metadata);
-        throw toThrow;
-      }
-    }
-    for (String updateType: versions.keySet()) {
-      condensed.add(updateType, versions.get(updateType));
-    }
-    condensed.add("responseHeader", cheader);
-    return condensed;
-  }
-
-  public RouteResponse condenseResponse(NamedList response, int timeMillis) {
-    return condenseResponse(response, timeMillis, RouteResponse::new);
-  }
-
-  public static class RouteResponse<T extends LBSolrClient.Req> extends NamedList {
-    private NamedList routeResponses;
-    private Map<String, T> routes;
-
-    public void setRouteResponses(NamedList routeResponses) {
-      this.routeResponses = routeResponses;
-    }
-
-    public NamedList getRouteResponses() {
-      return routeResponses;
-    }
-
-    public void setRoutes(Map<String, T> routes) {
-      this.routes = routes;
-    }
-
-    public Map<String, T> getRoutes() {
-      return routes;
-    }
-
-  }
-
-  public static class RouteException extends SolrException {
-
-    private NamedList<Throwable> throwables;
-    private Map<String, ? extends LBSolrClient.Req> routes;
-
-    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, ? extends LBSolrClient.Req> routes){
-      super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
-      this.throwables = throwables;
-      this.routes = routes;
-
-      // create a merged copy of the metadata from all wrapped exceptions
-      NamedList<String> metadata = new NamedList<String>();
-      for (int i = 0; i < throwables.size(); i++) {
-        Throwable t = throwables.getVal(i);
-        if (t instanceof SolrException) {
-          SolrException e = (SolrException) t;
-          NamedList<String> eMeta = e.getMetadata();
-          if (null != eMeta) {
-            metadata.addAll(eMeta);
-          }
-        }
-      }
-      if (0 < metadata.size()) {
-        this.setMetadata(metadata);
-      }
-    }
-
-    public NamedList<Throwable> getThrowables() {
-      return throwables;
-    }
-
-    public Map<String, ? extends LBSolrClient.Req> getRoutes() {
-      return this.routes;
-    }
-  }
-
-  @Override
-  public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
-    // the collection parameter of the request overrides that of the parameter to this method
-    String requestCollection = request.getCollection();
-    if (requestCollection != null) {
-      collection = requestCollection;
-    } else if (collection == null) {
-      collection = defaultCollection;
-    }
-    List<String> inputCollections =
-        collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
-    return requestWithRetryOnStaleState(request, 0, inputCollections);
-  }
-
-  /**
-   * As this class doesn't watch external collections on the client side,
-   * there's a chance that the request will fail due to cached stale state,
-   * which means the state must be refreshed from ZK and retried.
-   */
-  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, List<String> inputCollections)
-      throws SolrServerException, IOException {
-    connect(); // important to call this before you start working with the ZkStateReader
-
-    // build up a _stateVer_ param to pass to the server containing all of the
-    // external collection state versions involved in this request, which allows
-    // the server to notify us that our cached state for one or more of the external
-    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
-    String stateVerParam = null;
-    List<DocCollection> requestedCollections = null;
-    boolean isCollectionRequestOfV2 = false;
-    if (request instanceof V2RequestSupport) {
-      request = ((V2RequestSupport) request).getV2Request();
-    }
-    if (request instanceof V2Request) {
-      isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
-    }
-    boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
-    if (!inputCollections.isEmpty() && !isAdmin && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
-      Set<String> requestedCollectionNames = resolveAliases(inputCollections);
-
-      StringBuilder stateVerParamBuilder = null;
-      for (String requestedCollection : requestedCollectionNames) {
-        // track the version of state we're using on the client side using the _stateVer_ param
-        DocCollection coll = getDocCollection(requestedCollection, null);
-        if (coll == null) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
-        }
-        int collVer = coll.getZNodeVersion();
-        if (coll.getStateFormat()>1) {
-          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
-          requestedCollections.add(coll);
-
-          if (stateVerParamBuilder == null) {
-            stateVerParamBuilder = new StringBuilder();
-          } else {
-            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
-          }
-
-          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
-        }
-      }
-
-      if (stateVerParamBuilder != null) {
-        stateVerParam = stateVerParamBuilder.toString();
-      }
-    }
-
-    if (request.getParams() instanceof ModifiableSolrParams) {
-      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
-      if (stateVerParam != null) {
-        params.set(STATE_VERSION, stateVerParam);
-      } else {
-        params.remove(STATE_VERSION);
-      }
-    } // else: ??? how to set this ???
-
-    NamedList<Object> resp = null;
-    try {
-      resp = sendRequest(request, inputCollections);
-      //to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there
-      Object o = resp == null || resp.size() == 0 ? null : resp.get(STATE_VERSION, resp.size() - 1);
-      if(o != null && o instanceof Map) {
-        //remove this because no one else needs this and tests would fail if they are comparing responses
-        resp.remove(resp.size()-1);
-        Map invalidStates = (Map) o;
-        for (Object invalidEntries : invalidStates.entrySet()) {
-          Map.Entry e = (Map.Entry) invalidEntries;
-          getDocCollection((String) e.getKey(), (Integer) e.getValue());
-        }
-
-      }
-    } catch (Exception exc) {
-
-      Throwable rootCause = SolrException.getRootCause(exc);
-      // don't do retry support for admin requests
-      // or if the request doesn't have a collection specified
-      // or request is v2 api and its method is not GET
-      if (inputCollections.isEmpty() || isAdmin || (request instanceof V2Request && request.getMethod() != SolrRequest.METHOD.GET)) {
-        if (exc instanceof SolrServerException) {
-          throw (SolrServerException)exc;
-        } else if (exc instanceof IOException) {
-          throw (IOException)exc;
-        }else if (exc instanceof RuntimeException) {
-          throw (RuntimeException) exc;
-        }
-        else {
-          throw new SolrServerException(rootCause);
-        }
-      }
-
-      int errorCode = (rootCause instanceof SolrException) ?
-          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
-
-      boolean wasCommError =
-          (rootCause instanceof ConnectException ||
-              rootCause instanceof SocketException ||
-              wasCommError(rootCause));
-
-      log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry={} commError={} errorCode={} ",
-          inputCollections, rootCause.toString(), retryCount, wasCommError, errorCode);
-
-      if (wasCommError
-          || (exc instanceof RouteException && (errorCode == 503)) // 404 because the core does not exist 503 service unavailable
-        //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
-      ) {
-        // it was a communication error. it is likely that
-        // the node to which the request to be sent is down . So , expire the state
-        // so that the next attempt would fetch the fresh state
-        // just re-read state for all of them, if it has not been retried
-        // in retryExpiryTime time
-        if (requestedCollections != null) {
-          for (DocCollection ext : requestedCollections) {
-            ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
-            if (cacheEntry == null) continue;
-            cacheEntry.maybeStale = true;
-          }
-        }
-        if (retryCount < MAX_STALE_RETRIES) {//if it is a communication error , we must try again
-          //may be, we have a stale version of the collection state
-          // and we could not get any information from the server
-          //it is probably not worth trying again and again because
-          // the state would not have been updated
-          log.info("trying request again");
-          return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
-        }
-      } else {
-        log.info("request was not communication error it seems");
-      }
-
-      boolean stateWasStale = false;
-      if (retryCount < MAX_STALE_RETRIES  &&
-          requestedCollections != null    &&
-          !requestedCollections.isEmpty() &&
-          (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
-      {
-        // cached state for one or more external collections was stale
-        // re-issue request using updated state
-        stateWasStale = true;
-
-        // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
-        for (DocCollection ext : requestedCollections) {
-          collectionStateCache.remove(ext.getName());
-        }
-      }
-
-      // if we experienced a communication error, it's worth checking the state
-      // with ZK just to make sure the node we're trying to hit is still part of the collection
-      if (retryCount < MAX_STALE_RETRIES &&
-          !stateWasStale &&
-          requestedCollections != null &&
-          !requestedCollections.isEmpty() &&
-          wasCommError) {
-        for (DocCollection ext : requestedCollections) {
-          DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
-          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
-            // looks like we couldn't reach the server because the state was stale == retry
-            stateWasStale = true;
-            // we just pulled state from ZK, so update the cache so that the retry uses it
-            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
-          }
-        }
-      }
-
-      if (requestedCollections != null) {
-        requestedCollections.clear(); // done with this
-      }
-
-      // if the state was stale, then we retry the request once with new state pulled from Zk
-      if (stateWasStale) {
-        log.warn("Re-trying request to collection(s) "+inputCollections+" after stale state error from server.");
-        resp = requestWithRetryOnStaleState(request, retryCount+1, inputCollections);
-      } else {
-        if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
-          throw exc;
-        } else {
-          throw new SolrServerException(rootCause);
-        }
-      }
-    }
-
-    return resp;
-  }
-
-  protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputCollections)
-      throws SolrServerException, IOException {
-    connect();
-
-    boolean sendToLeaders = false;
-
-    if (request instanceof IsUpdateRequest) {
-      if (request instanceof UpdateRequest) {
-        String collection = inputCollections.isEmpty() ? null : inputCollections.get(0); // getting first mimics HttpSolrCall
-        NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
-        if (response != null) {
-          return response;
-        }
-      }
-      sendToLeaders = true;
-    }
-
-    SolrParams reqParams = request.getParams();
-    if (reqParams == null) { // TODO fix getParams to never return null!
-      reqParams = new ModifiableSolrParams();
-    }
-
-    final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
-
-    final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
-
-    if (request instanceof V2Request) {
-      if (!liveNodes.isEmpty()) {
-        List<String> liveNodesList = new ArrayList<>(liveNodes);
-        Collections.shuffle(liveNodesList, rand);
-        theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
-            getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
-      }
-
-    } else if (ADMIN_PATHS.contains(request.getPath())) {
-      for (String liveNode : liveNodes) {
-        theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
-            getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
-      }
-
-    } else { // Typical...
-      Set<String> collectionNames = resolveAliases(inputCollections);
-      if (collectionNames.isEmpty()) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "No collection param specified on request and no default collection has been set: " + inputCollections);
-      }
-
-      // TODO: not a big deal because of the caching, but we could avoid looking
-      //   at every shard when getting leaders if we tweaked some things
-
-      // Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
-      Map<String,Slice> slices = new HashMap<>();
-      String shardKeys = reqParams.get(ShardParams._ROUTE_);
-      for (String collectionName : collectionNames) {
-        DocCollection col = getDocCollection(collectionName, null);
-        if (col == null) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
-        }
-        Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
-        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
-      }
-
-      // Gather URLs, grouped by leader or replica
-      // TODO: allow filtering by group, role, etc
-      Set<String> seenNodes = new HashSet<>();
-      List<String> replicas = new ArrayList<>();
-      String joinedInputCollections = StrUtils.join(inputCollections, ',');
-      for (Slice slice : slices.values()) {
-        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
-          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
-          String node = coreNodeProps.getNodeName();
-          if (!liveNodes.contains(node) // Must be a live node to continue
-              || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
-            continue;
-          if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
-            String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
-            if (sendToLeaders && coreNodeProps.isLeader()) {
-              theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
-            } else {
-              replicas.add(url); // replicas here
-            }
-          }
-        }
-      }
-
-      // Shuffle the leaders, if any    (none if !sendToLeaders)
-      Collections.shuffle(theUrlList, rand);
-
-      // Shuffle the replicas, if any, and append to our list
-      Collections.shuffle(replicas, rand);
-      theUrlList.addAll(replicas);
-
-      if (theUrlList.isEmpty()) {
-        collectionStateCache.keySet().removeAll(collectionNames);
-        throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
-            "Could not find a healthy node to handle the request.");
-      }
-    }
-
-    LBSolrClient.Req req = new LBSolrClient.Req(request, theUrlList);
-    LBSolrClient.Rsp rsp = getLbClient().request(req);
-    return rsp.getResponse();
-  }
-
-  /** Resolves the input collections to their possible aliased collections. Doesn't validate collection existence. */
-  private LinkedHashSet<String> resolveAliases(List<String> inputCollections) {
-    LinkedHashSet<String> collectionNames = new LinkedHashSet<>(); // consistent ordering
-    for (String collectionName : inputCollections) {
-      if (getClusterStateProvider().getState(collectionName) == null) {
-        // perhaps it's an alias
-        List<String> aliasedCollections = getClusterStateProvider().resolveAlias(collectionName);
-        // one more level of alias indirection...  (dubious that we should support this)
-        for (String aliasedCollection : aliasedCollections) {
-          collectionNames.addAll(getClusterStateProvider().resolveAlias(aliasedCollection));
-        }
-      } else {
-        collectionNames.add(collectionName); // it's a collection
-      }
-    }
-    return collectionNames;
-  }
-
-  public boolean isUpdatesToLeaders() {
-    return updatesToLeaders;
-  }
-
-  /**
-   * @return true if direct updates are sent to shard leaders only
-   */
-  public boolean isDirectUpdatesToLeadersOnly() {
-    return directUpdatesToLeadersOnly;
-  }
-
-  /**If caches are expired they are refreshed after acquiring a lock.
-   * use this to set the number of locks
-   */
-  public void setParallelCacheRefreshes(int n){ locks = objectList(n); }
-
-  protected static ArrayList<Object> objectList(int n) {
-    ArrayList<Object> l =  new ArrayList<>(n);
-    for(int i=0;i<n;i++) l.add(new Object());
-    return l;
-  }
-
-
-  protected DocCollection getDocCollection(String collection, Integer expectedVersion) throws SolrException {
-    if (expectedVersion == null) expectedVersion = -1;
-    if (collection == null) return null;
-    ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
-    DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
-    if (col != null) {
-      if (expectedVersion <= col.getZNodeVersion()
-          && !cacheEntry.shouldRetry()) return col;
-    }
-
-    ClusterState.CollectionRef ref = getCollectionRef(collection);
-    if (ref == null) {
-      //no such collection exists
-      return null;
-    }
-    if (!ref.isLazilyLoaded()) {
-      //it is readily available just return it
-      return ref.get();
-    }
-    List locks = this.locks;
-    final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
-    DocCollection fetchedCol = null;
-    synchronized (lock) {
-      /*we have waited for sometime just check once again*/
-      cacheEntry = collectionStateCache.get(collection);
-      col = cacheEntry == null ? null : cacheEntry.cached;
-      if (col != null) {
-        if (expectedVersion <= col.getZNodeVersion()
-            && !cacheEntry.shouldRetry()) return col;
-      }
-      // We are going to fetch a new version
-      // we MUST try to get a new version
-      fetchedCol = ref.get();//this is a call to ZK
-      if (fetchedCol == null) return null;// this collection no more exists
-      if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
-        cacheEntry.setRetriedAt();//we retried and found that it is the same version
-        cacheEntry.maybeStale = false;
-      } else {
-        if (fetchedCol.getStateFormat() > 1)
-          collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
-      }
-      return fetchedCol;
-    }
-  }
-
-  ClusterState.CollectionRef getCollectionRef(String collection) {
-    return getClusterStateProvider().getState(collection);
-  }
-
-  /**
-   * Useful for determining the minimum achieved replication factor across
-   * all shards involved in processing an update request, typically useful
-   * for gauging the replication factor of a batch.
-   */
-  @SuppressWarnings("rawtypes")
-  public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
-    // it's probably already on the top-level header set by condense
-    NamedList header = (NamedList)resp.get("responseHeader");
-    Integer achRf = (Integer)header.get(UpdateRequest.REPFACT);
-    if (achRf != null)
-      return achRf.intValue();
-
-    // not on the top-level header, walk the shard route tree
-    Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp);
-    for (Integer rf : shardRf.values()) {
-      if (achRf == null || rf < achRf) {
-        achRf = rf;
-      }
-    }
-    return (achRf != null) ? achRf.intValue() : -1;
-  }
-
-  /**
-   * Walks the NamedList response after performing an update request looking for
-   * the replication factor that was achieved in each shard involved in the request.
-   * For single doc updates, there will be only one shard in the return value.
-   */
-  @SuppressWarnings("rawtypes")
-  public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
-    connect();
-
-    Map<String,Integer> results = new HashMap<String,Integer>();
-    if (resp instanceof RouteResponse) {
-      NamedList routes = ((RouteResponse)resp).getRouteResponses();
-      DocCollection coll = getDocCollection(collection, null);
-      Map<String,String> leaders = new HashMap<String,String>();
-      for (Slice slice : coll.getActiveSlicesArr()) {
-        Replica leader = slice.getLeader();
-        if (leader != null) {
-          ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
-          String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
-          leaders.put(leaderUrl, slice.getName());
-          String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
-          leaders.put(altLeaderUrl, slice.getName());
-        }
-      }
-
-      Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
-      while (routeIter.hasNext()) {
-        Map.Entry<String,Object> next = routeIter.next();
-        String host = next.getKey();
-        NamedList hostResp = (NamedList)next.getValue();
-        Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
-        if (rf != null) {
-          String shard = leaders.get(host);
-          if (shard == null) {
-            if (host.endsWith("/"))
-              shard = leaders.get(host.substring(0,host.length()-1));
-            if (shard == null) {
-              shard = host;
-            }
-          }
-          results.put(shard, rf);
-        }
-      }
-    }
-    return results;
-  }
-
-  private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
-    final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
-    final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
-
-    final boolean hasNoDocuments = (documents == null || documents.isEmpty());
-    final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
-    if (hasNoDocuments && hasNoDeleteById) {
-      // no documents and no delete-by-id, so no info to find leader(s)
-      return false;
-    }
-
-    if (documents != null) {
-      for (final Map.Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
-        final SolrInputDocument doc = entry.getKey();
-        final Object fieldValue = doc.getFieldValue(idField);
-        if (fieldValue == null) {
-          // a document with no id field value, so can't find leader for it
-          return false;
-        }
-      }
-    }
-
-    return true;
-  }
-
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
deleted file mode 100644
index 042b6e4..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
-
-public abstract class BaseHttpClusterStateProvider implements ClusterStateProvider {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private String urlScheme;
-  volatile Set<String> liveNodes;
-  long liveNodesTimestamp = 0;
-  volatile Map<String, List<String>> aliases;
-  long aliasesTimestamp = 0;
-
-  private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
-
-  public void init(List<String> solrUrls) throws Exception {
-    for (String solrUrl: solrUrls) {
-      urlScheme = solrUrl.startsWith("https")? "https": "http";
-      try (SolrClient initialClient = getSolrClient(solrUrl)) {
-        this.liveNodes = fetchLiveNodes(initialClient);
-        liveNodesTimestamp = System.nanoTime();
-        break;
-      } catch (IOException e) {
-        log.warn("Attempt to fetch cluster state from {} failed.", solrUrl, e);
-      }
-    }
-
-    if (this.liveNodes == null || this.liveNodes.isEmpty()) {
-      throw new RuntimeException("Tried fetching live_nodes using Solr URLs provided, i.e. " + solrUrls + ". However, "
-          + "succeeded in obtaining the cluster state from none of them."
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    }
-  }
-
-  protected abstract SolrClient getSolrClient(String baseUrl);
-
-  @Override
-  public ClusterState.CollectionRef getState(String collection) {
-    for (String nodeName: liveNodes) {
-      String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
-      try (SolrClient client = getSolrClient(baseUrl)) {
-        ClusterState cs = fetchClusterState(client, collection, null);
-        return cs.getCollectionRef(collection);
-      } catch (SolrServerException | IOException e) {
-        log.warn("Attempt to fetch cluster state from " +
-            Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
-      } catch (RemoteSolrException e) {
-        if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
-          return null;
-        }
-        log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
-      } catch (NotACollectionException e) {
-        // Cluster state for the given collection was not found, could be an alias.
-        // Lets fetch/update our aliases:
-        getAliases(true);
-        return null;
-      }
-    }
-    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
-        + "succeeded in obtaining the cluster state from none of them."
-        + "If you think your Solr cluster is up and is accessible,"
-        + " you could try re-creating a new CloudSolrClient using working"
-        + " solrUrl(s) or zkHost(s).");
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private ClusterState fetchClusterState(SolrClient client, String collection, Map<String, Object> clusterProperties) throws SolrServerException, IOException, NotACollectionException {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    if (collection != null) {
-      params.set("collection", collection);
-    }
-    params.set("action", "CLUSTERSTATUS");
-    QueryRequest request = new QueryRequest(params);
-    request.setPath("/admin/collections");
-    NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
-    Map<String, Object> collectionsMap;
-    if (collection != null) {
-      collectionsMap = Collections.singletonMap(collection,
-          ((NamedList) cluster.get("collections")).get(collection));
-    } else {
-      collectionsMap = ((NamedList)cluster.get("collections")).asMap(10);
-    }
-    int znodeVersion;
-    Map<String, Object> collFromStatus = (Map<String, Object>) (collectionsMap).get(collection);
-    if (collection != null && collFromStatus == null) {
-      throw new NotACollectionException(); // probably an alias
-    }
-    if (collection != null) { // can be null if alias
-      znodeVersion =  (int) collFromStatus.get("znodeVersion");
-    } else {
-      znodeVersion = -1;
-    }
-    Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
-    this.liveNodes = liveNodes;
-    liveNodesTimestamp = System.nanoTime();
-    //TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
-    ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
-    if (clusterProperties != null) {
-      Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
-      if (properties != null) {
-        clusterProperties.putAll(properties);
-      }
-    }
-    return cs;
-  }
-
-  @Override
-  public Set<String> getLiveNodes() {
-    if (liveNodes == null) {
-      throw new RuntimeException("We don't know of any live_nodes to fetch the"
-          + " latest live_nodes information from. "
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    }
-    if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
-      for (String nodeName: liveNodes) {
-        String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
-        try (SolrClient client = getSolrClient(baseUrl)) {
-          Set<String> liveNodes = fetchLiveNodes(client);
-          this.liveNodes = (liveNodes);
-          liveNodesTimestamp = System.nanoTime();
-          return liveNodes;
-        } catch (Exception e) {
-          log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
-        }
-      }
-      throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
-          + "succeeded in obtaining the cluster state from none of them."
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    } else {
-      return liveNodes; // cached copy is fresh enough
-    }
-  }
-
-  private static Set<String> fetchLiveNodes(SolrClient client) throws Exception {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set("action", "CLUSTERSTATUS");
-    QueryRequest request = new QueryRequest(params);
-    request.setPath("/admin/collections");
-    NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
-    return (Set<String>) new HashSet((List<String>)(cluster.get("live_nodes")));
-  }
-
-  @Override
-  public List<String> resolveAlias(String aliasName) {
-    return Aliases.resolveAliasesGivenAliasMap(getAliases(false), aliasName);
-  }
-
-  private Map<String, List<String>> getAliases(boolean forceFetch) {
-    if (this.liveNodes == null) {
-      throw new RuntimeException("We don't know of any live_nodes to fetch the"
-          + " latest aliases information from. "
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using working"
-          + " solrUrl(s) or zkHost(s).");
-    }
-
-    if (forceFetch || this.aliases == null ||
-        TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
-      for (String nodeName: liveNodes) {
-        String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
-        try (SolrClient client = getSolrClient(baseUrl)) {
-
-          this.aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
-          this.aliasesTimestamp = System.nanoTime();
-          return Collections.unmodifiableMap(this.aliases);
-        } catch (SolrServerException | RemoteSolrException | IOException e) {
-          // Situation where we're hitting an older Solr which doesn't have LISTALIASES
-          if (e instanceof RemoteSolrException && ((RemoteSolrException)e).code()==400) {
-            log.warn("LISTALIASES not found, possibly using older Solr server. Aliases won't work"
-                + " unless you re-create the CloudSolrClient using zkHost(s) or upgrade Solr server", e);
-            this.aliases = Collections.emptyMap();
-            this.aliasesTimestamp = System.nanoTime();
-            return aliases;
-          }
-          log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
-        }
-      }
-
-      throw new RuntimeException("Tried fetching aliases using all the node names we knew of, i.e. " + liveNodes +". However, "
-          + "succeeded in obtaining the cluster state from none of them."
-          + "If you think your Solr cluster is up and is accessible,"
-          + " you could try re-creating a new CloudSolrClient using a working"
-          + " solrUrl or zkHost.");
-    } else {
-      return Collections.unmodifiableMap(this.aliases); // cached copy is fresh enough
-    }
-  }
-
-  @Override
-  public ClusterState getClusterState() throws IOException {
-    for (String nodeName: liveNodes) {
-      String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
-      try (SolrClient client = getSolrClient(baseUrl)) {
-        return fetchClusterState(client, null, null);
-      } catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
-        log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
-      } catch (NotACollectionException e) {
-        // not possible! (we passed in null for collection so it can't be an alias)
-        throw new RuntimeException("null should never cause NotACollectionException in " +
-            "fetchClusterState() Please report this as a bug!");
-      }
-    }
-    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
-        + "succeeded in obtaining the cluster state from none of them."
-        + "If you think your Solr cluster is up and is accessible,"
-        + " you could try re-creating a new CloudSolrClient using working"
-        + " solrUrl(s) or zkHost(s).");
-  }
-
-  @Override
-  public Map<String, Object> getClusterProperties() {
-    for (String nodeName : liveNodes) {
-      String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
-      try (SolrClient client = getSolrClient(baseUrl)) {
-        Map<String, Object> clusterProperties = new HashMap<>();
-        fetchClusterState(client, null, clusterProperties);
-        return clusterProperties;
-      } catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
-        log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
-      } catch (NotACollectionException e) {
-        // not possible! (we passed in null for collection so it can't be an alias)
-        throw new RuntimeException("null should never cause NotACollectionException in " +
-            "fetchClusterState() Please report this as a bug!");
-      }
-    }
-    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes + ". However, "
-        + "succeeded in obtaining the cluster state from none of them."
-        + "If you think your Solr cluster is up and is accessible,"
-        + " you could try re-creating a new CloudSolrClient using working"
-        + " solrUrl(s) or zkHost(s).");
-  }
-
-  @Override
-  public String getPolicyNameByCollection(String coll) {
-    throw new UnsupportedOperationException("Fetching cluster properties not supported"
-        + " using the HttpClusterStateProvider. "
-        + "ZkClientClusterStateProvider can be used for this."); // TODO
-  }
-
-  @Override
-  public Object getClusterProperty(String propertyName) {
-    if (propertyName.equals(ZkStateReader.URL_SCHEME)) {
-      return this.urlScheme;
-    }
-    return getClusterProperties().get(propertyName);
-  }
-
-  @Override
-  public void connect() {}
-
-  public int getCacheTimeout() {
-    return cacheTimeout;
-  }
-
-  public void setCacheTimeout(int cacheTimeout) {
-    this.cacheTimeout = cacheTimeout;
-  }
-
-  // This exception is not meant to escape this class it should be caught and wrapped.
-  private class NotACollectionException extends Exception {
-  }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java
deleted file mode 100644
index dceb13c..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.util.Collections;
-
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.util.NamedList;
-
-import static org.apache.solr.common.util.Utils.getObjectByPath;
-
-public abstract class BaseHttpSolrClient extends SolrClient {
-
-  /**
-   * Subclass of SolrException that allows us to capture an arbitrary HTTP
-   * status code that may have been returned by the remote server or a
-   * proxy along the way.
-   */
-  public static class RemoteSolrException extends SolrException {
-    /**
-     * @param remoteHost the host the error was received from
-     * @param code Arbitrary HTTP status code
-     * @param msg Exception Message
-     * @param th Throwable to wrap with this Exception
-     */
-    public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
-      super(code, "Error from server at " + remoteHost + ": " + msg, th);
-    }
-  }
-
-  /**
-   * This should be thrown when a server has an error in executing the request and
-   * it sends a proper payload back to the client
-   */
-  public static class RemoteExecutionException extends HttpSolrClient.RemoteSolrException {
-    private NamedList meta;
-
-    public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
-      super(remoteHost, code, msg, null);
-      this.meta = meta;
-    }
-
-
-    public static HttpSolrClient.RemoteExecutionException create(String host, NamedList errResponse) {
-      Object errObj = errResponse.get("error");
-      if (errObj != null) {
-        Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
-        String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
-        return new HttpSolrClient.RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
-            msg == null ? "Unknown Error" : msg, errResponse);
-
-      } else {
-        throw new RuntimeException("No error");
-      }
-
-    }
-
-    public NamedList getMetaData() {
-
-      return meta;
-    }
-  }
-
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
deleted file mode 100644
index 8e6ac82..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.common.SolrException;
-
-/**
- * SolrJ client class to communicate with SolrCloud using Http2SolrClient.
- * Instances of this class communicate with Zookeeper to discover
- * Solr endpoints for SolrCloud collections, and then use the
- * {@link LBHttp2SolrClient} to issue requests.
- *
- * This class assumes the id field for your documents is called
- * 'id' - if this is not the case, you must set the right name
- * with {@link #setIdField(String)}.
- *
- * @lucene.experimental
- * @since solr 8.0
- */
-@SuppressWarnings("serial")
-public class CloudHttp2SolrClient  extends BaseCloudSolrClient {
-
-  private final ClusterStateProvider stateProvider;
-  private final LBHttp2SolrClient lbClient;
-  private Http2SolrClient myClient;
-  private final boolean clientIsInternal;
-
-  /**
-   * Create a new client object that connects to Zookeeper and is always aware
-   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
-   * SolrCloud has enough replicas for every shard in a collection, there is no
-   * single point of failure. Updates will be sent to shard leaders by default.
-   *
-   * @param builder a {@link Http2SolrClient.Builder} with the options used to create the client.
-   */
-  protected CloudHttp2SolrClient(Builder builder) {
-    super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
-    this.clientIsInternal = builder.httpClient == null;
-    this.myClient = (builder.httpClient == null) ? new Http2SolrClient.Builder().build() : builder.httpClient;
-    if (builder.stateProvider == null) {
-      if (builder.zkHosts != null && builder.solrUrls != null) {
-        throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
-      }
-      if (builder.zkHosts != null) {
-        this.stateProvider = new ZkClientClusterStateProvider(builder.zkHosts, builder.zkChroot);
-      } else if (builder.solrUrls != null && !builder.solrUrls.isEmpty()) {
-        try {
-          this.stateProvider = new Http2ClusterStateProvider(builder.solrUrls, builder.httpClient);
-        } catch (Exception e) {
-          throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
-              + "Solr server(s), "  + builder.solrUrls + ", down?)", e);
-        }
-      } else {
-        throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
-      }
-    } else {
-      this.stateProvider = builder.stateProvider;
-    }
-    this.lbClient = new LBHttp2SolrClient(myClient);
-
-  }
-
-
-  @Override
-  public void close() throws IOException {
-    stateProvider.close();
-    lbClient.close();
-
-    if (clientIsInternal && myClient!=null) {
-      myClient.close();
-    }
-
-    super.close();
-  }
-
-  public LBHttp2SolrClient getLbClient() {
-    return lbClient;
-  }
-
-  @Override
-  public ClusterStateProvider getClusterStateProvider() {
-    return stateProvider;
-  }
-
-  public Http2SolrClient getHttpClient() {
-    return myClient;
-  }
-
-  @Override
-  protected boolean wasCommError(Throwable rootCause) {
-    return false;
-  }
-
-  /**
-   * Constructs {@link CloudHttp2SolrClient} instances from provided configuration.
-   */
-  public static class Builder {
-    protected Collection<String> zkHosts = new ArrayList<>();
-    protected List<String> solrUrls = new ArrayList<>();
-    protected String zkChroot;
-    protected Http2SolrClient httpClient;
-    protected boolean shardLeadersOnly = true;
-    protected boolean directUpdatesToLeadersOnly = false;
-    protected boolean parallelUpdates = true;
-    protected ClusterStateProvider stateProvider;
-
-    /**
-     * Provide a series of Solr URLs to be used when configuring {@link CloudHttp2SolrClient} instances.
-     * The solr client will use these urls to understand the cluster topology, which solr nodes are active etc.
-     *
-     * Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not
-     * include any collections, cores, or other path components.
-     *
-     * Usage example:
-     *
-     * <pre>
-     *   final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
-     *   solrBaseUrls.add("http://solr1:8983/solr"); solrBaseUrls.add("http://solr2:8983/solr"); solrBaseUrls.add("http://solr3:8983/solr");
-     *   final SolrClient client = new CloudHttp2SolrClient.Builder(solrBaseUrls).build();
-     * </pre>
-     */
-    public Builder(List<String> solrUrls) {
-      this.solrUrls = solrUrls;
-    }
-
-    /**
-     * Provide a series of ZK hosts which will be used when configuring {@link CloudHttp2SolrClient} instances.
-     *
-     * Usage example when Solr stores data at the ZooKeeper root ('/'):
-     *
-     * <pre>
-     *   final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
-     *   zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
-     *   final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.empty()).build();
-     * </pre>
-     *
-     * Usage example when Solr data is stored in a ZooKeeper chroot:
-     *
-     *  <pre>
-     *    final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
-     *    zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
-     *    final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.of("/solr")).build();
-     *  </pre>
-     *
-     * @param zkHosts a List of at least one ZooKeeper host and port (e.g. "zookeeper1:2181")
-     * @param zkChroot the path to the root ZooKeeper node containing Solr data.  Provide {@code java.util.Optional.empty()} if no ZK chroot is used.
-     */
-    public Builder(List<String> zkHosts, Optional<String> zkChroot) {
-      this.zkHosts = zkHosts;
-      if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
-    }
-
-    /**
-     * Tells {@link CloudHttp2SolrClient.Builder} that created clients should send direct updates to shard leaders only.
-     *
-     * UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a {@link SolrException}
-     */
-    public Builder sendDirectUpdatesToShardLeadersOnly() {
-      directUpdatesToLeadersOnly = true;
-      return this;
-    }
-
-    /**
-     * Tells {@link CloudHttp2SolrClient.Builder} that created clients can send updates to any shard replica (shard leaders and non-leaders).
-     *
-     * Shard leaders are still preferred, but the created clients will fallback to using other replicas if a leader
-     * cannot be found.
-     */
-    public Builder sendDirectUpdatesToAnyShardReplica() {
-      directUpdatesToLeadersOnly = false;
-      return this;
-    }
-
-    /**
-     * Tells {@link CloudHttp2SolrClient.Builder} whether created clients should send shard updates serially or in parallel
-     *
-     * When an {@link UpdateRequest} affects multiple shards, {@link CloudHttp2SolrClient} splits it up and sends a request
-     * to each affected shard.  This setting chooses whether those sub-requests are sent serially or in parallel.
-     * <p>
-     * If not set, this defaults to 'true' and sends sub-requests in parallel.
-     */
-    public Builder withParallelUpdates(boolean parallelUpdates) {
-      this.parallelUpdates = parallelUpdates;
-      return this;
-    }
-
-    public Builder withHttpClient(Http2SolrClient httpClient) {
-      this.httpClient = httpClient;
-      return this;
-    }
-
-    /**
-     * Create a {@link CloudHttp2SolrClient} based on the provided configuration.
-     */
-    public CloudHttp2SolrClient build() {
-      if (stateProvider == null) {
-        if (!zkHosts.isEmpty()) {
-          stateProvider = new ZkClientClusterStateProvider(zkHosts, zkChroot);
-        }
-        else if (!this.solrUrls.isEmpty()) {
-          try {
-            stateProvider = new Http2ClusterStateProvider(solrUrls, httpClient);
-          } catch (Exception e) {
-            throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
-                + "Solr server(s), "  + solrUrls + ", down?)", e);
-          }
-        } else {
-          throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
-        }
-      }
-      return new CloudHttp2SolrClient(this);
-    }
-
-  }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 0b08780..37cdba7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -17,21 +17,78 @@
 package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.NoHttpResponseException;
 import org.apache.http.client.HttpClient;
 import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.V2RequestSupport;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.cloud.ClusterState.CollectionRef;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
+import static org.apache.solr.common.params.CommonParams.ID;
 
 /**
  * SolrJ client class to communicate with SolrCloud.
@@ -44,15 +101,102 @@ import org.apache.solr.common.util.NamedList;
  * with {@link #setIdField(String)}.
  */
 @SuppressWarnings("serial")
-public class CloudSolrClient extends BaseCloudSolrClient {
+public class CloudSolrClient extends SolrClient {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final ClusterStateProvider stateProvider;
+  private volatile String defaultCollection;
   private final LBHttpSolrClient lbClient;
   private final boolean shutdownLBHttpSolrServer;
   private HttpClient myClient;
   private final boolean clientIsInternal;
+  // number of times the collection state is reloaded when a stale-state error is received
+  private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
+  Random rand = new Random();
+  
+  private final boolean updatesToLeaders;
+  private final boolean directUpdatesToLeadersOnly;
+  private boolean parallelUpdates; //TODO final
+  private ExecutorService threadPool = ExecutorUtil
+      .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
+          "CloudSolrClient ThreadPool"));
+  private String idField = ID;
+  public static final String STATE_VERSION = "_stateVer_";
+  private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds, i.e. 3 billion nanos
+  private final Set<String> NON_ROUTABLE_PARAMS;
+  {
+    NON_ROUTABLE_PARAMS = new HashSet<>();
+    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+    
+    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+    
+    // Not supported via SolrCloud
+    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+  }
+  private volatile List<Object> locks = objectList(3);
+
+
+  static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
+    final AtomicLong puts = new AtomicLong();
+    final AtomicLong hits = new AtomicLong();
+    final Lock evictLock = new ReentrantLock(true);
+    private volatile long timeToLive = 60 * 1000L;
+
+    @Override
+    public ExpiringCachedDocCollection get(Object key) {
+      ExpiringCachedDocCollection val = super.get(key);
+      if(val == null) {
+        // a new collection is likely to be added now.
+        //check if there are stale items and remove them
+        evictStale();
+        return null;
+      }
+      if(val.isExpired(timeToLive)) {
+        super.remove(key);
+        return null;
+      }
+      hits.incrementAndGet();
+      return val;
+    }
+
+    @Override
+    public ExpiringCachedDocCollection put(String key, ExpiringCachedDocCollection value) {
+      puts.incrementAndGet();
+      return super.put(key, value);
+    }
+
+    void evictStale() {
+      if(!evictLock.tryLock()) return;
+      try {
+        for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
+          if(e.getValue().isExpired(timeToLive)){
+            super.remove(e.getKey());
+          }
+        }
+      } finally {
+        evictLock.unlock();
+      }
+    }
+
+  }
 
-  public static final String STATE_VERSION = BaseCloudSolrClient.STATE_VERSION;
+  /**
+   * Sets how long to wait before refetching the collection state
+   * after a refetch returned the same state version from ZK.
+   *
+   * @param secs the wait time, in seconds
+   */
+  public void setRetryExpiryTime(int secs) {
+    this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
+  }
 
   /**
    * @deprecated since 7.0  Use {@link Builder} methods instead. 
@@ -62,6 +206,42 @@ public class CloudSolrClient extends BaseCloudSolrClient {
     lbClient.setSoTimeout(timeout);
   }
 
+  protected final StateCache collectionStateCache = new StateCache();
+
+  class ExpiringCachedDocCollection {
+    final DocCollection cached;
+    final long cachedAt;
+    //This is the time at which the collection is retried and got the same old version
+    volatile long retriedAt = -1;
+    //flag that suggests that this is potentially to be rechecked
+    volatile boolean maybeStale = false;
+
+    ExpiringCachedDocCollection(DocCollection cached) {
+      this.cached = cached;
+      this.cachedAt = System.nanoTime();
+    }
+
+    boolean isExpired(long timeToLiveMs) {
+      return (System.nanoTime() - cachedAt)
+          > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
+    }
+
+    boolean shouldRetry() {
+      if (maybeStale) {// we are not sure if it is stale so check with retry time
+        if ((retriedAt == -1 ||
+            (System.nanoTime() - retriedAt) > retryExpiryTime)) {
+          return true;// we retried a while back. and we could not get anything new.
+          //it's likely that it is not going to be available now also.
+        }
+      }
+      return false;
+    }
+
+    void setRetriedAt() {
+      retriedAt = System.nanoTime();
+    }
+  }
+  
   /**
    * Create a new client object that connects to Zookeeper and is always aware
    * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
@@ -71,7 +251,6 @@ public class CloudSolrClient extends BaseCloudSolrClient {
    * @param builder a {@link CloudSolrClient.Builder} with the options used to create the client.
    */
   protected CloudSolrClient(Builder builder) {
-    super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
     if (builder.stateProvider == null) {
       if (builder.zkHosts != null && builder.solrUrls != null) {
         throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
@@ -101,6 +280,9 @@ public class CloudSolrClient extends BaseCloudSolrClient {
     this.myClient = (builder.httpClient == null) ? HttpClientUtil.createClient(null) : builder.httpClient;
     if (builder.loadBalancedSolrClient == null) builder.loadBalancedSolrClient = createLBHttpSolrClient(builder, myClient);
     this.lbClient = builder.loadBalancedSolrClient;
+    this.updatesToLeaders = builder.shardLeadersOnly;
+    this.parallelUpdates = builder.parallelUpdates;
+    this.directUpdatesToLeadersOnly = builder.directUpdatesToLeadersOnly;
   }
   
   private void propagateLBClientConfigOptions(Builder builder) {
@@ -115,14 +297,91 @@ public class CloudSolrClient extends BaseCloudSolrClient {
     }
   }
 
-  protected Map<String, LBHttpSolrClient.Req> createRoutes(UpdateRequest updateRequest, ModifiableSolrParams routableParams,
-                                                       DocCollection col, DocRouter router, Map<String, List<String>> urlMap,
-                                                       String idField) {
-    return urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, idField);
+  /** Sets the cache TTL for cached DocCollection objects. This is only applicable to collections which are persisted outside of clusterstate.json.
+   * @param seconds ttl value in seconds
+   */
+  public void setCollectionCacheTTl(int seconds){
+    assert seconds > 0;
+    this.collectionStateCache.timeToLive = seconds * 1000L;
+  }
+
+  public ResponseParser getParser() {
+    return lbClient.getParser();
+  }
+
+  /**
+   * Note: This setter method is <b>not thread-safe</b>.
+   * 
+   * @param processor
+   *          Default Response Parser chosen to parse the response if the parser
+   *          were not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(ResponseParser processor) {
+    lbClient.setParser(processor);
+  }
+  
+  public RequestWriter getRequestWriter() {
+    return lbClient.getRequestWriter();
+  }
+  
+  public void setRequestWriter(RequestWriter requestWriter) {
+    lbClient.setRequestWriter(requestWriter);
+  }
+
+  /**
+   * @return the zkHost value used to connect to zookeeper.
+   */
+  public String getZkHost() {
+    return assertZKStateProvider().zkHost;
+  }
+
+  public ZkStateReader getZkStateReader() {
+    if (stateProvider instanceof ZkClientClusterStateProvider) {
+      ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) stateProvider;
+      stateProvider.connect();
+      return provider.zkStateReader;
+    }
+    throw new IllegalStateException("This has no Zk stateReader");
+  }
+
+  /**
+   * @param idField the field to route documents on.
+   */
+  public void setIdField(String idField) {
+    this.idField = idField;
+  }
+
+  /**
+   * @return the field that updates are routed on.
+   */
+  public String getIdField() {
+    return idField;
+  }
+  
+  /** Sets the default collection for request */
+  public void setDefaultCollection(String collection) {
+    this.defaultCollection = collection;
+  }
+
+  /** Gets the default collection for request */
+  public String getDefaultCollection() {
+    return defaultCollection;
+  }
+
+  /** Set the connect timeout to the zookeeper ensemble in ms */
+  public void setZkConnectTimeout(int zkConnectTimeout) {
+    assertZKStateProvider().zkConnectTimeout = zkConnectTimeout;
+  }
+
+  /** Set the timeout to the zookeeper ensemble in ms */
+  public void setZkClientTimeout(int zkClientTimeout) {
+    assertZKStateProvider().zkClientTimeout = zkClientTimeout;
   }
 
-  protected RouteException getRouteException(SolrException.ErrorCode serverError, NamedList<Throwable> exceptions, Map<String, ? extends LBSolrClient.Req> routes) {
-    return new RouteException(serverError, exceptions, routes);
+  /** Gets whether direct updates are sent in parallel */
+  public boolean isParallelUpdates() {
+    return parallelUpdates;
   }
 
   /** @deprecated since 7.2  Use {@link Builder} methods instead. */
@@ -132,33 +391,745 @@ public class CloudSolrClient extends BaseCloudSolrClient {
   }
 
   /**
-   * @deprecated since Solr 8.0
+   * Connect to the zookeeper ensemble.
+   * This is an optional method that may be used to force a connect before any other requests are sent.
+   *
    */
-  @Deprecated
-  public RouteResponse condenseResponse(NamedList response, int timeMillis) {
-    return condenseResponse(response, timeMillis, RouteResponse::new);
+  public void connect() {
+    stateProvider.connect();
   }
 
   /**
-   * @deprecated since Solr 8.0
+   * Connect to a cluster.  If the cluster is not ready, retry connection up to a given timeout.
+   * @param duration the timeout
+   * @param timeUnit the units of the timeout
+   * @throws TimeoutException if the cluster is not ready after the timeout
+   * @throws InterruptedException if the wait is interrupted
    */
-  @Deprecated
-  public static class RouteResponse extends BaseCloudSolrClient.RouteResponse<LBHttpSolrClient.Req> {
+  public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
+    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, stateProvider);
+    // System.nanoTime() readings may only be compared through their signed difference: the
+    // counter has an arbitrary origin and "now + timeout" can wrap around for large durations,
+    // which would make a plain "now < deadline" test fail immediately.
+    final long deadline = System.nanoTime() + timeUnit.toNanos(duration);
+    RuntimeException lastFailure = null;
+    while (System.nanoTime() - deadline < 0) {
+      try {
+        connect();
+        log.info("Cluster at {} ready", stateProvider);
+        return;
+      } catch (RuntimeException e) {
+        // not ready yet: keep the reason so that an eventual timeout can report it
+        lastFailure = e;
+      }
+      TimeUnit.MILLISECONDS.sleep(250);
+    }
+    final TimeoutException timedOut = new TimeoutException("Timed out waiting for cluster");
+    if (lastFailure != null) {
+      // TimeoutException has no cause-taking constructor, so attach the last failure explicitly
+      timedOut.initCause(lastFailure);
+    }
+    throw timedOut;
+  }
+
+  /**
+   * Narrows the state provider to its ZooKeeper-backed implementation.
+   *
+   * @return the state provider, cast to {@link ZkClientClusterStateProvider}
+   * @throws IllegalArgumentException if the state provider is of any other type
+   */
+  private ZkClientClusterStateProvider assertZKStateProvider() {
+    if (!(stateProvider instanceof ZkClientClusterStateProvider)) {
+      throw new IllegalArgumentException("This client does not use ZK");
+    }
+    return (ZkClientClusterStateProvider) stateProvider;
 
   }
 
   /**
-   * @deprecated since Solr 8.0
+   * Block until a collection state matches a predicate, or a timeout
+   *
+   * Note that the predicate may be called again even after it has returned true, so
+   * implementors should avoid changing state within the predicate call itself.
+   *
+   * @param collection the collection to watch
+   * @param wait       how long to wait
+   * @param unit       the units of the wait parameter
+   * @param predicate  a {@link CollectionStatePredicate} to check the collection state
+   * @throws InterruptedException on interrupt
+   * @throws TimeoutException     on timeout
    */
-  @Deprecated
-  public static class RouteException extends BaseCloudSolrClient.RouteException {
+  public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
+      throws InterruptedException, TimeoutException {
+    // connect before touching the reader; the actual waiting is delegated to the ZkStateReader
+    stateProvider.connect();
+    final ZkStateReader reader = assertZKStateProvider().zkStateReader;
+    reader.waitForState(collection, wait, unit, predicate);
+  }
+
+  /**
+   * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
+   *
+   * The watcher fires once and is then unregistered.  A watcher that wants to stay active
+   * should register itself again from within its
+   * {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} call.
+   *
+   * @param collection the collection to watch
+   * @param watcher    a watcher that will be called when the state changes
+   * @throws IllegalArgumentException if the state provider is not a {@link ZkClientClusterStateProvider}
+   */
+  public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
+    // connect before touching the reader; registration is delegated to the ZkStateReader
+    stateProvider.connect();
+    final ZkStateReader reader = assertZKStateProvider().zkStateReader;
+    reader.registerCollectionStateWatcher(collection, watcher);
+  }
+
+  /**
+   * Tries to send an update directly to the shards that own its documents instead of going
+   * through the general request path.
+   *
+   * The request is split into per-route requests (one per URL returned by
+   * {@code UpdateRequest.getRoutes}), which are sent either in parallel on {@code threadPool} or
+   * one after another, depending on {@code parallelUpdates}.  Delete-by-query commands and the
+   * parameters listed in {@code NON_ROUTABLE_PARAMS} are then sent once in a separate request.
+   * All shard responses are finally merged with {@link #condenseResponse(NamedList, int)}.
+   *
+   * @param request    the update to send; it is cast to {@link UpdateRequest}
+   * @param collection the target collection or alias; for an alias the first resolved collection is used
+   * @return the condensed response, or {@code null} when the caller should fall back to the
+   *         general path (implicit router, or no routes could be computed)
+   * @throws SolrServerException if {@code collection} is {@code null}, or a sequential route
+   *         request fails with something other than a {@link SolrException}
+   * @throws SolrException if leaders are required but cannot be found, if a sequential route
+   *         request fails with a SolrException, or if the non-routable request fails
+   */
+  private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
+    UpdateRequest updateRequest = (UpdateRequest) request;
+    SolrParams params = request.getParams();
+    ModifiableSolrParams routableParams = new ModifiableSolrParams();
+    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+    // nonRoutableParams keeps every request parameter; routableParams is the same set minus
+    // the names in NON_ROUTABLE_PARAMS
+    if(params != null) {
+      nonRoutableParams.add(params);
+      routableParams.add(params);
+      for(String param : NON_ROUTABLE_PARAMS) {
+        routableParams.remove(param);
+      }
+    }
+
+    if (collection == null) {
+      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+    }
+
+    //Check to see if the collection is an alias.
+    List<String> aliasedCollections = stateProvider.resolveAlias(collection);
+    collection = aliasedCollections.get(0); // pick 1st (consistent with HttpSolrCall behavior)
+
+    DocCollection col = getDocCollection(collection, null);
+
+    DocRouter router = col.getRouter();
+    
+    if (router instanceof ImplicitDocRouter) {
+      // short circuit as optimization
+      return null;
+    }
+
+    //Create the URL map, which is keyed on slice name.
+    //The value is a list of URLs for each replica in the slice.
+    //The first value in the list is the leader for the slice.
+    final Map<String,List<String>> urlMap = buildUrlMap(col);
+    final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
+    if (routes == null) {
+      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
+          // we have info (documents with ids and/or ids to delete) with
+          // which to find the leaders but we could not find (all of) them
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+              "directUpdatesToLeadersOnly==true but could not find leader(s)");
+      } else {
+        // we could not find a leader or routes yet - use unoptimized general path
+        return null;
+      }
+    }
+
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
+
+    // QTime of the condensed response covers everything from here to the last shard response
+    long start = System.nanoTime();
+
+    if (parallelUpdates) {
+      // submit every route request first, then collect the results in a second pass
+      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
+      for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+        final String url = entry.getKey();
+        final LBHttpSolrClient.Req lbRequest = entry.getValue();
+        try {
+          // NOTE(review): the MDC key is only set around submit(); presumably the pool copies the
+          // submitter's MDC into the worker thread - confirm against the threadPool implementation
+          MDC.put("CloudSolrClient.url", url);
+          responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest).getResponse()));
+        } finally {
+          MDC.remove("CloudSolrClient.url");
+        }
+      }
+
+      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
+        final String url = entry.getKey();
+        final Future<NamedList<?>> responseFuture = entry.getValue();
+        try {
+          shardResponses.add(url, responseFuture.get());
+        } catch (InterruptedException e) {
+          // restore the interrupt flag before giving up
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          // remember the failure per URL and keep collecting the remaining responses
+          exceptions.add(url, e.getCause());
+        }
+      }
+
+      // the error code of the RouteException is taken from the first recorded failure
+      if (exceptions.size() > 0) {
+        Throwable firstException = exceptions.getVal(0);
+        if(firstException instanceof SolrException) {
+          SolrException e = (SolrException) firstException;
+          throw new RouteException(ErrorCode.getErrorCode(e.code()), exceptions, routes);
+        } else {
+          throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+        }
+      }
+    } else {
+      // sequential mode: the first failure aborts the whole update
+      for (Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+        String url = entry.getKey();
+        LBHttpSolrClient.Req lbRequest = entry.getValue();
+        try {
+          NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
+          shardResponses.add(url, rsp);
+        } catch (Exception e) {
+          if(e instanceof SolrException) {
+            throw (SolrException) e;
+          } else {
+            throw new SolrServerException(e);
+          }
+        }
+      }
+    }
 
-    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, ? extends LBSolrClient.Req> routes) {
-      super(errorCode, throwables, routes);
+    // delete-by-query cannot be routed by id, so it travels in a separate request
+    UpdateRequest nonRoutableRequest = null;
+    List<String> deleteQuery = updateRequest.getDeleteQuery();
+    if (deleteQuery != null && deleteQuery.size() > 0) {
+      UpdateRequest deleteQueryRequest = new UpdateRequest();
+      deleteQueryRequest.setDeleteQuery(deleteQuery);
+      nonRoutableRequest = deleteQueryRequest;
+    }
+    
+    Set<String> paramNames = nonRoutableParams.getParameterNames();
+    
+    // which of the request's parameters are non-routable?
+    Set<String> intersection = new HashSet<>(paramNames);
+    intersection.retainAll(NON_ROUTABLE_PARAMS);
+    
+    // a separate request is needed for a delete-by-query and/or for any non-routable parameter
+    if (nonRoutableRequest != null || intersection.size() > 0) {
+      if (nonRoutableRequest == null) {
+        nonRoutableRequest = new UpdateRequest();
+      }
+      nonRoutableRequest.setParams(nonRoutableParams);
+      nonRoutableRequest.setBasicAuthCredentials(request.getBasicAuthUser(), request.getBasicAuthPassword());
+      // offer the load-balancing client all route URLs in random order
+      List<String> urlList = new ArrayList<>();
+      urlList.addAll(routes.keySet());
+      Collections.shuffle(urlList, rand);
+      LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(nonRoutableRequest, urlList);
+      try {
+        LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+        // the response is recorded under the first URL of the shuffled list
+        shardResponses.add(urlList.get(0), rsp.getResponse());
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+      }
+    }
+
+    long end = System.nanoTime();
+
+    RouteResponse rr = condenseResponse(shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS));
+    rr.setRouteResponses(shardResponses);
+    rr.setRoutes(routes);
+    return rr;
+  }
+
+  /**
+   * Builds the map of core URLs used for routing updates, keyed on slice name.
+   *
+   * The first URL of every list belongs to the slice leader.  When
+   * {@code directUpdatesToLeadersOnly} is set and a slice has no leader, the first replica that is
+   * active on a live node and of type NRT is used in its place, and slices without such a
+   * replica are left out of the map.  Otherwise the remaining replicas of the slice follow the
+   * leader in the list.
+   *
+   * @param col the collection whose active slices are inspected
+   * @return the URL lists per slice, or {@code null} if a slice has no leader and
+   *         {@code directUpdatesToLeadersOnly} is not set
+   */
+  private Map<String,List<String>> buildUrlMap(DocCollection col) {
+    final Map<String, List<String>> urlsBySlice = new HashMap<>();
+    for (Slice slice : col.getActiveSlicesArr()) {
+      Replica target = slice.getLeader();
+      if (target == null && directUpdatesToLeadersOnly) {
+        // no elected leader: fall back to the first live, active NRT replica (if any)
+        for (Replica candidate : slice.getReplicas(
+            r -> r.isActive(getClusterStateProvider().getLiveNodes())
+                && r.getType() == Replica.Type.NRT)) {
+          target = candidate;
+          break;
+        }
+      }
+      if (target == null) {
+        if (directUpdatesToLeadersOnly) {
+          continue;
+        }
+        // take unoptimized general path - we cannot find a leader yet
+        return null;
+      }
+
+      final List<String> sliceUrls = new ArrayList<>();
+      sliceUrls.add(new ZkCoreNodeProps(target).getCoreUrl());
+      if (!directUpdatesToLeadersOnly) {
+        for (Replica other : slice.getReplicas()) {
+          // only replicas that differ from the leader in both node name and replica name are added
+          if (!(other.getNodeName().equals(target.getNodeName())
+              || other.getName().equals(target.getName()))) {
+            sliceUrls.add(new ZkCoreNodeProps(other).getCoreUrl());
+          }
+        }
+      }
+      urlsBySlice.put(slice.getName(), sliceUrls);
+    }
+    return urlsBySlice;
+  }
+
+  /**
+   * Merges the responses of the individual shards into one response.
+   *
+   * The condensed header carries the last non-zero shard status (0 if there is none), the given
+   * {@code timeMillis} as QTime, the smallest replication factor reported by any shard, the
+   * minimum replication factor reported by the last shard, and the accumulated tolerated errors.
+   * The "adds", "deletes" and "deleteByQuery" lists of all shards are concatenated.
+   *
+   * @param response   the shard responses; every value must be a {@link NamedList} with a "responseHeader"
+   * @param timeMillis the elapsed time to report as QTime
+   * @return the condensed response
+   * @throws SolrException with code BAD_REQUEST if the shards together report more tolerated
+   *         errors than the smallest effective maxErrors
+   */
+  public RouteResponse condenseResponse(NamedList response, int timeMillis) {
+    int overallStatus = 0;
+    Integer lowestRf = null;
+    Integer minRf = null;
+
+    // TolerantUpdateProcessor
+    List<SimpleOrderedMap<String>> toleratedErrors = null;
+    int maxToleratedErrors = Integer.MAX_VALUE;
+
+    // For "adds", "deletes", "deleteByQuery" etc.
+    final List<String> updateTypes = Arrays.asList("adds", "deletes", "deleteByQuery");
+    final Map<String, NamedList> versions = new HashMap<>();
+
+    for (int i = 0; i < response.size(); i++) {
+      final NamedList shardResponse = (NamedList) response.getVal(i);
+      final NamedList header = (NamedList) shardResponse.get("responseHeader");
+
+      final int shardStatus = ((Integer) header.get("status")).intValue();
+      if (shardStatus > 0) {
+        overallStatus = shardStatus;
+      }
+
+      final Object rfObj = header.get(UpdateRequest.REPFACT);
+      if (rfObj instanceof Integer) {
+        final Integer routeRf = (Integer) rfObj;
+        if (lowestRf == null || routeRf < lowestRf) {
+          lowestRf = routeRf;
+        }
+      }
+      // overwritten on every iteration: the value of the last shard response is reported
+      minRf = (Integer) header.get(UpdateRequest.MIN_REPFACT);
+
+      final List<SimpleOrderedMap<String>> shardTolerantErrors =
+          (List<SimpleOrderedMap<String>>) header.get("errors");
+      if (shardTolerantErrors != null) {
+        final Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
+        assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
+        // if we get into some weird state where the nodes disagree about the effective maxErrors,
+        // assume the min value seen to decide if we should fail.
+        final int shardEffectiveMax =
+            ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue());
+        maxToleratedErrors = Math.min(maxToleratedErrors, shardEffectiveMax);
+
+        if (toleratedErrors == null) {
+          toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
+        }
+        toleratedErrors.addAll(shardTolerantErrors);
+      }
+
+      for (String updateType : updateTypes) {
+        final Object obj = shardResponse.get(updateType);
+        if (!(obj instanceof NamedList)) {
+          continue;
+        }
+        NamedList merged = versions.get(updateType);
+        if (merged == null) {
+          merged = new NamedList();
+        }
+        merged.addAll((NamedList) obj);
+        versions.put(updateType, merged);
+      }
+    }
+
+    final NamedList condensedHeader = new NamedList();
+    condensedHeader.add("status", overallStatus);
+    condensedHeader.add("QTime", timeMillis);
+    if (lowestRf != null) {
+      condensedHeader.add(UpdateRequest.REPFACT, lowestRf);
+    }
+    if (minRf != null) {
+      condensedHeader.add(UpdateRequest.MIN_REPFACT, minRf);
+    }
+    if (toleratedErrors != null) {
+      condensedHeader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
+      condensedHeader.add("errors", toleratedErrors);
+      if (maxToleratedErrors < toleratedErrors.size()) {
+        // cumulative errors are too high, we need to throw a client exception w/correct metadata
+
+        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
+        // then at least one shard should have thrown a real error before this, so we don't worry
+        // about having a more "singular" exception msg for that situation
+        final StringBuilder msgBuf = new StringBuilder();
+        msgBuf.append(toleratedErrors.size()).append(" Async failures during distributed update: ");
+
+        final NamedList metadata = new NamedList<String>();
+        for (SimpleOrderedMap<String> err : toleratedErrors) {
+          final ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
+          metadata.add(te.getMetadataKey(), te.getMetadataValue());
+          msgBuf.append("\n").append(te.getMessage());
+        }
+
+        final SolrException toThrow = new SolrException(ErrorCode.BAD_REQUEST, msgBuf.toString());
+        toThrow.setMetadata(metadata);
+        throw toThrow;
+      }
+    }
+
+    final RouteResponse condensed = new RouteResponse();
+    for (Map.Entry<String, NamedList> versionEntry : versions.entrySet()) {
+      condensed.add(versionEntry.getKey(), versionEntry.getValue());
+    }
+    condensed.add("responseHeader", condensedHeader);
+    return condensed;
+  }
+
+  /**
+   * A condensed update response that can also carry the individual shard responses and the
+   * routes the update was split into.
+   */
+  public static class RouteResponse extends NamedList {
+    private NamedList routeResponses;
+    private Map<String, LBHttpSolrClient.Req> routes;
+
+    /** @return the per-route responses, or {@code null} if none have been set */
+    public NamedList getRouteResponses() {
+      return this.routeResponses;
+    }
+
+    /** @param routeResponses the per-route responses to attach */
+    public void setRouteResponses(NamedList routeResponses) {
+      this.routeResponses = routeResponses;
+    }
+
+    /** @return the routes of the update, or {@code null} if none have been set */
+    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+      return this.routes;
+    }
+
+    /** @param routes the routes of the update to attach */
+    public void setRoutes(Map<String, LBHttpSolrClient.Req> routes) {
+      this.routes = routes;
+    }
+
+  }
+
+  /**
+   * Thrown when one or more route requests of an update fail.  The message and cause are
+   * taken from the first failure; the metadata of all wrapped {@link SolrException}s is merged.
+   */
+  public static class RouteException extends SolrException {
+
+    private NamedList<Throwable> throwables;
+    private Map<String, LBHttpSolrClient.Req> routes;
+
+    /**
+     * @param errorCode  the error code to report
+     * @param throwables the recorded failures; the first entry supplies message and cause
+     * @param routes     the routes the update was split into
+     */
+    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, LBHttpSolrClient.Req> routes){
+      super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
+      this.throwables = throwables;
+      this.routes = routes;
+
+      // create a merged copy of the metadata from all wrapped exceptions
+      final NamedList<String> merged = new NamedList<String>();
+      for (int idx = 0, count = throwables.size(); idx < count; idx++) {
+        final Throwable wrapped = throwables.getVal(idx);
+        if (!(wrapped instanceof SolrException)) {
+          continue;
+        }
+        final NamedList<String> wrappedMeta = ((SolrException) wrapped).getMetadata();
+        if (wrappedMeta != null) {
+          merged.addAll(wrappedMeta);
+        }
+      }
+      if (merged.size() > 0) {
+        this.setMetadata(merged);
+      }
+    }
+
+    /** @return the failures this exception was created from */
+    public NamedList<Throwable> getThrowables() {
+      return this.throwables;
+    }
+
+    /** @return the routes the failed update was split into */
+    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+      return this.routes;
     }
   }
 
   @Override
+  public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+    // precedence: collection of the request, then the method argument, then the default collection
+    String effectiveCollection = request.getCollection();
+    if (effectiveCollection == null) {
+      effectiveCollection = (collection != null) ? collection : defaultCollection;
+    }
+
+    final List<String> inputCollections;
+    if (effectiveCollection == null) {
+      inputCollections = Collections.emptyList();
+    } else {
+      inputCollections = StrUtils.splitSmart(effectiveCollection, ",", true);
+    }
+    return requestWithRetryOnStaleState(request, 0, inputCollections);
+  }
+
+  /**
+   * As this class doesn't watch external collections on the client side,
+   * there's a chance that the request will fail due to cached stale state,
+   * which means the state must be refreshed from ZK and retried.
+   *
+   * The method sends the request via {@link #sendRequest(SolrRequest, List)} and, on failure,
+   * calls itself again with {@code retryCount + 1} as long as {@code retryCount} is below
+   * {@code MAX_STALE_RETRIES} and the failure looks like a communication error, a 503 wrapped in a
+   * {@link RouteException}, or stale collection state.
+   *
+   * @param request          the request to send; a {@link V2RequestSupport} is replaced by its V2 request
+   * @param retryCount       the number of attempts already made for this request
+   * @param inputCollections the collections or aliases the request addresses; may be empty
+   * @return the response of the first successful attempt
+   * @throws SolrServerException if the request fails and is not retried
+   * @throws IOException         if the request fails with an I/O error and is not retried
+   */
+  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, List<String> inputCollections)
+      throws SolrServerException, IOException {
+    connect(); // important to call this before you start working with the ZkStateReader
+
+    // build up a _stateVer_ param to pass to the server containing all of the
+    // external collection state versions involved in this request, which allows
+    // the server to notify us that our cached state for one or more of the external
+    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
+    String stateVerParam = null;
+    List<DocCollection> requestedCollections = null;
+    boolean isCollectionRequestOfV2 = false;
+    if (request instanceof V2RequestSupport) {
+      request = ((V2RequestSupport) request).getV2Request();
+    }
+    if (request instanceof V2Request) {
+      isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
+    }
+    boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
+    if (!inputCollections.isEmpty() && !isAdmin && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
+      Set<String> requestedCollectionNames = resolveAliases(inputCollections);
+
+      StringBuilder stateVerParamBuilder = null;
+      for (String requestedCollection : requestedCollectionNames) {
+        // track the version of state we're using on the client side using the _stateVer_ param
+        DocCollection coll = getDocCollection(requestedCollection, null);
+        if (coll == null) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
+        }
+        int collVer = coll.getZNodeVersion();
+        // only collections with a state format above 1 take part in version tracking
+        if (coll.getStateFormat()>1) {
+          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+          requestedCollections.add(coll);
+
+          if (stateVerParamBuilder == null) {
+            stateVerParamBuilder = new StringBuilder();
+          } else {
+            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+          }
+
+          // resulting format: name:version|name:version|...
+          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+        }
+      }
+
+      if (stateVerParamBuilder != null) {
+        stateVerParam = stateVerParamBuilder.toString();
+      }
+    }
+
+    // the version parameter can only be attached (or cleared) when the params are modifiable
+    if (request.getParams() instanceof ModifiableSolrParams) {
+      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+      if (stateVerParam != null) {
+        params.set(STATE_VERSION, stateVerParam);
+      } else {
+        params.remove(STATE_VERSION);
+      }
+    } // else: ??? how to set this ???
+
+    NamedList<Object> resp = null;
+    try {
+      resp = sendRequest(request, inputCollections);
+      //to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there
+      Object o = resp == null || resp.size() == 0 ? null : resp.get(STATE_VERSION, resp.size() - 1);
+      if(o != null && o instanceof Map) {
+        //remove this because no one else needs this and tests would fail if they are comparing responses
+        resp.remove(resp.size()-1);
+        // the map holds collection name -> version; re-read each collection with that expected version
+        Map invalidStates = (Map) o;
+        for (Object invalidEntries : invalidStates.entrySet()) {
+          Map.Entry e = (Map.Entry) invalidEntries;
+          getDocCollection((String) e.getKey(), (Integer) e.getValue());
+        }
+
+      }
+    } catch (Exception exc) {
+
+      Throwable rootCause = SolrException.getRootCause(exc);
+      // don't do retry support for admin requests
+      // or if the request doesn't have a collection specified
+      // or request is v2 api and its method is not GET
+      if (inputCollections.isEmpty() || isAdmin || (request instanceof V2Request && request.getMethod() != SolrRequest.METHOD.GET)) {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException)exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException)exc;
+        }else if (exc instanceof RuntimeException) {
+          throw (RuntimeException) exc;
+        }
+        else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+
+      int errorCode = (rootCause instanceof SolrException) ?
+          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+          boolean wasCommError =
+              (rootCause instanceof ConnectException ||
+                  rootCause instanceof ConnectTimeoutException ||
+                  rootCause instanceof NoHttpResponseException ||
+                  rootCause instanceof SocketException);
+          
+      log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry={} commError={} errorCode={} ",
+          inputCollections, rootCause.toString(), retryCount, wasCommError, errorCode);
+
+      if (wasCommError
+          || (exc instanceof RouteException && (errorCode == 503)) // only 503 (service unavailable) is tested here; 404 is handled by the stale-state check further down
+          //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
+          ) {
+        // it was a communication error. it is likely that
+        // the node to which the request to be sent is down . So , expire the state
+        // so that the next attempt would fetch the fresh state
+        // just re-read state for all of them, if it has not been retried
+        // in retryExpiryTime time
+        if (requestedCollections != null) {
+          for (DocCollection ext : requestedCollections) {
+            ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
+            if (cacheEntry == null) continue;
+            cacheEntry.maybeStale = true;
+          }
+        }
+        if (retryCount < MAX_STALE_RETRIES) {//if it is a communication error , we must try again
+          //may be, we have a stale version of the collection state
+          // and we could not get any information from the server
+          //it is probably not worth trying again and again because
+          // the state would not have been updated
+          log.info("trying request again");
+          return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
+        }
+      } else {
+        log.info("request was not communication error it seems");
+      }
+
+      boolean stateWasStale = false;
+      if (retryCount < MAX_STALE_RETRIES  &&
+          requestedCollections != null    &&
+          !requestedCollections.isEmpty() &&
+          (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
+      {
+        // cached state for one or more external collections was stale
+        // re-issue request using updated state
+        stateWasStale = true;
+
+        // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
+        for (DocCollection ext : requestedCollections) {
+          collectionStateCache.remove(ext.getName());
+        }
+      }
+
+      // if we experienced a communication error, it's worth checking the state
+      // with ZK just to make sure the node we're trying to hit is still part of the collection
+      if (retryCount < MAX_STALE_RETRIES &&
+          !stateWasStale &&
+          requestedCollections != null &&
+          !requestedCollections.isEmpty() &&
+          wasCommError) {
+        for (DocCollection ext : requestedCollections) {
+          DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
+          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+            // looks like we couldn't reach the server because the state was stale == retry
+            stateWasStale = true;
+            // we just pulled state from ZK, so update the cache so that the retry uses it
+            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+          }
+        }
+      }
+
+      if (requestedCollections != null) {
+        requestedCollections.clear(); // done with this
+      }
+
+      // if the state was stale, then we retry the request once with new state pulled from Zk
+      if (stateWasStale) {
+        log.warn("Re-trying request to collection(s) "+inputCollections+" after stale state error from server.");
+        resp = requestWithRetryOnStaleState(request, retryCount+1, inputCollections);
+      } else {
+        // no retry: rethrow the original exception if its type is one callers expect, otherwise wrap the root cause
+        if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
+          throw exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+    }
+
+    return resp;
+  }
+
+  /**
+   * Sends a single request, choosing the target URLs from the current cluster state.
+   *
+   * An {@link UpdateRequest} is first offered to {@code directUpdate}; if that produces a
+   * response it is returned as is.  Otherwise a URL list is built and handed to the
+   * load-balancing client:
+   * a {@link V2Request} goes to one randomly chosen live node, a request whose path is in
+   * {@code ADMIN_PATHS} goes to all live nodes, and any other request goes to the ACTIVE replicas
+   * on live nodes of the slices selected by the collections' routers (one URL per node, leaders
+   * first for update requests).
+   *
+   * @param request          the request to send
+   * @param inputCollections the collections or aliases the request addresses
+   * @return the response returned by the load-balancing client, or by the direct update
+   * @throws SolrException if no collection is given, a collection cannot be found, or no
+   *         suitable replica is available
+   * @throws SolrServerException if sending the request fails
+   * @throws IOException         if sending the request fails with an I/O error
+   */
+  protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputCollections)
+      throws SolrServerException, IOException {
+    connect();
+
+    boolean sendToLeaders = false;
+
+    if (request instanceof IsUpdateRequest) {
+      if (request instanceof UpdateRequest) {
+        String collection = inputCollections.isEmpty() ? null : inputCollections.get(0); // getting first mimics HttpSolrCall
+        NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
+        if (response != null) {
+          return response;
+        }
+      }
+      // direct routing was not possible (or not applicable): use the general path, preferring leaders
+      sendToLeaders = true;
+    }
+    
+    SolrParams reqParams = request.getParams();
+    if (reqParams == null) { // TODO fix getParams to never return null!
+      reqParams = new ModifiableSolrParams();
+    }
+
+    final Set<String> liveNodes = stateProvider.getLiveNodes();
+
+    final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
+
+    if (request instanceof V2Request) {
+      // V2 requests: a single, randomly chosen live node
+      if (!liveNodes.isEmpty()) {
+        List<String> liveNodesList = new ArrayList<>(liveNodes);
+        Collections.shuffle(liveNodesList, rand);
+        theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
+            (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
+      }
+
+    } else if (ADMIN_PATHS.contains(request.getPath())) {
+      // admin requests: every live node is a candidate
+      for (String liveNode : liveNodes) {
+        theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
+            (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
+      }
+
+    } else { // Typical...
+      Set<String> collectionNames = resolveAliases(inputCollections);
+      if (collectionNames.isEmpty()) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "No collection param specified on request and no default collection has been set: " + inputCollections);
+      }
+
+      // TODO: not a big deal because of the caching, but we could avoid looking
+      //   at every shard when getting leaders if we tweaked some things
+      
+      // Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
+      Map<String,Slice> slices = new HashMap<>();
+      String shardKeys = reqParams.get(ShardParams._ROUTE_);
+      for (String collectionName : collectionNames) {
+        DocCollection col = getDocCollection(collectionName, null);
+        if (col == null) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+        }
+        Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
+        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
+      }
+
+      // Gather URLs, grouped by leader or replica
+      // TODO: allow filtering by group, role, etc
+      Set<String> seenNodes = new HashSet<>();
+      List<String> replicas = new ArrayList<>();
+      String joinedInputCollections = StrUtils.join(inputCollections, ',');
+      for (Slice slice : slices.values()) {
+        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
+          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+          String node = coreNodeProps.getNodeName();
+          if (!liveNodes.contains(node) // Must be a live node to continue
+              || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+            continue;
+          if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
+            // the URL addresses the node with the joined input collection names, not an individual core
+            String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
+            if (sendToLeaders && coreNodeProps.isLeader()) {
+              theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
+            } else {
+              replicas.add(url); // replicas here
+            }
+          }
+        }
+      }
+
+      // Shuffle the leaders, if any    (none if !sendToLeaders)
+      Collections.shuffle(theUrlList, rand);
+
+      // Shuffle the replicas, if any, and append to our list
+      Collections.shuffle(replicas, rand);
+      theUrlList.addAll(replicas);
+
+      if (theUrlList.isEmpty()) {
+        // drop the cached state of these collections so that the next attempt re-reads it
+        collectionStateCache.keySet().removeAll(collectionNames);
+        throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
+            "Could not find a healthy node to handle the request.");
+      }
+    }
+
+    LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList);
+    LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+    return rsp.getResponse();
+  }
+
+  /**
+   * Resolves the input collections to their possible aliased collections. Doesn't validate collection existence.
+   *
+   * A name the state provider has state for is kept as is; any other name is treated as an
+   * alias and resolved through two levels of alias indirection.
+   *
+   * @param inputCollections collection names and/or aliases
+   * @return the resolved names, in encounter order and without duplicates
+   */
+  private LinkedHashSet<String> resolveAliases(List<String> inputCollections) {
+    final LinkedHashSet<String> resolved = new LinkedHashSet<>(); // consistent ordering
+    for (String name : inputCollections) {
+      if (stateProvider.getState(name) != null) {
+        resolved.add(name); // it's a collection
+        continue;
+      }
+      // perhaps it's an alias
+      for (String firstLevel : stateProvider.resolveAlias(name)) {
+        // one more level of alias indirection...  (dubious that we should support this)
+        resolved.addAll(stateProvider.resolveAlias(firstLevel));
+      }
+    }
+    return resolved;
+  }
+
+  @Override
   public void close() throws IOException {
     stateProvider.close();
     
@@ -170,7 +1141,9 @@ public class CloudSolrClient extends BaseCloudSolrClient {
       HttpClientUtil.close(myClient);
     }
 
-    super.close();
+    if(this.threadPool != null && !this.threadPool.isShutdown()) {
+      this.threadPool.shutdown();
+    }
   }
 
   public LBHttpSolrClient getLbClient() {
@@ -180,8 +1153,150 @@ public class CloudSolrClient extends BaseCloudSolrClient {
   public HttpClient getHttpClient() {
     return myClient;
   }
+  
+  public boolean isUpdatesToLeaders() { // exposes the updatesToLeaders setting of this client
+    return updatesToLeaders;
+  }
 
   /**
+   * @return the {@code directUpdatesToLeadersOnly} setting: true if direct updates are sent to shard leaders only
+   */
+  public boolean isDirectUpdatesToLeadersOnly() {
+    return directUpdatesToLeadersOnly;
+  }
+
+  /** Sets the number of lock objects that guard refreshes of expired cached collection state; needs at least 1. */
+  public void setParallelCacheRefreshes(int n) {
+    if (n < 1) throw new IllegalArgumentException("n must be at least 1 but was " + n); // a lock is picked modulo the lock count
+    locks = objectList(n);
+  }
+  private static ArrayList<Object> objectList(int n) { // builds a list of n distinct, newly created objects
+    ArrayList<Object> created = new ArrayList<>(n);
+    while (created.size() < n) created.add(new Object());
+    return created;
+  }
+
+  /** Returns the state of {@code collection}: the copy in collectionStateCache while it is usable, else a fresh one read through its CollectionRef; null if unknown. */
+  protected DocCollection getDocCollection(String collection, Integer expectedVersion) throws SolrException {
+    if (expectedVersion == null) expectedVersion = -1; // no expectation given: -1 is used in the version checks below
+    if (collection == null) return null;
+    ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
+    DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
+    if (col != null) {
+      if (expectedVersion <= col.getZNodeVersion()
+          && !cacheEntry.shouldRetry()) return col; // cached copy is new enough and not due for a retry
+    }
+
+    CollectionRef ref = getCollectionRef(collection);
+    if (ref == null) {
+      // no such collection exists
+      return null;
+    }
+    if (!ref.isLazilyLoaded()) {
+      // not lazily loaded: the state is readily available, so return it without touching the cache
+      return ref.get();
+    }
+    List locks = this.locks; // local copy of the reference, so get() and size() below use the same list
+    final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
+    DocCollection fetchedCol = null;
+    synchronized (lock) {
+      /* we may have waited for the lock for some time, so check the cache once again */
+      cacheEntry = collectionStateCache.get(collection);
+      col = cacheEntry == null ? null : cacheEntry.cached;
+      if (col != null) {
+        if (expectedVersion <= col.getZNodeVersion()
+            && !cacheEntry.shouldRetry()) return col;
+      }
+      // The cached copy (if any) was not usable,
+      // so we MUST try to get a new version
+      fetchedCol = ref.get();//this is a call to ZK
+      if (fetchedCol == null) return null;// this collection no longer exists
+      if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
+        cacheEntry.setRetriedAt();//we retried and found that it is the same version
+        cacheEntry.maybeStale = false;
+      } else {
+        if (fetchedCol.getStateFormat() > 1) // only collections with state format above 1 are put into the cache
+          collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
+      }
+      return fetchedCol;
+    }
+  }
+
+  CollectionRef getCollectionRef(String collection) { // delegates the lookup to the configured stateProvider
+    return stateProvider.getState(collection);
+  }
+
+  /**
+   * Returns the minimum achieved replication factor across all shards involved in processing an update
+   * request (useful for gauging the replication factor of a batch): the top-level response header value
+   * if present, else the smallest per-shard value, or -1 if none was reported. A missing header is tolerated.
+   */
+  @SuppressWarnings("rawtypes")
+  public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
+    // it's probably already on the top-level header set by condense
+    NamedList header = (NamedList)resp.get("responseHeader");
+    Integer achRf = header == null ? null : (Integer)header.get(UpdateRequest.REPFACT);
+    if (achRf != null)
+      return achRf.intValue();
+
+    // not on the top-level header, walk the shard route tree
+    Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp);
+    for (Integer rf : shardRf.values()) {
+      if (achRf == null || rf < achRf) {
+        achRf = rf;
+      }
+    }
+    return (achRf != null) ? achRf.intValue() : -1;
+  }
+  
+  /**
+   * Walks the NamedList response after performing an update request looking for the replication factor
+   * that was achieved in each shard involved in the request. For single doc updates, there will be only
+   * one shard in the return value. Routes that match no known shard leader are keyed by their route key.
+   */
+  @SuppressWarnings("rawtypes")
+  public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
+    connect();
+
+    Map<String,Integer> results = new HashMap<>();
+    if (resp instanceof CloudSolrClient.RouteResponse) {
+      NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
+      DocCollection coll = getDocCollection(collection, null);
+      Map<String,String> leaders = new HashMap<>();
+      if (coll != null) { // null when the collection is unknown; routes are then reported under their own keys
+        for (Slice slice : coll.getActiveSlicesArr()) {
+          Replica leader = slice.getLeader();
+          if (leader != null) {
+            ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+            // register the leader under both its core-name URL and its collection-name URL
+            leaders.put(zkProps.getBaseUrl() + "/" + zkProps.getCoreName(), slice.getName());
+            leaders.put(zkProps.getBaseUrl() + "/" + collection, slice.getName());
+          }
+        }
+      }
+
+      Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
+      while (routeIter.hasNext()) {
+        Map.Entry<String,Object> next = routeIter.next();
+        String host = next.getKey();
+        NamedList hostHeader = (NamedList)((NamedList)next.getValue()).get("responseHeader");
+        Integer rf = hostHeader == null ? null : (Integer)hostHeader.get(UpdateRequest.REPFACT);
+        if (rf != null) {
+          String shard = leaders.get(host);
+          if (shard == null && host.endsWith("/")) {
+            shard = leaders.get(host.substring(0,host.length()-1)); // retry without the trailing slash
+          }
+          if (shard == null) {
+            shard = host; // no matching leader: fall back to the route key
+          }
+          results.put(shard, rf);
+        }
+      }
+    }
+    return results;
+  }
+  
+  /**
    * @deprecated since 7.0  Use {@link Builder} methods instead. 
    */
   @Deprecated
@@ -193,10 +1308,29 @@ public class CloudSolrClient extends BaseCloudSolrClient {
     return stateProvider;
   }
 
-  @Override
-  protected boolean wasCommError(Throwable rootCause) {
-    return rootCause instanceof ConnectTimeoutException ||
-        rootCause instanceof NoHttpResponseException;
+  /**
+   * Tells whether an update request carries the information needed to look up leaders for it.
+   * @return false if it has neither documents nor delete-by-id entries, or if any document lacks a value
+   *         for {@code idField}; true otherwise
+   */
+  private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
+    final Map<SolrInputDocument,Map<String,Object>> docs = updateRequest.getDocumentsMap();
+    final Map<String,Map<String,Object>> idsToDelete = updateRequest.getDeleteByIdMap();
+
+    final boolean anyDocuments = docs != null && !docs.isEmpty();
+    final boolean anyDeleteById = idsToDelete != null && !idsToDelete.isEmpty();
+    if (!anyDocuments && !anyDeleteById) {
+      return false; // no documents and no delete-by-id, so no info to find leader(s)
+    }
+    if (docs == null) {
+      return true;
+    }
+    for (final SolrInputDocument doc : docs.keySet()) {
+      if (doc.getFieldValue(idField) == null) {
+        return false; // a document with no id field value, so can't find leader for it
+      }
+    }
+    return true;
   }
 
   private static LBHttpSolrClient createLBHttpSolrClient(Builder cloudSolrClientBuilder, HttpClient httpClient) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
deleted file mode 100644
index 335684a..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.solr.client.solrj.SolrClient;
-
-public class Http2ClusterStateProvider extends BaseHttpClusterStateProvider {
-  final Http2SolrClient httpClient;
-  final boolean closeClient;
-
-  public Http2ClusterStateProvider(List<String> solrUrls, Http2SolrClient httpClient) throws Exception {
-    this.httpClient = httpClient == null? new Http2SolrClient.Builder().build(): httpClient;
-    this.closeClient = httpClient == null;
-    init(solrUrls);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (this.closeClient && this.httpClient != null) {
-      httpClient.close();
-    }
-  }
-
-  @Override
-  protected SolrClient getSolrClient(String baseUrl) {
-    return new Http2SolrClient.Builder(baseUrl).withHttpClient(httpClient).build();
-  }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 580a849..9b60460 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -93,18 +93,10 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
 import static org.apache.solr.common.util.Utils.getObjectByPath;
 
 // TODO: error handling, small Http2SolrClient features, security, ssl
 /**
- * Difference between this {@link Http2SolrClient} and {@link HttpSolrClient}:
- * <ul>
- *  <li>{@link Http2SolrClient} sends requests in HTTP/2</li>
- *  <li>{@link Http2SolrClient} can point to multiple urls</li>
- *  <li>{@link Http2SolrClient} does not expose its internal httpClient like {@link HttpSolrClient#getHttpClient()},
- * sharing connection pools should be done by {@link Http2SolrClient.Builder#withHttpClient(Http2SolrClient)} </li>
- * </ul>
  * @lucene.experimental
  */
 public class Http2SolrClient extends SolrClient {
@@ -147,12 +139,20 @@ public class Http2SolrClient extends SolrClient {
     if (builder.idleTimeout != null) idleTimeout = builder.idleTimeout;
     else idleTimeout = HttpClientUtil.DEFAULT_SO_TIMEOUT;
 
-    if (builder.http2SolrClient == null) {
+    if (builder.httpClient == null) {
       httpClient = createHttpClient(builder);
       closeClient = true;
     } else {
-      httpClient = builder.http2SolrClient.httpClient;
+      httpClient = builder.httpClient;
     }
+    if (!httpClient.isStarted()) {
+      try {
+        httpClient.start();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     assert ObjectReleaseTracker.track(this);
   }
 
@@ -160,12 +160,10 @@ public class Http2SolrClient extends SolrClient {
     this.listenerFactory.add(factory);
   }
 
-  // internal usage only
   HttpClient getHttpClient() {
     return httpClient;
   }
 
-  // internal usage only
   ProtocolHandlers getProtocolHandlers() {
     return httpClient.getProtocolHandlers();
   }
@@ -215,11 +213,6 @@ public class Http2SolrClient extends SolrClient {
 
     if (builder.idleTimeout != null) httpClient.setIdleTimeout(builder.idleTimeout);
     if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
-    try {
-      httpClient.start();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
     return httpClient;
   }
 
@@ -452,7 +445,6 @@ public class Http2SolrClient extends SolrClient {
   private Request makeRequest(SolrRequest solrRequest, String collection)
       throws SolrServerException, IOException {
     Request req = createRequest(solrRequest, collection);
-    req.header(HttpHeader.ACCEPT_ENCODING, null);
     setListeners(solrRequest, req);
     if (solrRequest.getUserPrincipal() != null) {
       req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
@@ -798,7 +790,7 @@ public class Http2SolrClient extends SolrClient {
 
   public static class Builder {
 
-    private Http2SolrClient http2SolrClient;
+    private HttpClient httpClient;
     private SSLConfig sslConfig = defaultSSLConfig;
     private Integer idleTimeout;
     private Integer connectionTimeout;
@@ -818,11 +810,8 @@ public class Http2SolrClient extends SolrClient {
       return new Http2SolrClient(baseSolrUrl, this);
     }
 
-    /**
-     * Reuse {@code httpClient} connections pool
-     */
-    public Builder withHttpClient(Http2SolrClient httpClient) {
-      this.http2SolrClient = httpClient;
+    public Builder withHttpClient(HttpClient httpClient) {
+      this.httpClient = httpClient;
       return this;
     }
 
@@ -856,6 +845,52 @@ public class Http2SolrClient extends SolrClient {
 
   }
 
+  /**
+   * Subclass of SolrException that allows us to capture an arbitrary HTTP status code that may have been returned by
+   * the remote server or a proxy along the way.
+   */
+  public static class RemoteSolrException extends SolrException {
+    /**
+     * @param remoteHost the host the error was received from; it becomes part of the exception message
+     * @param code       Arbitrary HTTP status code
+     * @param msg        Exception Message, appended after the "Error from server at" prefix and the host
+     * @param th         Throwable to wrap with this Exception
+     */
+    public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
+      super(code, "Error from server at " + remoteHost + ": " + msg, th);
+    }
+  }
+
+  /**
+   * Thrown when a server has an error in executing the request and still sends a proper payload back to the
+   * client; the list handed to the constructor is exposed through {@link #getMetaData()}.
+   */
+  public static class RemoteExecutionException extends RemoteSolrException {
+    private NamedList meta;
+
+    public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
+      super(remoteHost, code, msg, null);
+      this.meta = meta;
+    }
+
+    /** Builds the exception from the "error" entry of {@code errResponse}; fails if there is no such entry. */
+    public static RemoteExecutionException create(String host, NamedList errResponse) {
+      Object error = errResponse.get("error");
+      if (error == null) {
+        throw new RuntimeException("No error");
+      }
+      Number errorCode = (Number) getObjectByPath(error, true, Collections.singletonList("code"));
+      String errorMsg = (String) getObjectByPath(error, true, Collections.singletonList("msg"));
+      int status = errorCode == null ? ErrorCode.UNKNOWN.code : errorCode.intValue();
+      return new RemoteExecutionException(host, status, errorMsg == null ? "Unknown Error" : errorMsg, errResponse);
+    }
+
+    /** Returns the {@code meta} list that was passed to the constructor. */
+    public NamedList getMetaData() {
+      return meta;
+    }
+  }
+
   public Set<String> getQueryParams() {
     return queryParams;
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
index 07fd8f8..bbab3aa 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
@@ -17,25 +17,67 @@
 package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ClusterState.CollectionRef;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HttpClusterStateProvider extends BaseHttpClusterStateProvider {
+public class HttpClusterStateProvider implements ClusterStateProvider {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final HttpClient httpClient;
-  private final boolean clientIsInternal;
+  private String urlScheme;
+  volatile Set<String> liveNodes;
+  long liveNodesTimestamp = 0;
+  volatile Map<String, List<String>> aliases;
+  long aliasesTimestamp = 0;
+
+  private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
+  final HttpClient httpClient;
+  final boolean clientIsInternal;
 
   public HttpClusterStateProvider(List<String> solrUrls, HttpClient httpClient) throws Exception {
     this.httpClient = httpClient == null? HttpClientUtil.createClient(null): httpClient;
     this.clientIsInternal = httpClient == null;
-    init(solrUrls);
-  }
+    for (String solrUrl: solrUrls) { // try the given URLs in order until one of them yields live_nodes
+      urlScheme = solrUrl.startsWith("https")? "https": "http";
+      // this.httpClient is never null here (the parameter may be) and is what every other request of this class uses
+      try (SolrClient initialClient = new HttpSolrClient.Builder().withBaseSolrUrl(solrUrl).withHttpClient(this.httpClient).build()) {
+        this.liveNodes = fetchLiveNodes(initialClient); // throws exception if unable to fetch
+        liveNodesTimestamp = System.nanoTime();
+        break;
+      } catch (SolrServerException | RemoteSolrException | IOException e) { // any of these means: move on to the next URL
+        log.warn("Attempt to fetch live_nodes from " + solrUrl + " failed.", e);
+      }
+    }
 
-  @Override
-  protected SolrClient getSolrClient(String baseUrl) {
-    return new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build();
+    if (this.liveNodes == null || this.liveNodes.isEmpty()) {
+      throw new RuntimeException("Tried fetching live_nodes using Solr URLs provided, i.e. " + solrUrls + ". However, "
+          + "succeeded in obtaining the cluster state from none of them."
+          + "If you think your Solr cluster is up and is accessible,"
+          + " you could try re-creating a new CloudSolrClient using working"
+          + " solrUrl(s) or zkHost(s).");
+    }
   }
 
   @Override
@@ -44,4 +86,247 @@ public class HttpClusterStateProvider extends BaseHttpClusterStateProvider {
       HttpClientUtil.close(httpClient);
     }
   }
+
+  /**
+   * Asks the known live nodes, one after another, for the cluster state of {@code collection}.
+   * @return the collection's reference; null if the name is not a collection or the server reported NOT_FOUND
+   * @throws RuntimeException if none of the known live nodes could be queried successfully
+   */
+  @Override
+  public CollectionRef getState(String collection) {
+    for (String nodeName: liveNodes) {
+      final String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+      try (HttpSolrClient client = new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build()) {
+        return fetchClusterState(client, collection, null).getCollectionRef(collection);
+      } catch (NotACollectionException e) {
+        // Cluster state for the given collection was not found, could be an alias: fetch/update our aliases
+        getAliases(true);
+        return null;
+      } catch (RemoteSolrException e) {
+        if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
+          return null;
+        }
+        log.warn("Attempt to fetch cluster state from " + baseUrl + " failed.", e);
+      } catch (SolrServerException | IOException e) {
+        log.warn("Attempt to fetch cluster state from " + baseUrl + " failed.", e);
+      }
+    }
+    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
+        + "succeeded in obtaining the cluster state from none of them."
+        + "If you think your Solr cluster is up and is accessible,"
+        + " you could try re-creating a new CloudSolrClient using working"
+        + " solrUrl(s) or zkHost(s).");
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private ClusterState fetchClusterState(SolrClient client, String collection, Map<String, Object> clusterProperties) throws SolrServerException, IOException, NotACollectionException {
+    ModifiableSolrParams params = new ModifiableSolrParams(); // CLUSTERSTATUS request, narrowed to one collection if given
+    if (collection != null) {
+      params.set("collection", collection);
+    }
+    params.set("action", "CLUSTERSTATUS");
+    QueryRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+    NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
+    Map<String, Object> collectionsMap;
+    if (collection != null) {
+      collectionsMap = Collections.singletonMap(collection,
+          ((NamedList) cluster.get("collections")).get(collection)); // single entry; its value is null if the name is absent
+    } else {
+      collectionsMap = ((NamedList)cluster.get("collections")).asMap(10);
+    }
+    int znodeVersion;
+    Map<String, Object> collFromStatus = (Map<String, Object>) (collectionsMap).get(collection);
+    if (collection != null && collFromStatus == null) {
+      throw new NotACollectionException(); // probably an alias
+    }
+    if (collection != null) { // a null collection means no single collection was requested
+      znodeVersion =  (int) collFromStatus.get("znodeVersion");
+    } else {
+      znodeVersion = -1;
+    }
+    Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
+    this.liveNodes = liveNodes; // side effect: every successful fetch also refreshes the cached live nodes
+    liveNodesTimestamp = System.nanoTime();
+    //TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
+    ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
+    if (clusterProperties != null) { // optional out-parameter: receives the "properties" section when supplied
+      Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
+      if (properties != null) {
+        clusterProperties.putAll(properties);
+      }
+    }
+    return cs;
+  }
+
+  /** Returns the cached live_nodes set, re-fetching it from the known nodes (tried one by one) once it is older than
+   *  the cache timeout; throws a RuntimeException if no live nodes are known or none of the nodes answers. */
+  @Override
+  public Set<String> getLiveNodes() {
+    if (liveNodes == null) {
+      throw new RuntimeException("We don't know of any live_nodes to fetch the"
+          + " latest live_nodes information from. "
+          + "If you think your Solr cluster is up and is accessible,"
+          + " you could try re-creating a new CloudSolrClient using working"
+          + " solrUrl(s) or zkHost(s).");
+    }
+    final long ageInSeconds = TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS);
+    if (ageInSeconds <= getCacheTimeout()) {
+      return liveNodes; // cached copy is fresh enough
+    }
+    for (String nodeName: liveNodes) {
+      final String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+      try (HttpSolrClient client = new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build()) {
+        Set<String> fetched = fetchLiveNodes(client);
+        this.liveNodes = fetched;
+        liveNodesTimestamp = System.nanoTime();
+        return fetched;
+      } catch (Exception e) {
+        log.warn("Attempt to fetch live_nodes from " + baseUrl + " failed.", e);
+      }
+    }
+    throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
+        + "succeeded in obtaining the cluster state from none of them."
+        + "If you think your Solr cluster is up and is accessible,"
+        + " you could try re-creating a new CloudSolrClient using working"
+        + " solrUrl(s) or zkHost(s).");
+  }
+
+  /** Sends a CLUSTERSTATUS request through {@code client} and returns the "live_nodes" entry of its "cluster" section as a set. */
+  private static Set<String> fetchLiveNodes(SolrClient client) throws Exception {
+    ModifiableSolrParams statusParams = new ModifiableSolrParams();
+    statusParams.set("action", "CLUSTERSTATUS");
+    QueryRequest statusRequest = new QueryRequest(statusParams);
+    statusRequest.setPath("/admin/collections");
+    NamedList clusterInfo = (SimpleOrderedMap) client.request(statusRequest).get("cluster");
+    return new HashSet((List<String>)(clusterInfo.get("live_nodes")));
+  }
+
+  @Override
+  public List<String> resolveAlias(String aliasName) { // resolves against the alias map, without forcing a re-fetch
+    return Aliases.resolveAliasesGivenAliasMap(getAliases(false), aliasName);
+  }
+
+  private Map<String, List<String>> getAliases(boolean forceFetch) { // returns the alias map, re-fetched when forced, missing or expired
+    if (this.liveNodes == null) {
+      throw new RuntimeException("We don't know of any live_nodes to fetch the"
+          + " latest aliases information from. "
+          + "If you think your Solr cluster is up and is accessible,"
+          + " you could try re-creating a new CloudSolrClient using working"
+          + " solrUrl(s) or zkHost(s).");
+    }
+
+    if (forceFetch || this.aliases == null ||
+        TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
+      for (String nodeName: liveNodes) { // ask the known nodes one by one; the first answer wins
+        try (HttpSolrClient client = new HttpSolrClient.Builder().
+            withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
+            withHttpClient(httpClient).build()) {
+
+          Map<String, List<String>> aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
+          this.aliases = aliases;
+          this.aliasesTimestamp = System.nanoTime();
+          return Collections.unmodifiableMap(this.aliases);
+        } catch (SolrServerException | RemoteSolrException | IOException e) {
+          // Situation where we're hitting an older Solr which doesn't have LISTALIASES
+          if (e instanceof RemoteSolrException && ((RemoteSolrException)e).code()==400) {
+            log.warn("LISTALIASES not found, possibly using older Solr server. Aliases won't work"
+                + " unless you re-create the CloudSolrClient using zkHost(s) or upgrade Solr server", e);
+            this.aliases = Collections.emptyMap();
+            this.aliasesTimestamp = System.nanoTime();
+            return aliases; // the field: the local declared in the try block is out of scope here, so this is the empty map
+          }
+          log.warn("Attempt to fetch cluster state from " +
+              Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+        }
+      }
+
+      throw new RuntimeException("Tried fetching aliases using all the node names we knew of, i.e. " + liveNodes +". However, "
+          + "succeeded in obtaining the cluster state from none of them."
+          + "If you think your Solr cluster is up and is accessible,"
+          + " you could try re-creating a new CloudSolrClient using a working"
+          + " solrUrl or zkHost.");
+    } else {
+      return Collections.unmodifiableMap(this.aliases); // cached copy is fresh enough
+    }
+  }
+
+  /** Fetches the cluster state, without a collection filter, from the first known live node that answers.
+   * @throws RuntimeException if none of the known live nodes could be queried
+   */
+  @Override
+  public ClusterState getClusterState() throws IOException {
+    for (String nodeName: liveNodes) {
+      final String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+      try (HttpSolrClient client = new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build()) {
+        return fetchClusterState(client, null, null);
+      } catch (SolrServerException | RemoteSolrException | IOException e) {
+        log.warn("Attempt to fetch cluster state from " + baseUrl + " failed.", e);
+      } catch (NotACollectionException e) {
+        // not possible! (we passed in null for collection so it can't be an alias); keep the cause for the bug report
+        throw new RuntimeException("null should never cause NotACollectionException in " +
+            "fetchClusterState() Please report this as a bug!", e);
+      }
+    }
+    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
+        + "succeeded in obtaining the cluster state from none of them."
+        + "If you think your Solr cluster is up and is accessible,"
+        + " you could try re-creating a new CloudSolrClient using working"
+        + " solrUrl(s) or zkHost(s).");
+  }
+
+  /** Collects the "properties" section of the cluster status reported by the first known live node that answers;
+   *  throws a RuntimeException if none of the known live nodes could be queried. */
+  @Override
+  public Map<String, Object> getClusterProperties() {
+    for (String nodeName : liveNodes) {
+      final String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
+      try (HttpSolrClient client = new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build()) {
+        Map<String, Object> clusterProperties = new HashMap<>();
+        fetchClusterState(client, null, clusterProperties);
+        return clusterProperties;
+      } catch (SolrServerException | RemoteSolrException | IOException e) {
+        log.warn("Attempt to fetch cluster state from " + baseUrl + " failed.", e);
+      } catch (NotACollectionException e) {
+        // not possible! (we passed in null for collection so it can't be an alias); keep the cause for the bug report
+        throw new RuntimeException("null should never cause NotACollectionException in " +
+            "fetchClusterState() Please report this as a bug!", e);
+      }
+    }
+    throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes + ". However, "
+        + "succeeded in obtaining the cluster state from none of them."
+        + "If you think your Solr cluster is up and is accessible,"
+        + " you could try re-creating a new CloudSolrClient using working"
+        + " solrUrl(s) or zkHost(s).");
+  }
+
+  @Override
+  public String getPolicyNameByCollection(String coll) { // always throws: this lookup is not implemented here
+    throw new UnsupportedOperationException("Fetching the policy name of a collection not supported"
+        + " using the HttpClusterStateProvider. "
+        + "ZkClientClusterStateProvider can be used for this."); // TODO
+  }
+
+  @Override
+  public Object getClusterProperty(String propertyName) {
+    if (ZkStateReader.URL_SCHEME.equals(propertyName)) { // constant-first comparison is safe for a null name
+      return this.urlScheme; // answered locally from the scheme of the Solr URL used at construction, no request needed
+    }
+    return getClusterProperties().get(propertyName);
+  }
+
+  @Override
+  public void connect() {} // no-op: this implementation does nothing on connect
+
+  public int getCacheTimeout() { // in seconds; compared against the age of the cached liveNodes and aliases
+    return cacheTimeout;
+  }
+
+  public void setCacheTimeout(int cacheTimeout) { // in seconds; applies to both the liveNodes and the aliases cache
+    this.cacheTimeout = cacheTimeout;
+  }
+
+  // This exception is not meant to escape this class it should be caught and wrapped.
+  private static class NotACollectionException extends Exception { // static: it needs no enclosing-instance reference
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 8856be5..8831448 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -66,6 +66,7 @@ import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.V2RequestSupport;
@@ -90,7 +91,7 @@ import static org.apache.solr.common.util.Utils.getObjectByPath;
 /**
  * A SolrClient implementation that talks directly to a Solr server via HTTP
  */
-public class HttpSolrClient extends BaseHttpSolrClient {
+public class HttpSolrClient extends SolrClient {
 
   private static final String UTF_8 = StandardCharsets.UTF_8.name();
   private static final String DEFAULT_PATH = "/select";
@@ -559,7 +560,7 @@ public class HttpSolrClient extends BaseHttpSolrClient {
       } else {
         contentType = "";
       }
-
+      
       // handle some http level checks before trying to parse the response
       switch (httpStatus) {
         case HttpStatus.SC_OK:
@@ -797,26 +798,53 @@ s   * @deprecated since 7.0  Use {@link Builder} methods instead.
     this.useMultiPartPost = useMultiPartPost;
   }
 
-
   /**
-   * @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteSolrException} instead
+   * Subclass of SolrException that allows us to capture an arbitrary HTTP
+   * status code that may have been returned by the remote server or a 
+   * proxy along the way.
    */
-  @Deprecated
-  public static class RemoteSolrException extends BaseHttpSolrClient.RemoteSolrException {
-
+  public static class RemoteSolrException extends SolrException {
+    /**
+     * @param remoteHost the host the error was received from
+     * @param code Arbitrary HTTP status code
+     * @param msg Exception Message
+     * @param th Throwable to wrap with this Exception
+     */
     public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
-      super(remoteHost, code, msg, th);
+      super(code, "Error from server at " + remoteHost + ": " + msg, th);
     }
   }
 
   /**
-   * @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteExecutionException} instead
+   * This should be thrown when a server has an error in executing the request and
+   * it sends a proper payload back to the client
    */
-  @Deprecated
-  public static class RemoteExecutionException extends BaseHttpSolrClient.RemoteExecutionException {
+  public static class RemoteExecutionException extends RemoteSolrException {
+    private NamedList meta;
 
     public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
-      super(remoteHost, code, msg, meta);
+      super(remoteHost, code, msg, null);
+      this.meta = meta;
+    }
+
+
+    public static RemoteExecutionException create(String host, NamedList errResponse) {
+      Object errObj = errResponse.get("error");
+      if (errObj != null) {
+        Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
+        String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
+        return new RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
+            msg == null ? "Unknown Error" : msg, errResponse);
+
+      } else {
+        throw new RuntimeException("No error");
+      }
+
+    }
+
+    public NamedList getMetaData() {
+
+      return meta;
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
index 3eb20eb..8a142bf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
@@ -31,10 +31,8 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
-import org.apache.solr.client.solrj.impl.LBSolrClient;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrInputDocument;
@@ -238,21 +236,25 @@ public class UpdateRequest extends AbstractUpdateRequest {
     params.set(UpdateParams.COMMIT, "true");
     return process(client, collection);
   }
-
-  private interface ReqSupplier<T extends LBSolrClient.Req> {
-    T get(SolrRequest solrRequest, List<String> servers);
-  }
-
-  private <T extends LBSolrClient.Req> Map<String, T> getRoutes(DocRouter router,
-                                                                               DocCollection col, Map<String,List<String>> urlMap,
-                                                                               ModifiableSolrParams params, String idField,
-                                                                               ReqSupplier<T> reqSupplier) {
+  
+  /**
+   * @param router to route updates with
+   * @param col DocCollection for the updates
+   * @param urlMap of the cluster
+   * @param params params to use
+   * @param idField the id field
+   * @return a Map of urls to requests
+   */
+  public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
+      DocCollection col, Map<String,List<String>> urlMap,
+      ModifiableSolrParams params, String idField) {
+    
     if ((documents == null || documents.size() == 0)
         && (deleteById == null || deleteById.size() == 0)) {
       return null;
     }
-
-    Map<String,T> routes = new HashMap<>();
+    
+    Map<String,LBHttpSolrClient.Req> routes = new HashMap<>();
     if (documents != null) {
       Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
       for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
@@ -271,7 +273,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           return null;
         }
         String leaderUrl = urls.get(0);
-        T request = routes
+        LBHttpSolrClient.Req request = routes
             .get(leaderUrl);
         if (request == null) {
           UpdateRequest updateRequest = new UpdateRequest();
@@ -281,7 +283,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           updateRequest.setPath(getPath());
           updateRequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
           updateRequest.setResponseParser(getResponseParser());
-          request = reqSupplier.get(updateRequest, urls);
+          request = new LBHttpSolrClient.Req(updateRequest, urls);
           routes.put(leaderUrl, request);
         }
         UpdateRequest urequest = (UpdateRequest) request.getRequest();
@@ -297,17 +299,17 @@ public class UpdateRequest extends AbstractUpdateRequest {
         }
       }
     }
-
+    
     // Route the deleteById's
-
+    
     if (deleteById != null) {
-
+      
       Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
           .iterator();
       while (entries.hasNext()) {
-
+        
         Map.Entry<String,Map<String,Object>> entry = entries.next();
-
+        
         String deleteId = entry.getKey();
         Map<String,Object> map = entry.getValue();
         Long version = null;
@@ -323,7 +325,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           return null;
         }
         String leaderUrl = urls.get(0);
-        T request = routes.get(leaderUrl);
+        LBHttpSolrClient.Req request = routes.get(leaderUrl);
         if (request != null) {
           UpdateRequest urequest = (UpdateRequest) request.getRequest();
           urequest.deleteById(deleteId, version);
@@ -333,7 +335,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
           urequest.deleteById(deleteId, version);
           urequest.setCommitWithin(getCommitWithin());
           urequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
-          request = reqSupplier.get(urequest, urls);
+          request = new LBHttpSolrClient.Req(urequest, urls);
           routes.put(leaderUrl, request);
         }
       }
@@ -342,36 +344,6 @@ public class UpdateRequest extends AbstractUpdateRequest {
     return routes;
   }
   
-  /**
-   * @param router to route updates with
-   * @param col DocCollection for the updates
-   * @param urlMap of the cluster
-   * @param params params to use
-   * @param idField the id field
-   * @return a Map of urls to requests
-   */
-  public Map<String, LBSolrClient.Req> getRoutesToCollection(DocRouter router,
-                                                             DocCollection col, Map<String,List<String>> urlMap,
-                                                             ModifiableSolrParams params, String idField) {
-    return getRoutes(router, col, urlMap, params, idField, LBSolrClient.Req::new);
-  }
-  
-  /**
-   * @param router to route updates with
-   * @param col DocCollection for the updates
-   * @param urlMap of the cluster
-   * @param params params to use
-   * @param idField the id field
-   * @return a Map of urls to requests
-   * @deprecated since 8.0, uses {@link #getRoutesToCollection(DocRouter, DocCollection, Map, ModifiableSolrParams, String)} instead
-   */
-  @Deprecated
-  public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
-      DocCollection col, Map<String,List<String>> urlMap,
-      ModifiableSolrParams params, String idField) {
-    return getRoutes(router, col, urlMap, params, idField, LBHttpSolrClient.Req::new);
-  }
-  
   public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
     this.docIterator = docIterator;
   }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java
deleted file mode 100644
index 6206d4d..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.hamcrest.core.StringContains.containsString;
-
-public class CloudHttp2SolrClientBadInputTest extends SolrCloudTestCase {
-  private static final List<String> NULL_STR_LIST = null;
-  private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
-  private static final String ANY_COLLECTION = "ANY_COLLECTION";
-  private static final int ANY_COMMIT_WITHIN_TIME = -1;
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    configureCluster(1)
-        .configure();
-
-    final List<String> solrUrls = new ArrayList<>();
-  }
-
-  @Test
-  public void testDeleteByIdReportsInvalidIdLists() throws Exception {
-    try (SolrClient client = getCloudHttp2SolrClient(cluster)) {
-      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
-        client.deleteById(ANY_COLLECTION, NULL_STR_LIST);
-      });
-      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
-        client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST);
-      });
-      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
-        client.deleteById(ANY_COLLECTION, NULL_STR_LIST, ANY_COMMIT_WITHIN_TIME);
-      });
-      assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
-        client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST, ANY_COMMIT_WITHIN_TIME);
-      });
-    }
-  }
-
-  private void assertExceptionThrownWithMessageContaining(Class expectedType, List<String> expectedStrings, LuceneTestCase.ThrowingRunnable runnable) {
-    Throwable thrown = expectThrows(expectedType, runnable);
-
-    if (expectedStrings != null) {
-      for (String expectedString : expectedStrings) {
-        assertThat(thrown.getMessage(), containsString(expectedString));
-      }
-    }
-  }
-}
\ No newline at end of file
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java
deleted file mode 100644
index 72d5e8c..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.junit.Test;
-
-public class CloudHttp2SolrClientBuilderTest extends LuceneTestCase {
-  private static final String ANY_CHROOT = "/ANY_CHROOT";
-  private static final String ANY_ZK_HOST = "ANY_ZK_HOST";
-  private static final String ANY_OTHER_ZK_HOST = "ANY_OTHER_ZK_HOST";
-
-  @Test
-  public void testSingleZkHostSpecified() throws IOException {
-    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT))
-        .build()) {
-      final String clientZkHost = createdClient.getZkHost();
-
-      assertTrue(clientZkHost.contains(ANY_ZK_HOST));
-    }
-  }
-
-  @Test
-  public void testSeveralZkHostsSpecifiedSingly() throws IOException {
-    final List<String> zkHostList = new ArrayList<>();
-    zkHostList.add(ANY_ZK_HOST); zkHostList.add(ANY_OTHER_ZK_HOST);
-    try (CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHostList, Optional.of(ANY_CHROOT))
-        .build()) {
-      final String clientZkHost = createdClient.getZkHost();
-
-      assertTrue(clientZkHost.contains(ANY_ZK_HOST));
-      assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
-    }
-  }
-
-  @Test
-  public void testSeveralZkHostsSpecifiedTogether() throws IOException {
-    final ArrayList<String> zkHosts = new ArrayList<String>();
-    zkHosts.add(ANY_ZK_HOST);
-    zkHosts.add(ANY_OTHER_ZK_HOST);
-    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHosts, Optional.of(ANY_CHROOT)).build()) {
-      final String clientZkHost = createdClient.getZkHost();
-
-      assertTrue(clientZkHost.contains(ANY_ZK_HOST));
-      assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
-    }
-  }
-
-  @Test
-  public void testByDefaultConfiguresClientToSendUpdatesOnlyToShardLeaders() throws IOException {
-    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
-      assertTrue(createdClient.isUpdatesToLeaders() == true);
-    }
-  }
-
-  @Test
-  public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
-    try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
-      assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
-    }
-  }
-
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java
deleted file mode 100644
index fa3425b..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.TestUtil;
-import org.junit.Test;
-
-public class CloudHttp2SolrClientMultiConstructorTest extends LuceneTestCase {
-  
-  /*
-   * NOTE: If you only include one String argument, it will NOT use the
-   * constructor with the variable argument list, which is the one that
-   * we are testing here.
-   */
-  Collection<String> hosts;
-
-  @Test
-  // commented out on: 24-Dec-2018   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
-  public void testZkConnectionStringConstructorWithValidChroot() throws IOException {
-    boolean setOrList = random().nextBoolean();
-    int numOfZKServers = TestUtil.nextInt(random(), 1, 5);
-    boolean withChroot = random().nextBoolean();
-
-    final String chroot = "/mychroot";
-
-    StringBuilder sb = new StringBuilder();
-
-    if(setOrList) {
-      /*
-        A LinkedHashSet is required here for testing, or we can't guarantee
-        the order of entries in the final string.
-       */
-      hosts = new LinkedHashSet<>();
-    } else {
-      hosts = new ArrayList<>();
-    }
-
-    for(int i=0; i<numOfZKServers; i++) {
-      String ZKString = "host" + i + ":2181";
-      hosts.add(ZKString);
-      sb.append(ZKString);
-      if(i<numOfZKServers -1) sb.append(",");
-    }
-
-    String clientChroot = null;
-    if (withChroot) {
-      sb.append(chroot);
-      clientChroot = "/mychroot";
-    }
-
-    try (CloudHttp2SolrClient client = new CloudHttp2SolrClient.Builder(new ArrayList<>(hosts), Optional.ofNullable(clientChroot)).build()) {
-      assertEquals(sb.toString(), client.getZkHost());
-    }
-  }
-  
-  @Test(expected = IllegalArgumentException.class)
-  // commented out on: 24-Dec-2018   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
-  public void testBadChroot() {
-    final List<String> zkHosts = new ArrayList<>();
-    zkHosts.add("host1:2181");
-    new CloudHttp2SolrClient.Builder(zkHosts, Optional.of("foo")).build();
-  }
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java
deleted file mode 100644
index 52a4b84..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.util.Collections;
-import java.util.Optional;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.util.TestInjection;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class CloudHttp2SolrClientRetryTest extends SolrCloudTestCase {
-  private static final int NODE_COUNT = 1;
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    configureCluster(NODE_COUNT)
-        .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
-        .configure();
-  }
-
-  @Test
-  public void testRetry() throws Exception {
-    String collectionName = "testRetry";
-    try (CloudHttp2SolrClient solrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build()) {
-      CollectionAdminRequest.createCollection(collectionName, 1, 1)
-          .process(solrClient);
-
-      solrClient.add(collectionName, new SolrInputDocument("id", "1"));
-
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CommonParams.QT, "/admin/metrics");
-      String updateRequestCountKey = "solr.core.testRetry.shard1.replica_n1:UPDATE./update.requestTimes:count";
-      params.set("key", updateRequestCountKey);
-      params.set("indent", "true");
-
-      QueryResponse response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
-      NamedList<Object> namedList = response.getResponse();
-      System.out.println(namedList);
-      NamedList metrics = (NamedList) namedList.get("metrics");
-      assertEquals(1L, metrics.get(updateRequestCountKey));
-
-      TestInjection.failUpdateRequests = "true:100";
-      try {
-        expectThrows(BaseCloudSolrClient.RouteException.class,
-            "Expected an exception on the client when failure is injected during updates", () -> {
-              solrClient.add(collectionName, new SolrInputDocument("id", "2"));
-            });
-      } finally {
-        TestInjection.reset();
-      }
-
-      response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
-      namedList = response.getResponse();
-      System.out.println(namedList);
-      metrics = (NamedList) namedList.get("metrics");
-      assertEquals(2L, metrics.get(updateRequestCountKey));
-    }
-  }
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
deleted file mode 100644
index de8c311..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ /dev/null
@@ -1,978 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.apache.lucene.util.TestUtil;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.response.RequestStatusState;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.AbstractDistribZkTestBase;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.handler.admin.ConfigSetsHandler;
-import org.apache.solr.handler.admin.CoreAdminHandler;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.client.solrj.impl.BaseCloudSolrClient.*;
-
-
-/**
- * This test would be faster if we simulated the zk state instead.
- */
-@Slow
-public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
-  private static final String COLLECTION = "collection1";
-  private static final String COLLECTION2 = "2nd_collection";
-
-  private static final String id = "id";
-
-  private static final int TIMEOUT = 30;
-  private static final int NODE_COUNT = 3;
-
-  private static CloudHttp2SolrClient httpBasedCloudSolrClient = null;
-  private static CloudHttp2SolrClient zkBasedCloudSolrClient = null;
-
-  @Before
-  public void setupCluster() throws Exception {
-    configureCluster(NODE_COUNT)
-        .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
-        .configure();
-
-    final List<String> solrUrls = new ArrayList<>();
-    solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
-    httpBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(solrUrls).build();
-    zkBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build();
-  }
-
-  
-  @After 
-  public void tearDown() throws Exception {
-    if (httpBasedCloudSolrClient != null) {
-      try {
-        httpBasedCloudSolrClient.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    if (zkBasedCloudSolrClient != null) {
-      try {
-        zkBasedCloudSolrClient.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    
-    shutdownCluster();
-    super.tearDown();
-  }
-
-  /**
-   * Randomly return the cluster's ZK based CSC, or HttpClusterProvider based CSC.
-   */
-  private CloudHttp2SolrClient getRandomClient() {
-//    return random().nextBoolean()? zkBasedCloudSolrClient : httpBasedCloudSolrClient;
-    return httpBasedCloudSolrClient;
-  }
-
-  @Test
-  public void testParallelUpdateQTime() throws Exception {
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(COLLECTION, 2, 2);
-    UpdateRequest req = new UpdateRequest();
-    for (int i=0; i<10; i++)  {
-      SolrInputDocument doc = new SolrInputDocument();
-      doc.addField("id", String.valueOf(TestUtil.nextInt(random(), 1000, 1100)));
-      req.add(doc);
-    }
-    UpdateResponse response = req.process(getRandomClient(), COLLECTION);
-    // See SOLR-6547, we just need to ensure that no exception is thrown here
-    assertTrue(response.getQTime() >= 0);
-  }
-
-  @Test
-  public void testOverwriteOption() throws Exception {
-
-    CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
-        .processAndWait(cluster.getSolrClient(), TIMEOUT);
-    cluster.waitForActiveCollection("overwrite", 1, 1);
-    
-    new UpdateRequest()
-        .add("id", "0", "a_t", "hello1")
-        .add("id", "0", "a_t", "hello2")
-        .commit(cluster.getSolrClient(), "overwrite");
-
-    QueryResponse resp = cluster.getSolrClient().query("overwrite", new SolrQuery("*:*"));
-    assertEquals("There should be one document because overwrite=true", 1, resp.getResults().getNumFound());
-
-    new UpdateRequest()
-        .add(new SolrInputDocument(id, "1", "a_t", "hello1"), /* overwrite = */ false)
-        .add(new SolrInputDocument(id, "1", "a_t", "hello2"), false)
-        .commit(cluster.getSolrClient(), "overwrite");
-      
-    resp = getRandomClient().query("overwrite", new SolrQuery("*:*"));
-    assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
-
-  }
-
-  @Test
-  public void testAliasHandling() throws Exception {
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(COLLECTION, 2, 2);
-
-    CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1).process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(COLLECTION2, 2, 2);
-
-    CloudHttp2SolrClient client = getRandomClient();
-    SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
-    client.add(COLLECTION, doc);
-    client.commit(COLLECTION);
-    CollectionAdminRequest.createAlias("testalias", COLLECTION).process(cluster.getSolrClient());
-
-    SolrInputDocument doc2 = new SolrInputDocument("id", "2", "title_s", "my doc too");
-    client.add(COLLECTION2, doc2);
-    client.commit(COLLECTION2);
-    CollectionAdminRequest.createAlias("testalias2", COLLECTION2).process(cluster.getSolrClient());
-
-    CollectionAdminRequest.createAlias("testaliascombined", COLLECTION + "," + COLLECTION2).process(cluster.getSolrClient());
-
-    // ensure that the aliases have been registered
-    Map<String, String> aliases = new CollectionAdminRequest.ListAliases().process(cluster.getSolrClient()).getAliases();
-    assertEquals(COLLECTION, aliases.get("testalias"));
-    assertEquals(COLLECTION2, aliases.get("testalias2"));
-    assertEquals(COLLECTION + "," + COLLECTION2, aliases.get("testaliascombined"));
-
-    assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
-    assertEquals(1, client.query("testalias", params("q", "*:*")).getResults().getNumFound());
-
-    assertEquals(1, client.query(COLLECTION2, params("q", "*:*")).getResults().getNumFound());
-    assertEquals(1, client.query("testalias2", params("q", "*:*")).getResults().getNumFound());
-
-    assertEquals(2, client.query("testaliascombined", params("q", "*:*")).getResults().getNumFound());
-
-    ModifiableSolrParams paramsWithBothCollections = params("q", "*:*", "collection", COLLECTION + "," + COLLECTION2);
-    assertEquals(2, client.query(null, paramsWithBothCollections).getResults().getNumFound());
-
-    ModifiableSolrParams paramsWithBothAliases = params("q", "*:*", "collection", "testalias,testalias2");
-    assertEquals(2, client.query(null, paramsWithBothAliases).getResults().getNumFound());
-
-    ModifiableSolrParams paramsWithCombinedAlias = params("q", "*:*", "collection", "testaliascombined");
-    assertEquals(2, client.query(null, paramsWithCombinedAlias).getResults().getNumFound());
-
-    ModifiableSolrParams paramsWithMixedCollectionAndAlias = params("q", "*:*", "collection", "testalias," + COLLECTION2);
-    assertEquals(2, client.query(null, paramsWithMixedCollectionAndAlias).getResults().getNumFound());
-  }
-
-  @Test
-  public void testRouting() throws Exception {
-    CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
-    cluster.waitForActiveCollection("routing_collection", 2, 2);
-    
-    AbstractUpdateRequest request = new UpdateRequest()
-        .add(id, "0", "a_t", "hello1")
-        .add(id, "2", "a_t", "hello2")
-        .setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
-    
-    // Test single threaded routed updates for UpdateRequest
-    NamedList<Object> response = getRandomClient().request(request, "routing_collection");
-    if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
-      checkSingleServer(response);
-    }
-    RouteResponse rr = (RouteResponse) response;
-    Map<String,LBSolrClient.Req> routes = rr.getRoutes();
-    Iterator<Map.Entry<String,LBSolrClient.Req>> it = routes.entrySet()
-        .iterator();
-    while (it.hasNext()) {
-      Map.Entry<String,LBSolrClient.Req> entry = it.next();
-      String url = entry.getKey();
-      UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
-          .getRequest();
-      SolrInputDocument doc = updateRequest.getDocuments().get(0);
-      String id = doc.getField("id").getValue().toString();
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.add("q", "id:" + id);
-      params.add("distrib", "false");
-      QueryRequest queryRequest = new QueryRequest(params);
-      try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
-        QueryResponse queryResponse = queryRequest.process(solrClient);
-        SolrDocumentList docList = queryResponse.getResults();
-        assertTrue(docList.getNumFound() == 1);
-      }
-    }
-    
-    // Test the deleteById routing for UpdateRequest
-    
-    final UpdateResponse uResponse = new UpdateRequest()
-        .deleteById("0")
-        .deleteById("2")
-        .commit(cluster.getSolrClient(), "routing_collection");
-    if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
-      checkSingleServer(uResponse.getResponse());
-    }
-
-    QueryResponse qResponse = getRandomClient().query("routing_collection", new SolrQuery("*:*"));
-    SolrDocumentList docs = qResponse.getResults();
-    assertEquals(0, docs.getNumFound());
-    
-    // Test Multi-Threaded routed updates for UpdateRequest
-    try (CloudSolrClient threadedClient = new CloudSolrClientBuilder
-        (Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
-        .withParallelUpdates(true)
-        .build()) {
-      threadedClient.setDefaultCollection("routing_collection");
-      response = threadedClient.request(request);
-      if (threadedClient.isDirectUpdatesToLeadersOnly()) {
-        checkSingleServer(response);
-      }
-      rr = (RouteResponse) response;
-      routes = rr.getRoutes();
-      it = routes.entrySet()
-          .iterator();
-      while (it.hasNext()) {
-        Map.Entry<String,LBSolrClient.Req> entry = it.next();
-        String url = entry.getKey();
-        UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
-            .getRequest();
-        SolrInputDocument doc = updateRequest.getDocuments().get(0);
-        String id = doc.getField("id").getValue().toString();
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.add("q", "id:" + id);
-        params.add("distrib", "false");
-        QueryRequest queryRequest = new QueryRequest(params);
-        try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
-          QueryResponse queryResponse = queryRequest.process(solrClient);
-          SolrDocumentList docList = queryResponse.getResults();
-          assertTrue(docList.getNumFound() == 1);
-        }
-      }
-    }
-
-    // Test that queries with _route_ params are routed by the client
-
-    // Track request counts on each node before query calls
-    ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
-    DocCollection col = clusterState.getCollection("routing_collection");
-    Map<String, Long> requestCountsMap = Maps.newHashMap();
-    for (Slice slice : col.getSlices()) {
-      for (Replica replica : slice.getReplicas()) {
-        String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
-        requestCountsMap.put(baseURL, getNumRequests(baseURL, "routing_collection"));
-      }
-    }
-
-    // Collect the base URLs of the replicas of shard that's expected to be hit
-    DocRouter router = col.getRouter();
-    Collection<Slice> expectedSlices = router.getSearchSlicesSingle("0", null, col);
-    Set<String> expectedBaseURLs = Sets.newHashSet();
-    for (Slice expectedSlice : expectedSlices) {
-      for (Replica replica : expectedSlice.getReplicas()) {
-        String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
-        expectedBaseURLs.add(baseURL);
-      }
-    }
-
-    assertTrue("expected urls is not fewer than all urls! expected=" + expectedBaseURLs
-        + "; all=" + requestCountsMap.keySet(),
-        expectedBaseURLs.size() < requestCountsMap.size());
-
-    // Calculate a number of shard keys that route to the same shard.
-    int n;
-    if (TEST_NIGHTLY) {
-      n = random().nextInt(999) + 2;
-    } else {
-      n = random().nextInt(9) + 2;
-    }
-    
-    List<String> sameShardRoutes = Lists.newArrayList();
-    sameShardRoutes.add("0");
-    for (int i = 1; i < n; i++) {
-      String shardKey = Integer.toString(i);
-      Collection<Slice> slices = router.getSearchSlicesSingle(shardKey, null, col);
-      log.info("Expected Slices {}", slices);
-      if (expectedSlices.equals(slices)) {
-        sameShardRoutes.add(shardKey);
-      }
-    }
-
-    assertTrue(sameShardRoutes.size() > 1);
-
-    // Do N queries with _route_ parameter to the same shard
-    for (int i = 0; i < n; i++) {
-      ModifiableSolrParams solrParams = new ModifiableSolrParams();
-      solrParams.set(CommonParams.Q, "*:*");
-      solrParams.set(ShardParams._ROUTE_, sameShardRoutes.get(random().nextInt(sameShardRoutes.size())));
-      log.info("output: {}", getRandomClient().query("routing_collection", solrParams));
-    }
-
-    // Request counts increase from expected nodes should aggregate to 1000, while there should be
-    // no increase in unexpected nodes.
-    int increaseFromExpectedUrls = 0;
-    int increaseFromUnexpectedUrls = 0;
-    Map<String, Long> numRequestsToUnexpectedUrls = Maps.newHashMap();
-    for (Slice slice : col.getSlices()) {
-      for (Replica replica : slice.getReplicas()) {
-        String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
-
-        Long prevNumRequests = requestCountsMap.get(baseURL);
-        Long curNumRequests = getNumRequests(baseURL, "routing_collection");
-
-        long delta = curNumRequests - prevNumRequests;
-        if (expectedBaseURLs.contains(baseURL)) {
-          increaseFromExpectedUrls += delta;
-        } else {
-          increaseFromUnexpectedUrls += delta;
-          numRequestsToUnexpectedUrls.put(baseURL, delta);
-        }
-      }
-    }
-
-    assertEquals("Unexpected number of requests to expected URLs", n, increaseFromExpectedUrls);
-    assertEquals("Unexpected number of requests to unexpected URLs: " + numRequestsToUnexpectedUrls,
-        0, increaseFromUnexpectedUrls);
-
-  }
-
-  /**
-   * Tests if the specification of 'preferLocalShards' in the query-params
-   * limits the distributed query to locally hosted shards only
-   */
-  @Test
-  // commented 4-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
-  public void preferLocalShardsTest() throws Exception {
-
-    String collectionName = "localShardsTestColl";
-
-    int liveNodes = cluster.getJettySolrRunners().size();
-
-    // For preferLocalShards to succeed in a test, every shard should have
-    // all its cores on the same node.
-    // Hence the below configuration for our collection
-    CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
-        .setMaxShardsPerNode(liveNodes * liveNodes)
-        .processAndWait(cluster.getSolrClient(), TIMEOUT);
-    cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * liveNodes);
-    // Add some new documents
-    new UpdateRequest()
-        .add(id, "0", "a_t", "hello1")
-        .add(id, "2", "a_t", "hello2")
-        .add(id, "3", "a_t", "hello2")
-        .commit(getRandomClient(), collectionName);
-
-    // Run the actual test for 'preferLocalShards'
-    queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
-    queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
-  }
-
-  @SuppressWarnings("deprecation")
-  private void queryWithShardsPreferenceRules(CloudHttp2SolrClient cloudClient,
-                                          boolean useShardsPreference,
-                                          String collectionName)
-      throws Exception
-  {
-    SolrQuery qRequest = new SolrQuery("*:*");
-
-    ModifiableSolrParams qParams = new ModifiableSolrParams();
-    if (useShardsPreference) {
-      qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
-    } else {
-      qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
-    }
-    qParams.add(ShardParams.SHARDS_INFO, "true");
-    qRequest.add(qParams);
-
-    // CloudSolrClient sends the request to some node.
-    // And since all the nodes are hosting cores from all shards, the
-    // distributed query formed by this node will select cores from the
-    // local shards only
-    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
-
-    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
-    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
-
-    // Iterate over shards-info and check what cores responded
-    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
-    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
-    List<String> shardAddresses = new ArrayList<String>();
-    while (itr.hasNext()) {
-      Map.Entry<String, ?> e = itr.next();
-      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
-      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
-      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
-      shardAddresses.add(shardAddress);
-    }
-    log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
-
-    // Make sure the distributed queries were directed to a single node only
-    Set<Integer> ports = new HashSet<Integer>();
-    for (String shardAddr: shardAddresses) {
-      URL url = new URL (shardAddr);
-      ports.add(url.getPort());
-    }
-
-    // This assertion would hold true as long as every shard has a core on each node
-    assertTrue ("Response was not received from shards on a single node",
-        shardAddresses.size() > 1 && ports.size()==1);
-  }
-
-  private Long getNumRequests(String baseUrl, String collectionName) throws
-      SolrServerException, IOException {
-    return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
-  }
-
-  private Long getNumRequests(String baseUrl, String collectionName, String category, String key, String scope, boolean returnNumErrors) throws
-      SolrServerException, IOException {
-
-    NamedList<Object> resp;
-    try (HttpSolrClient client = getHttpSolrClient(baseUrl + "/"+ collectionName, 15000, 60000)) {
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set("qt", "/admin/mbeans");
-      params.set("stats", "true");
-      params.set("key", key);
-      params.set("cat", category);
-      // use generic request to avoid extra processing of queries
-      QueryRequest req = new QueryRequest(params);
-      resp = client.request(req);
-    }
-    String name;
-    if (returnNumErrors) {
-      name = category + "." + (scope != null ? scope : key) + ".errors";
-    } else {
-      name = category + "." + (scope != null ? scope : key) + ".requests";
-    }
-    Map<String,Object> map = (Map<String,Object>)resp.findRecursive("solr-mbeans", category, key, "stats");
-    if (map == null) {
-      return null;
-    }
-    if (scope != null) { // admin handler uses a meter instead of counter here
-      return (Long)map.get(name + ".count");
-    } else {
-      return (Long) map.get(name);
-    }
-  }
-
-  @Test
-  public void testNonRetryableRequests() throws Exception {
-    try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
-      // important to have one replica on each node
-      RequestStatusState state = CollectionAdminRequest.createCollection("foo", "conf", 1, NODE_COUNT).processAndWait(client, 60);
-      if (state == RequestStatusState.COMPLETED) {
-        cluster.waitForActiveCollection("foo", 1, NODE_COUNT);
-        client.setDefaultCollection("foo");
-
-        Map<String, String> adminPathToMbean = new HashMap<>(CommonParams.ADMIN_PATHS.size());
-        adminPathToMbean.put(CommonParams.COLLECTIONS_HANDLER_PATH, CollectionsHandler.class.getName());
-        adminPathToMbean.put(CommonParams.CORES_HANDLER_PATH, CoreAdminHandler.class.getName());
-        adminPathToMbean.put(CommonParams.CONFIGSETS_HANDLER_PATH, ConfigSetsHandler.class.getName());
-        // we do not add the authc/authz handlers because they do not currently expose any mbeans
-
-        for (String adminPath : adminPathToMbean.keySet()) {
-          long errorsBefore = 0;
-          for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
-            Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
-            errorsBefore += numRequests;
-            log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
-          }
-
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set("qt", adminPath);
-          params.set("action", "foobar"); // this should cause an error
-          QueryRequest req = new QueryRequest(params);
-          try {
-            NamedList<Object> resp = client.request(req);
-            fail("call to foo for admin path " + adminPath + " should have failed");
-          } catch (Exception e) {
-            // expected
-          }
-          long errorsAfter = 0;
-          for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
-            Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
-            errorsAfter += numRequests;
-            log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
-          }
-          assertEquals(errorsBefore + 1, errorsAfter);
-        }
-      } else {
-        fail("Collection could not be created within 60 seconds");
-      }
-    }
-  }
-
-  @Test
-  public void checkCollectionParameters() throws Exception {
-
-    try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
-
-      String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
-          .processAsync(client);
-      String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
-          .processAsync(client);
-
-      CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
-      CollectionAdminRequest.waitForAsyncRequest(async2, client, TIMEOUT);
-      cluster.waitForActiveCollection("multicollection1", 2, 2);
-      cluster.waitForActiveCollection("multicollection2", 2, 2);
-      client.setDefaultCollection("multicollection1");
-
-      List<SolrInputDocument> docs = new ArrayList<>(3);
-      for (int i = 0; i < 3; i++) {
-        SolrInputDocument doc = new SolrInputDocument();
-        doc.addField(id, Integer.toString(i));
-        doc.addField("a_t", "hello");
-        docs.add(doc);
-      }
-
-      client.add(docs);     // default - will add them to multicollection1
-      client.commit();
-
-      ModifiableSolrParams queryParams = new ModifiableSolrParams();
-      queryParams.add("q", "*:*");
-      assertEquals(3, client.query(queryParams).getResults().size());
-      assertEquals(0, client.query("multicollection2", queryParams).getResults().size());
-
-      SolrQuery query = new SolrQuery("*:*");
-      query.set("collection", "multicollection2");
-      assertEquals(0, client.query(query).getResults().size());
-
-      client.add("multicollection2", docs);
-      client.commit("multicollection2");
-
-      assertEquals(3, client.query("multicollection2", queryParams).getResults().size());
-
-    }
-
-  }
-
-  @Test
-  public void stateVersionParamTest() throws Exception {
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(COLLECTION, 2, 2);
-
-    DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
-    Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
-
-    SolrQuery q = new SolrQuery().setQuery("*:*");
-    HttpSolrClient.RemoteSolrException sse = null;
-
-    final String url = r.getStr(ZkStateReader.BASE_URL_PROP) + "/" + COLLECTION;
-    try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
-
-      log.info("should work query, result {}", solrClient.query(q));
-      //no problem
-      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + coll.getZNodeVersion());
-      log.info("2nd query , result {}", solrClient.query(q));
-      //no error yet good
-
-      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
-
-      QueryResponse rsp = solrClient.query(q);
-      Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
-      assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
-      assertNotNull(m.get(COLLECTION));
-    }
-
-    //now send the request to another node that does not serve the collection
-
-    Set<String> allNodesOfColl = new HashSet<>();
-    for (Slice slice : coll.getSlices()) {
-      for (Replica replica : slice.getReplicas()) {
-        allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
-      }
-    }
-    String theNode = null;
-    Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
-    for (String s : liveNodes) {
-      String n = cluster.getSolrClient().getZkStateReader().getBaseUrlForNodeName(s);
-      if(!allNodesOfColl.contains(n)){
-        theNode = n;
-        break;
-      }
-    }
-    log.info("the node which does not serve this collection{} ",theNode);
-    assertNotNull(theNode);
-
-
-    final String solrClientUrl = theNode + "/" + COLLECTION;
-    try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
-
-      q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion()-1));
-      try {
-        QueryResponse rsp = solrClient.query(q);
-        log.info("error was expected");
-      } catch (HttpSolrClient.RemoteSolrException e) {
-        sse = e;
-      }
-      assertNotNull(sse);
-      assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
-    }
-
-  }
-
-  @Test
-  public void testShutdown() throws IOException {
-    try (CloudSolrClient client = getCloudSolrClient("[ff01::114]:33332")) {
-      client.setZkConnectTimeout(100);
-      client.connect();
-      fail("Expected exception");
-    } catch (SolrException e) {
-      assertTrue(e.getCause() instanceof TimeoutException);
-    }
-  }
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  @Test
-  public void testWrongZkChrootTest() throws IOException {
-
-    exception.expect(SolrException.class);
-    exception.expectMessage("cluster not found/not ready");
-
-    try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress() + "/xyz/foo")) {
-      client.setZkClientTimeout(1000 * 60);
-      client.connect();
-      fail("Expected exception");
-    }
-  }
-
-  @Test
-  public void customHttpClientTest() throws IOException {
-    CloseableHttpClient client = HttpClientUtil.createClient(null);
-    try (CloudSolrClient solrClient = getCloudSolrClient(cluster.getZkServer().getZkAddress(), client)) {
-
-      assertTrue(solrClient.getLbClient().getHttpClient() == client);
-
-    } finally {
-      HttpClientUtil.close(client);
-    }
-  }
-
-  @Test
-  public void testVersionsAreReturned() throws Exception {
-    CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1).process(cluster.getSolrClient());
-    cluster.waitForActiveCollection("versions_collection", 2, 2);
-    
-    // assert that "adds" are returned
-    UpdateRequest updateRequest = new UpdateRequest()
-        .add("id", "1", "a_t", "hello1")
-        .add("id", "2", "a_t", "hello2");
-    updateRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
-
-    NamedList<Object> response = updateRequest.commit(getRandomClient(), "versions_collection").getResponse();
-    Object addsObject = response.get("adds");
-    
-    assertNotNull("There must be a adds parameter", addsObject);
-    assertTrue(addsObject instanceof NamedList<?>);
-    NamedList<?> adds = (NamedList<?>) addsObject;
-    assertEquals("There must be 2 versions (one per doc)", 2, adds.size());
-
-    Map<String, Long> versions = new HashMap<>();
-    Object object = adds.get("1");
-    assertNotNull("There must be a version for id 1", object);
-    assertTrue("Version for id 1 must be a long", object instanceof Long);
-    versions.put("1", (Long) object);
-
-    object = adds.get("2");
-    assertNotNull("There must be a version for id 2", object);
-    assertTrue("Version for id 2 must be a long", object instanceof Long);
-    versions.put("2", (Long) object);
-
-    QueryResponse resp = getRandomClient().query("versions_collection", new SolrQuery("*:*"));
-    assertEquals("There should be one document because overwrite=true", 2, resp.getResults().getNumFound());
-
-    for (SolrDocument doc : resp.getResults()) {
-      Long version = versions.get(doc.getFieldValue("id"));
-      assertEquals("Version on add must match _version_ field", version, doc.getFieldValue("_version_"));
-    }
-
-    // assert that "deletes" are returned
-    UpdateRequest deleteRequest = new UpdateRequest().deleteById("1");
-    deleteRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
-    response = deleteRequest.commit(getRandomClient(), "versions_collection").getResponse();
-    Object deletesObject = response.get("deletes");
-    assertNotNull("There must be a deletes parameter", deletesObject);
-    NamedList deletes = (NamedList) deletesObject;
-    assertEquals("There must be 1 version", 1, deletes.size());
-  }
-  
-  @Test
-  public void testInitializationWithSolrUrls() throws Exception {
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
-    cluster.waitForActiveCollection(COLLECTION, 2, 2);
-    CloudHttp2SolrClient client = httpBasedCloudSolrClient;
-    SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
-    client.add(COLLECTION, doc);
-    client.commit(COLLECTION);
-    assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
-  }
-
-  @Test
-  public void testCollectionDoesntExist() throws Exception {
-    CloudHttp2SolrClient client = getRandomClient();
-    SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
-    try {
-      client.add("boguscollectionname", doc);
-      fail();
-    } catch (SolrException ex) {
-      if (ex.getMessage().equals("Collection not found: boguscollectionname")) {
-        // pass
-      } else {
-        throw ex;
-      }
-    }
-  }
-
-  public void testRetryUpdatesWhenClusterStateIsStale() throws Exception {
-    final String COL = "stale_state_test_col";
-    assert cluster.getJettySolrRunners().size() >= 2;
-
-    final JettySolrRunner old_leader_node = cluster.getJettySolrRunners().get(0);
-    final JettySolrRunner new_leader_node = cluster.getJettySolrRunners().get(1);
-    
-    // start with exactly 1 shard/replica...
-    assertEquals("Couldn't create collection", 0,
-                 CollectionAdminRequest.createCollection(COL, "conf", 1, 1)
-                 .setCreateNodeSet(old_leader_node.getNodeName())
-                 .process(cluster.getSolrClient()).getStatus());
-    cluster.waitForActiveCollection(COL, 1, 1);
-
-    // determine the coreNodeName of only current replica
-    Collection<Slice> slices = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COL).getSlices();
-    assertEquals(1, slices.size()); // sanity check
-    Slice slice = slices.iterator().next();
-    assertEquals(1, slice.getReplicas().size()); // sanity check
-    final String old_leader_core_node_name = slice.getLeader().getName();
-
-    // NOTE: creating our own CloudSolrClient whose settings we can muck with...
-    try (CloudSolrClient stale_client = new CloudSolrClientBuilder
-        (Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
-        .sendDirectUpdatesToAnyShardReplica()
-        .withParallelUpdates(true)
-        .build()) {
-      // don't let collection cache entries get expired, even on a slow machine...
-      stale_client.setCollectionCacheTTl(Integer.MAX_VALUE);
-      stale_client.setDefaultCollection(COL);
-     
-      // do a query to populate stale_client's cache...
-      assertEquals(0, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
-      
-      // add 1 replica on a diff node...
-      assertEquals("Couldn't create collection", 0,
-                   CollectionAdminRequest.addReplicaToShard(COL, "shard1")
-                   .setNode(new_leader_node.getNodeName())
-                   // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
-                   .process(cluster.getSolrClient()).getStatus());
-      AbstractDistribZkTestBase.waitForRecoveriesToFinish
-        (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
-      
-      // ...and delete our original leader.
-      assertEquals("Couldn't create collection", 0,
-                   CollectionAdminRequest.deleteReplica(COL, "shard1", old_leader_core_node_name)
-                   // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
-                   .process(cluster.getSolrClient()).getStatus());
-      AbstractDistribZkTestBase.waitForRecoveriesToFinish
-        (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
-
-      // stale_client's collection state cache should now only point at a leader that no longer exists.
-      
-      // attempt a (direct) update that should succeed in spite of cached cluster state
-      // pointing solely to a node that's no longer part of our collection...
-      assertEquals(0, (new UpdateRequest().add("id", "1").commit(stale_client, COL)).getStatus());
-      assertEquals(1, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
-      
-    }
-  }
-  
-
-  private static void checkSingleServer(NamedList<Object> response) {
-    final RouteResponse rr = (RouteResponse) response;
-    final Map<String,LBSolrClient.Req> routes = rr.getRoutes();
-    final Iterator<Map.Entry<String,LBSolrClient.Req>> it =
-        routes.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<String,LBSolrClient.Req> entry = it.next();
-        assertEquals("wrong number of servers: "+entry.getValue().getServers(),
-            1, entry.getValue().getServers().size());
-    }
-  }
-
-  /**
-   * Tests if the specification of 'preferReplicaTypes` in the query-params
-   * limits the distributed query to locally hosted shards only
-   */
-  @Test
-  // commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
-  public void preferReplicaTypesTest() throws Exception {
-
-    String collectionName = "replicaTypesTestColl";
-
-    int liveNodes = cluster.getJettySolrRunners().size();
-
-    // For these tests we need to have multiple replica types.
-    // Hence the below configuration for our collection
-    int pullReplicas = Math.max(1, liveNodes - 2);
-    CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, pullReplicas)
-        .setMaxShardsPerNode(liveNodes)
-        .processAndWait(cluster.getSolrClient(), TIMEOUT);
-    cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * (2 + pullReplicas));
-    
-    // Add some new documents
-    new UpdateRequest()
-        .add(id, "0", "a_t", "hello1")
-        .add(id, "2", "a_t", "hello2")
-        .add(id, "3", "a_t", "hello2")
-        .commit(getRandomClient(), collectionName);
-
-    // Run the actual tests for 'shards.preference=replica.type:*'
-    queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);
-    // Test to verify that preferLocalShards=true doesn't break this
-    queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
-    queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
-  }
-
-  private void queryWithPreferReplicaTypes(CloudHttp2SolrClient cloudClient,
-                                           String preferReplicaTypes,
-                                           boolean preferLocalShards,
-                                           String collectionName)
-      throws Exception
-  {
-    SolrQuery qRequest = new SolrQuery("*:*");
-    ModifiableSolrParams qParams = new ModifiableSolrParams();
-
-    final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
-    StringBuilder rule = new StringBuilder();
-    preferredTypes.forEach(type -> {
-      if (rule.length() != 0) {
-        rule.append(',');
-      }
-      rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
-      rule.append(':');
-      rule.append(type);
-    });
-    if (preferLocalShards) {
-      if (rule.length() != 0) {
-        rule.append(',');
-      }
-      rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
-      rule.append(":local");
-    }
-    qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());  
-    qParams.add(ShardParams.SHARDS_INFO, "true");
-    qRequest.add(qParams);
-
-    // CloudSolrClient sends the request to some node.
-    // And since all the nodes are hosting cores from all shards, the
-    // distributed query formed by this node will select cores from the
-    // local shards only
-    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
-
-    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
-    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
-
-    Map<String, String> replicaTypeMap = new HashMap<>();
-    DocCollection collection = getCollectionState(collectionName);
-    for (Slice slice : collection.getSlices()) {
-      for (Replica replica : slice.getReplicas()) {
-        String coreUrl = replica.getCoreUrl();
-        // It seems replica reports its core URL with a trailing slash while shard
-        // info returned from the query doesn't. Oh well.
-        if (coreUrl.endsWith("/")) {
-          coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
-        }
-        replicaTypeMap.put(coreUrl, replica.getType().toString());
-      }
-    }
-
-    // Iterate over shards-info and check that replicas of correct type responded
-    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
-    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
-    List<String> shardAddresses = new ArrayList<String>();
-    while (itr.hasNext()) {
-      Map.Entry<String, ?> e = itr.next();
-      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
-      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
-      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
-      assertTrue(replicaTypeMap.containsKey(shardAddress));
-      assertTrue(preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
-      shardAddresses.add(shardAddress);
-    }
-    assertTrue("No responses", shardAddresses.size() > 0);
-    log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
-  }
-
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index 92c5c62..f3920b0 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -108,7 +108,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
   private LBHttpSolrClient getMockLbHttpSolrClient(Map<String, Function> responses) throws Exception {
     LBHttpSolrClient mockLbclient = mock(LBHttpSolrClient.class);
 
-    when(mockLbclient.request(any(LBSolrClient.Req.class))).then(invocationOnMock -> {
+    when(mockLbclient.request(any(LBHttpSolrClient.Req.class))).then(invocationOnMock -> {
       LBHttpSolrClient.Req req = invocationOnMock.getArgument(0);
       Function f = responses.get("request");
       if (f == null) return null;
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 7b08c73..748721d 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -86,7 +86,6 @@ import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.embedded.JettyConfig;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
@@ -2279,61 +2278,6 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
   }
 
   /**
-   * A variant of {@link  org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} that will randomize
-   * some internal settings.
-   */
-  public static class CloudHttp2SolrClientBuilder extends CloudHttp2SolrClient.Builder {
-
-    public CloudHttp2SolrClientBuilder(List<String> zkHosts, Optional<String> zkChroot) {
-      super(zkHosts, zkChroot);
-      randomizeCloudSolrClient();
-    }
-
-    public CloudHttp2SolrClientBuilder(ClusterStateProvider stateProvider) {
-      super(new ArrayList<>());
-      this.stateProvider = stateProvider;
-      randomizeCloudSolrClient();
-    }
-
-    public CloudHttp2SolrClientBuilder(MiniSolrCloudCluster cluster) {
-      super(new ArrayList<>());
-      if (random().nextBoolean()) {
-        this.zkHosts.add(cluster.getZkServer().getZkAddress());
-      } else {
-        populateSolrUrls(cluster);
-      }
-
-      randomizeCloudSolrClient();
-    }
-
-    private void populateSolrUrls(MiniSolrCloudCluster cluster) {
-      if (random().nextBoolean()) {
-        final List<JettySolrRunner> solrNodes = cluster.getJettySolrRunners();
-        for (JettySolrRunner node : solrNodes) {
-          this.solrUrls.add(node.getBaseUrl().toString());
-        }
-      } else {
-        this.solrUrls.add(cluster.getRandomJetty(random()).getBaseUrl().toString());
-      }
-    }
-
-    private void randomizeCloudSolrClient() {
-      this.directUpdatesToLeadersOnly = random().nextBoolean();
-      this.shardLeadersOnly = random().nextBoolean();
-      this.parallelUpdates = random().nextBoolean();
-    }
-  }
-
-  /**
-   * This method <i>may</i> randomize unspecified aspects of the resulting SolrClient.
-   * Tests that do not wish to have any randomized behavior should use the
-   * {@link org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} class directly
-   */
-  public static CloudHttp2SolrClient getCloudHttp2SolrClient(MiniSolrCloudCluster cluster) {
-    return new CloudHttp2SolrClientBuilder(cluster).build();
-  }
-
-  /**
    * A variant of {@link  org.apache.solr.client.solrj.impl.CloudSolrClient.Builder} that will randomize
    * some internal settings.
    */