Posted to commits@lucene.apache.org by ro...@apache.org on 2014/12/31 15:05:50 UTC

svn commit: r1648697 [7/13] - in /lucene/dev/trunk/solr: ./ contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/ contrib/map-reduce/src/java/org/apache/solr/hadoop...

Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java?rev=1648697&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java Wed Dec 31 14:05:48 2014
@@ -0,0 +1,1156 @@
+package org.apache.solr.client.solrj.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * SolrJ client class to communicate with SolrCloud.
+ * Instances of this class communicate with Zookeeper to discover
+ * Solr endpoints for SolrCloud collections, and then use the 
+ * {@link LBHttpSolrClient} to issue requests.
+ * 
+ * This class assumes the id field for your documents is called
+ * 'id' - if this is not the case, you must set the right name
+ * with {@link #setIdField(String)}.
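+ *
+ * A minimal usage sketch; the ZooKeeper address, collection name and field
+ * values below are illustrative, not required values:
+ * <pre>{@code
+ *   CloudSolrClient client = new CloudSolrClient("zk1:2181,zk2:2181,zk3:2181");
+ *   client.setDefaultCollection("collection1");
+ *   SolrInputDocument doc = new SolrInputDocument();
+ *   doc.addField("id", "1");
+ *   client.add(doc);
+ *   client.commit();
+ *   client.shutdown();
+ * }</pre>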
+ */
+@SuppressWarnings("serial")
+public class CloudSolrClient extends SolrClient {
+  protected static final Logger log = LoggerFactory.getLogger(CloudSolrClient.class);
+
+  private volatile ZkStateReader zkStateReader;
+  private String zkHost; // the zk server connect string
+  private int zkConnectTimeout = 10000;
+  private int zkClientTimeout = 10000;
+  private volatile String defaultCollection;
+  private final LBHttpSolrClient lbClient;
+  private final boolean shutdownLBHttpSolrServer;
+  private HttpClient myClient;
+  private final boolean clientIsInternal;
+  // number of times collection state is reloaded if a stale state error is received
+  private static final int MAX_STALE_RETRIES = 5;
+  Random rand = new Random();
+  
+  private final boolean updatesToLeaders;
+  private boolean parallelUpdates = true;
+  private ExecutorService threadPool = Executors
+      .newCachedThreadPool(new SolrjNamedThreadFactory(
+          "CloudSolrServer ThreadPool"));
+  private String idField = "id";
+  public static final String STATE_VERSION = "_stateVer_";
+  private final Set<String> NON_ROUTABLE_PARAMS;
+  {
+    NON_ROUTABLE_PARAMS = new HashSet<>();
+    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+    
+    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+    
+    // Not supported via SolrCloud
+    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+  }
+  private volatile long timeToLive = 60* 1000L;
+
+
+  protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
+    @Override
+    public ExpiringCachedDocCollection get(Object key) {
+      ExpiringCachedDocCollection val = super.get(key);
+      if(val == null) return null;
+      if(val.isExpired(timeToLive)) {
+        super.remove(key);
+        return null;
+      }
+      return val;
+    }
+
+  };
+
+  class ExpiringCachedDocCollection {
+    DocCollection cached;
+    long cachedAt;
+
+    ExpiringCachedDocCollection(DocCollection cached) {
+      this.cached = cached;
+      this.cachedAt = System.currentTimeMillis();
+    }
+
+    boolean isExpired(long timeToLive) {
+      return (System.currentTimeMillis() - cachedAt) > timeToLive;
+    }
+  }
+
+  /**
+   * Create a new client object that connects to Zookeeper and is always aware
+   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
+   * SolrCloud has enough replicas for every shard in a collection, there is no
+   * single point of failure. Updates will be sent to shard leaders by default.
+   * 
+   * @param zkHost
+   *          The client endpoint of the zookeeper quorum containing the cloud
+   *          state. The full specification for this string is one or more comma
+   *          separated HOST:PORT values, followed by an optional chroot value
+   *          that starts with a forward slash. Using a chroot allows multiple
+   *          applications to coexist in one ensemble. For full details, see the
+   *          Zookeeper documentation. Some examples:
+   *          <p/>
+   *          "host1:2181"
+   *          <p/>
+   *          "host1:2181,host2:2181,host3:2181/mysolrchroot"
+   *          <p/>
+   *          "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181"
+   */
+  public CloudSolrClient(String zkHost) {
+      this.zkHost = zkHost;
+      this.clientIsInternal = true;
+      this.myClient = HttpClientUtil.createClient(null);
+      this.lbClient = new LBHttpSolrClient(myClient);
+      this.lbClient.setRequestWriter(new BinaryRequestWriter());
+      this.lbClient.setParser(new BinaryResponseParser());
+      this.updatesToLeaders = true;
+      shutdownLBHttpSolrServer = true;
+      lbClient.addQueryParams(STATE_VERSION);
+  }
+
+  /**
+   * Create a new client object that connects to Zookeeper and is always aware
+   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
+   * SolrCloud has enough replicas for every shard in a collection, there is no
+   * single point of failure. Updates will be sent to shard leaders by default.
+   *
+   * @param zkHost
+   *          The client endpoint of the zookeeper quorum containing the cloud
+   *          state. The full specification for this string is one or more comma
+   *          separated HOST:PORT values, followed by an optional chroot value
+   *          that starts with a forward slash. Using a chroot allows multiple
+   *          applications to coexist in one ensemble. For full details, see the
+   *          Zookeeper documentation. Some examples:
+   *          <p/>
+   *          "host1:2181"
+   *          <p/>
+   *          "host1:2181,host2:2181,host3:2181/mysolrchroot"
+   *          <p/>
+   *          "zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181"
+   * @param httpClient
+   *          the {@link HttpClient} instance to be used for all requests. The
+   *          provided httpClient should use a multi-threaded connection manager.
+   */
+  public CloudSolrClient(String zkHost, HttpClient httpClient)  {
+    this.zkHost = zkHost;
+    this.clientIsInternal = httpClient == null;
+    this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
+    this.lbClient = new LBHttpSolrClient(myClient);
+    this.lbClient.setRequestWriter(new BinaryRequestWriter());
+    this.lbClient.setParser(new BinaryResponseParser());
+    this.updatesToLeaders = true;
+    shutdownLBHttpSolrServer = true;
+    lbClient.addQueryParams(STATE_VERSION);
+  }
+  
+  /**
+   * Create a new client object using multiple string values in a Collection
+   * instead of a standard zkHost connection string. Note that this method will
+   * not be used if there is only one String argument - that will use
+   * {@link #CloudSolrClient(String)} instead.
+   * 
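+   * A sketch of how this constructor might be called; the host names and
+   * chroot below are illustrative (a List keeps the hosts in the order they
+   * were added):
+   * <pre>{@code
+   *   List<String> zkHosts = new ArrayList<>();
+   *   zkHosts.add("zoo1.example.com:2181");
+   *   zkHosts.add("zoo2.example.com:2181");
+   *   zkHosts.add("zoo3.example.com:2181");
+   *   CloudSolrClient client = new CloudSolrClient(zkHosts, "/mysolrchroot");
+   * }</pre>
+   *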
+   * @param zkHosts
+   *          A Java Collection (List, Set, etc) of HOST:PORT strings, one for
+   *          each host in the zookeeper ensemble. Note that with certain
+   *          Collection types like HashSet, the order of hosts in the final
+   *          connect string may not be in the same order you added them.
+   * @param chroot
+   *          A chroot value for zookeeper, starting with a forward slash. If no
+   *          chroot is required, use null.
+   * @throws IllegalArgumentException
+   *           if the chroot value does not start with a forward slash.
+   * @see #CloudSolrClient(String)
+   */
+  public CloudSolrClient(Collection<String> zkHosts, String chroot) {
+    this(zkHosts, chroot, null);
+  }
+
+  /**
+   * Create a new client object using multiple string values in a Collection
+   * instead of a standard zkHost connection string. Note that this method will
+   * not be used if there is only one String argument - that will use
+   * {@link #CloudSolrClient(String)} instead.
+   *
+   * @param zkHosts
+   *          A Java Collection (List, Set, etc) of HOST:PORT strings, one for
+   *          each host in the zookeeper ensemble. Note that with certain
+   *          Collection types like HashSet, the order of hosts in the final
+   *          connect string may not be in the same order you added them.
+   * @param chroot
+   *          A chroot value for zookeeper, starting with a forward slash. If no
+   *          chroot is required, use null.
+   * @param httpClient
+   *          the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
+   *          multi-threaded connection manager.
+   * @throws IllegalArgumentException
+   *           if the chroot value does not start with a forward slash.
+   * @see #CloudSolrClient(String)
+   */
+  public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient) {
+    StringBuilder zkBuilder = new StringBuilder();
+    int lastIndexValue = zkHosts.size() - 1;
+    int i = 0;
+    for (String zkHost : zkHosts) {
+      zkBuilder.append(zkHost);
+      if (i < lastIndexValue) {
+        zkBuilder.append(",");
+      }
+      i++;
+    }
+    if (chroot != null) {
+      if (chroot.startsWith("/")) {
+        zkBuilder.append(chroot);
+      } else {
+        throw new IllegalArgumentException(
+            "The chroot must start with a forward slash.");
+      }
+    }
+
+    /* Log the constructed connection string and then initialize. */
+    log.info("Final constructed zkHost string: " + zkBuilder.toString());
+
+    this.zkHost = zkBuilder.toString();
+    this.clientIsInternal = httpClient == null;
+    this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
+    this.lbClient = new LBHttpSolrClient(myClient);
+    this.lbClient.setRequestWriter(new BinaryRequestWriter());
+    this.lbClient.setParser(new BinaryResponseParser());
+    this.updatesToLeaders = true;
+    shutdownLBHttpSolrServer = true;
+  }
+  
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param updatesToLeaders
+   *          If true, sends updates only to shard leaders.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, boolean updatesToLeaders) {
+    this(zkHost, updatesToLeaders, null);
+  }
+
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param updatesToLeaders
+   *          If true, sends updates only to shard leaders.
+   * @param httpClient
+   *          the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
+   *          multi-threaded connection manager.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, boolean updatesToLeaders, HttpClient httpClient) {
+    this.zkHost = zkHost;
+    this.clientIsInternal = httpClient == null;
+    this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
+    this.lbClient = new LBHttpSolrClient(myClient);
+    this.lbClient.setRequestWriter(new BinaryRequestWriter());
+    this.lbClient.setParser(new BinaryResponseParser());
+    this.updatesToLeaders = updatesToLeaders;
+    shutdownLBHttpSolrServer = true;
+    lbClient.addQueryParams(STATE_VERSION);
+  }
+
+  /** Sets the cache TTL for cached DocCollection objects. This is only applicable for collections which are persisted outside of clusterstate.json.
+   * @param seconds TTL value in seconds
+   */
+  public void setCollectionCacheTTl(int seconds){
+    assert seconds > 0;
+    timeToLive = seconds*1000L;
+  }
+
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param lbClient
+   *          LBHttpSolrClient instance for requests.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient) {
+    this(zkHost, lbClient, true);
+  }
+  
+  /**
+   * @param zkHost
+   *          A zookeeper client endpoint.
+   * @param lbClient
+   *          LBHttpSolrClient instance for requests.
+   * @param updatesToLeaders
+   *          If true, sends updates only to shard leaders.
+   * @see #CloudSolrClient(String) for full description and details on zkHost
+   */
+  public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) {
+    this.zkHost = zkHost;
+    this.lbClient = lbClient;
+    this.updatesToLeaders = updatesToLeaders;
+    shutdownLBHttpSolrServer = false;
+    this.clientIsInternal = false;
+    lbClient.addQueryParams(STATE_VERSION);
+  }
+  
+  public ResponseParser getParser() {
+    return lbClient.getParser();
+  }
+  
+  /**
+   * Note: This setter method is <b>not thread-safe</b>.
+   * 
+   * @param processor
+   *          Default ResponseParser used to parse the response if a parser
+   *          is not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(ResponseParser processor) {
+    lbClient.setParser(processor);
+  }
+  
+  public RequestWriter getRequestWriter() {
+    return lbClient.getRequestWriter();
+  }
+  
+  public void setRequestWriter(RequestWriter requestWriter) {
+    lbClient.setRequestWriter(requestWriter);
+  }
+
+  /**
+   * @return the zkHost value used to connect to zookeeper.
+   */
+  public String getZkHost() {
+    return zkHost;
+  }
+
+  public ZkStateReader getZkStateReader() {
+    return zkStateReader;
+  }
+
+  /**
+   * @param idField the field to route documents on.
+   */
+  public void setIdField(String idField) {
+    this.idField = idField;
+  }
+
+  /**
+   * @return the field that updates are routed on.
+   */
+  public String getIdField() {
+    return idField;
+  }
+  
+  /** Sets the default collection for requests. */
+  public void setDefaultCollection(String collection) {
+    this.defaultCollection = collection;
+  }
+
+  /** Gets the default collection for requests. */
+  public String getDefaultCollection() {
+    return defaultCollection;
+  }
+
+  /** Sets the connect timeout to the ZooKeeper ensemble, in ms. */
+  public void setZkConnectTimeout(int zkConnectTimeout) {
+    this.zkConnectTimeout = zkConnectTimeout;
+  }
+
+  /** Sets the ZooKeeper client (session) timeout, in ms. */
+  public void setZkClientTimeout(int zkClientTimeout) {
+    this.zkClientTimeout = zkClientTimeout;
+  }
+
+  /**
+   * Connect to the zookeeper ensemble.
+   * This is an optional method that may be used to force a connect before any other requests are sent.
+   *
+   */
+  public void connect() {
+    if (zkStateReader == null) {
+      synchronized (this) {
+        if (zkStateReader == null) {
+          ZkStateReader zk = null;
+          try {
+            zk = new ZkStateReader(zkHost, zkClientTimeout,
+                zkConnectTimeout);
+            zk.createClusterStateWatchersAndUpdate();
+            zkStateReader = zk;
+          } catch (InterruptedException e) {
+            if (zk != null) zk.close();
+            Thread.currentThread().interrupt();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (KeeperException e) {
+            if (zk != null) zk.close();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (IOException e) {
+            if (zk != null) zk.close();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (TimeoutException e) {
+            if (zk != null) zk.close();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (Exception e) {
+            if (zk != null) zk.close();
+            // do not wrap because clients may be relying on the underlying exception being thrown
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  public void setParallelUpdates(boolean parallelUpdates) {
+    this.parallelUpdates = parallelUpdates;
+  }
+
+  private NamedList<Object> directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException {
+    UpdateRequest updateRequest = (UpdateRequest) request;
+    ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+    ModifiableSolrParams routableParams = new ModifiableSolrParams();
+    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+    if(params != null) {
+      nonRoutableParams.add(params);
+      routableParams.add(params);
+      for(String param : NON_ROUTABLE_PARAMS) {
+        routableParams.remove(param);
+      }
+    }
+
+    String collection = nonRoutableParams.get(UpdateParams.COLLECTION, defaultCollection);
+    if (collection == null) {
+      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+    }
+
+
+    //Check to see if the collection is an alias.
+    Aliases aliases = zkStateReader.getAliases();
+    if(aliases != null) {
+      Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
+      if(collectionAliases != null && collectionAliases.containsKey(collection)) {
+        collection = collectionAliases.get(collection);
+      }
+    }
+
+    DocCollection col = getDocCollection(clusterState, collection);
+
+    DocRouter router = col.getRouter();
+    
+    if (router instanceof ImplicitDocRouter) {
+      // short circuit as optimization
+      return null;
+    }
+
+    //Create the URL map, which is keyed on slice name.
+    //The value is a list of URLs for each replica in the slice.
+    //The first value in the list is the leader for the slice.
+    Map<String,List<String>> urlMap = buildUrlMap(col);
+    if (urlMap == null) {
+      // we could not find a leader yet - use unoptimized general path
+      return null;
+    }
+
+    NamedList<Throwable> exceptions = new NamedList<>();
+    NamedList<NamedList> shardResponses = new NamedList<>();
+
+    Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+    if (routes == null) {
+      return null;
+    }
+
+    long start = System.nanoTime();
+
+    if (parallelUpdates) {
+      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
+      for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+        final String url = entry.getKey();
+        final LBHttpSolrClient.Req lbRequest = entry.getValue();
+        responseFutures.put(url, threadPool.submit(new Callable<NamedList<?>>() {
+          @Override
+          public NamedList<?> call() throws Exception {
+            return lbClient.request(lbRequest).getResponse();
+          }
+        }));
+      }
+
+      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
+        final String url = entry.getKey();
+        final Future<NamedList<?>> responseFuture = entry.getValue();
+        try {
+          shardResponses.add(url, responseFuture.get());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          exceptions.add(url, e.getCause());
+        }
+      }
+
+      if (exceptions.size() > 0) {
+        throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+      }
+    } else {
+      for (Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+        String url = entry.getKey();
+        LBHttpSolrClient.Req lbRequest = entry.getValue();
+        try {
+          NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
+          shardResponses.add(url, rsp);
+        } catch (Exception e) {
+          throw new SolrServerException(e);
+        }
+      }
+    }
+
+    UpdateRequest nonRoutableRequest = null;
+    List<String> deleteQuery = updateRequest.getDeleteQuery();
+    if (deleteQuery != null && deleteQuery.size() > 0) {
+      UpdateRequest deleteQueryRequest = new UpdateRequest();
+      deleteQueryRequest.setDeleteQuery(deleteQuery);
+      nonRoutableRequest = deleteQueryRequest;
+    }
+    
+    Set<String> paramNames = nonRoutableParams.getParameterNames();
+    
+    Set<String> intersection = new HashSet<>(paramNames);
+    intersection.retainAll(NON_ROUTABLE_PARAMS);
+    
+    if (nonRoutableRequest != null || intersection.size() > 0) {
+      if (nonRoutableRequest == null) {
+        nonRoutableRequest = new UpdateRequest();
+      }
+      nonRoutableRequest.setParams(nonRoutableParams);
+      List<String> urlList = new ArrayList<>();
+      urlList.addAll(routes.keySet());
+      Collections.shuffle(urlList, rand);
+      LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(nonRoutableRequest, urlList);
+      try {
+        LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+        shardResponses.add(urlList.get(0), rsp.getResponse());
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+      }
+    }
+
+    long end = System.nanoTime();
+
+    RouteResponse rr =  condenseResponse(shardResponses, (long)((end - start)/1000000));
+    rr.setRouteResponses(shardResponses);
+    rr.setRoutes(routes);
+    return rr;
+  }
+
+  private Map<String,List<String>> buildUrlMap(DocCollection col) {
+    Map<String, List<String>> urlMap = new HashMap<>();
+    Collection<Slice> slices = col.getActiveSlices();
+    Iterator<Slice> sliceIterator = slices.iterator();
+    while (sliceIterator.hasNext()) {
+      Slice slice = sliceIterator.next();
+      String name = slice.getName();
+      List<String> urls = new ArrayList<>();
+      Replica leader = slice.getLeader();
+      if (leader == null) {
+        // take unoptimized general path - we cannot find a leader yet
+        return null;
+      }
+      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+      String url = zkProps.getCoreUrl();
+      urls.add(url);
+      Collection<Replica> replicas = slice.getReplicas();
+      Iterator<Replica> replicaIterator = replicas.iterator();
+      while (replicaIterator.hasNext()) {
+        Replica replica = replicaIterator.next();
+        if (!replica.getNodeName().equals(leader.getNodeName()) &&
+            !replica.getName().equals(leader.getName())) {
+          ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+          String url1 = zkProps1.getCoreUrl();
+          urls.add(url1);
+        }
+      }
+      urlMap.put(name, urls);
+    }
+    return urlMap;
+  }
+
+  public RouteResponse condenseResponse(NamedList response, long timeMillis) {
+    RouteResponse condensed = new RouteResponse();
+    int status = 0;
+    Integer rf = null;
+    Integer minRf = null;
+    for(int i=0; i<response.size(); i++) {
+      NamedList shardResponse = (NamedList)response.getVal(i);
+      NamedList header = (NamedList)shardResponse.get("responseHeader");      
+      Integer shardStatus = (Integer)header.get("status");
+      int s = shardStatus.intValue();
+      if(s > 0) {
+          status = s;
+      }
+      Object rfObj = header.get(UpdateRequest.REPFACT);
+      if (rfObj != null && rfObj instanceof Integer) {
+        Integer routeRf = (Integer)rfObj;
+        if (rf == null || routeRf < rf)
+          rf = routeRf;
+      }
+      minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
+    }
+
+    NamedList cheader = new NamedList();
+    cheader.add("status", status);
+    cheader.add("QTime", timeMillis);
+    if (rf != null)
+      cheader.add(UpdateRequest.REPFACT, rf);
+    if (minRf != null)
+      cheader.add(UpdateRequest.MIN_REPFACT, minRf);
+    
+    condensed.add("responseHeader", cheader);
+    return condensed;
+  }
+
+  public static class RouteResponse extends NamedList {
+    private NamedList routeResponses;
+    private Map<String, LBHttpSolrClient.Req> routes;
+
+    public void setRouteResponses(NamedList routeResponses) {
+      this.routeResponses = routeResponses;
+    }
+
+    public NamedList getRouteResponses() {
+      return routeResponses;
+    }
+
+    public void setRoutes(Map<String, LBHttpSolrClient.Req> routes) {
+      this.routes = routes;
+    }
+
+    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+      return routes;
+    }
+
+  }
+
+  public static class RouteException extends SolrException {
+
+    private NamedList<Throwable> throwables;
+    private Map<String, LBHttpSolrClient.Req> routes;
+
+    public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, LBHttpSolrClient.Req> routes){
+      super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
+      this.throwables = throwables;
+      this.routes = routes;
+    }
+
+    public NamedList<Throwable> getThrowables() {
+      return throwables;
+    }
+
+    public Map<String, LBHttpSolrClient.Req> getRoutes() {
+      return this.routes;
+    }
+  }
+
+  @Override
+  public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
+    SolrParams reqParams = request.getParams();
+    String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
+    return requestWithRetryOnStaleState(request, 0, collection);
+  }
+
+  /**
+   * As this class doesn't watch external collections on the client side,
+   * there's a chance that the request will fail due to cached stale state,
+   * which means the state must be refreshed from ZK and retried.
+   */
+  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
+      throws SolrServerException, IOException {
+
+    connect(); // important to call this before you start working with the ZkStateReader
+
+    // build up a _stateVer_ param to pass to the server containing all of the
+    // external collection state versions involved in this request, which allows
+    // the server to notify us that our cached state for one or more of the external
+    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
+    String stateVerParam = null;
+    List<DocCollection> requestedCollections = null;
+    if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
+      Set<String> requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection);
+
+      StringBuilder stateVerParamBuilder = null;
+      for (String requestedCollection : requestedCollectionNames) {
+        // track the version of state we're using on the client side using the _stateVer_ param
+        DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
+        int collVer = coll.getZNodeVersion();
+        if (coll.getStateFormat()>1) {
+          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+          requestedCollections.add(coll);
+
+          if (stateVerParamBuilder == null) {
+            stateVerParamBuilder = new StringBuilder();
+          } else {
+            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+          }
+
+          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+        }
+      }
+
+      if (stateVerParamBuilder != null) {
+        stateVerParam = stateVerParamBuilder.toString();
+      }
+    }
+
+    if (request.getParams() instanceof ModifiableSolrParams) {
+      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+      if (stateVerParam != null) {
+        params.set(STATE_VERSION, stateVerParam);
+      } else {
+        params.remove(STATE_VERSION);
+      }
+    } // else: ??? how to set this ???
+
+    NamedList<Object> resp = null;
+    try {
+      resp = sendRequest(request);
+    } catch (Exception exc) {
+
+      Throwable rootCause = SolrException.getRootCause(exc);
+      // don't do retry support for admin requests or if the request doesn't have a collection specified
+      if (collection == null || request.getPath().startsWith("/admin")) {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException)exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException)exc;
+        }else if (exc instanceof RuntimeException) {
+          throw (RuntimeException) exc;
+        }
+        else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+
+      int errorCode = (rootCause instanceof SolrException) ?
+          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+      log.error("Request to collection {} failed due to ("+errorCode+
+          ") {}, retry? "+retryCount, collection, rootCause.toString());
+
+      boolean wasCommError =
+          (rootCause instanceof ConnectException ||
+              rootCause instanceof ConnectTimeoutException ||
+              rootCause instanceof NoHttpResponseException ||
+              rootCause instanceof SocketException);
+
+      boolean stateWasStale = false;
+      if (retryCount < MAX_STALE_RETRIES  &&
+          requestedCollections != null    &&
+          !requestedCollections.isEmpty() &&
+          SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
+      {
+        // cached state for one or more external collections was stale
+        // re-issue request using updated state
+        stateWasStale = true;
+
+        // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
+        for (DocCollection ext : requestedCollections) {
+          collectionStateCache.remove(ext.getName());
+        }
+      }
+
+      // if we experienced a communication error, it's worth checking the state
+      // with ZK just to make sure the node we're trying to hit is still part of the collection
+      if (retryCount < MAX_STALE_RETRIES &&
+          !stateWasStale &&
+          requestedCollections != null &&
+          !requestedCollections.isEmpty() &&
+          wasCommError) {
+        for (DocCollection ext : requestedCollections) {
+          DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName());
+          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+            // looks like we couldn't reach the server because the state was stale == retry
+            stateWasStale = true;
+            // we just pulled state from ZK, so update the cache so that the retry uses it
+            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+          }
+        }
+      }
+
+      if (requestedCollections != null) {
+        requestedCollections.clear(); // done with this
+      }
+
+      // if the state was stale, then we retry the request once with new state pulled from Zk
+      if (stateWasStale) {
+        log.warn("Re-trying request to  collection(s) "+collection+" after stale state error from server.");
+        resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
+      } else {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException)exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException)exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+    }
+
+    return resp;
+  }
+
+  protected NamedList<Object> sendRequest(SolrRequest request)
+      throws SolrServerException, IOException {
+    connect();
+    
+    ClusterState clusterState = zkStateReader.getClusterState();
+    
+    boolean sendToLeaders = false;
+    List<String> replicas = null;
+    
+    if (request instanceof IsUpdateRequest) {
+      if (request instanceof UpdateRequest) {
+        NamedList<Object> response = directUpdate((AbstractUpdateRequest) request,
+            clusterState);
+        if (response != null) {
+          return response;
+        }
+      }
+      sendToLeaders = true;
+      replicas = new ArrayList<>();
+    }
+    
+    SolrParams reqParams = request.getParams();
+    if (reqParams == null) {
+      reqParams = new ModifiableSolrParams();
+    }
+    List<String> theUrlList = new ArrayList<>();
+    if (request.getPath().equals("/admin/collections")
+        || request.getPath().equals("/admin/cores")) {
+      Set<String> liveNodes = clusterState.getLiveNodes();
+      for (String liveNode : liveNodes) {
+        theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode));
+      }
+    } else {
+      String collection = reqParams.get(UpdateParams.COLLECTION, defaultCollection);
+      
+      if (collection == null) {
+        throw new SolrServerException(
+            "No collection param specified on request and no default collection has been set.");
+      }
+      
+      Set<String> collectionsList = getCollectionList(clusterState, collection);
+      if (collectionsList.size() == 0) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "Could not find collection: " + collection);
+      }
+
+      String shardKeys =  reqParams.get(ShardParams._ROUTE_);
+      if(shardKeys == null) {
+        shardKeys = reqParams.get(ShardParams.SHARD_KEYS); // deprecated
+      }
+
+      // TODO: not a big deal because of the caching, but we could avoid looking
+      // at every shard when getting leaders if we tweaked some things
+
+      // Retrieve slices from the cloud state and, for each collection specified,
+      // add its slices to the Map of slices.
+      Map<String,Slice> slices = new HashMap<>();
+      for (String collectionName : collectionsList) {
+        DocCollection col = getDocCollection(clusterState, collectionName);
+        Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
+        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
+      }
+      Set<String> liveNodes = clusterState.getLiveNodes();
+
+      List<String> leaderUrlList = null;
+      List<String> urlList = null;
+      List<String> replicasList = null;
+      
+      // build a map of unique nodes
+      // TODO: allow filtering by group, role, etc
+      Map<String,ZkNodeProps> nodes = new HashMap<>();
+      List<String> urlList2 = new ArrayList<>();
+      for (Slice slice : slices.values()) {
+        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
+          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+          String node = coreNodeProps.getNodeName();
+          if (!liveNodes.contains(coreNodeProps.getNodeName())
+              || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
+          if (nodes.put(node, nodeProps) == null) {
+            if (!sendToLeaders || (sendToLeaders && coreNodeProps.isLeader())) {
+              String url;
+              if (reqParams.get(UpdateParams.COLLECTION) == null) {
+                url = ZkCoreNodeProps.getCoreUrl(
+                    nodeProps.getStr(ZkStateReader.BASE_URL_PROP),
+                    defaultCollection);
+              } else {
+                url = coreNodeProps.getCoreUrl();
+              }
+              urlList2.add(url);
+            } else if (sendToLeaders) {
+              String url;
+              if (reqParams.get(UpdateParams.COLLECTION) == null) {
+                url = ZkCoreNodeProps.getCoreUrl(
+                    nodeProps.getStr(ZkStateReader.BASE_URL_PROP),
+                    defaultCollection);
+              } else {
+                url = coreNodeProps.getCoreUrl();
+              }
+              replicas.add(url);
+            }
+          }
+        }
+      }
+      
+      if (sendToLeaders) {
+        leaderUrlList = urlList2;
+        replicasList = replicas;
+      } else {
+        urlList = urlList2;
+      }
+      
+      if (sendToLeaders) {
+        theUrlList = new ArrayList<>(leaderUrlList.size());
+        theUrlList.addAll(leaderUrlList);
+      } else {
+        theUrlList = new ArrayList<>(urlList.size());
+        theUrlList.addAll(urlList);
+      }
+      if(theUrlList.isEmpty()) {
+        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Not enough nodes to handle the request");
+      }
+
+      Collections.shuffle(theUrlList, rand);
+      if (sendToLeaders) {
+        ArrayList<String> theReplicas = new ArrayList<>(
+            replicasList.size());
+        theReplicas.addAll(replicasList);
+        Collections.shuffle(theReplicas, rand);
+        theUrlList.addAll(theReplicas);
+      }
+      
+    }
+    
+    LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList);
+    LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+    return rsp.getResponse();
+  }
+
+  private Set<String> getCollectionList(ClusterState clusterState,
+      String collection) {
+    // Extract each comma separated collection name and store in a List.
+    List<String> rawCollectionsList = StrUtils.splitSmart(collection, ",", true);
+    Set<String> collectionsList = new HashSet<>();
+    // validate collections
+    for (String collectionName : rawCollectionsList) {
+      if (!clusterState.getCollections().contains(collectionName)) {
+        Aliases aliases = zkStateReader.getAliases();
+        String alias = aliases.getCollectionAlias(collectionName);
+        if (alias != null) {
+          List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+          collectionsList.addAll(aliasList);
+          continue;
+        }
+
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+      }
+
+      collectionsList.add(collectionName);
+    }
+    return collectionsList;
+  }
+
+  @Override
+  public void shutdown() {
+    if (zkStateReader != null) {
+      synchronized(this) {
+        if (zkStateReader!= null)
+          zkStateReader.close();
+        zkStateReader = null;
+      }
+    }
+    
+    if (shutdownLBHttpSolrServer) {
+      lbClient.shutdown();
+    }
+    
+    if (clientIsInternal && myClient!=null) {
+      myClient.getConnectionManager().shutdown();
+    }
+
+    if(this.threadPool != null && !this.threadPool.isShutdown()) {
+      this.threadPool.shutdown();
+    }
+  }
+
+  public LBHttpSolrClient getLbClient() {
+    return lbClient;
+  }
+  
+  public boolean isUpdatesToLeaders() {
+    return updatesToLeaders;
+  }
+
+  protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
+    ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
+    if (cachedState != null && cachedState.cached != null) {
+      return cachedState.cached;
+    }
+
+    DocCollection col = clusterState.getCollectionOrNull(collection);
+    if(col == null ) return  null;
+    if(col.getStateFormat() >1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
+    return col;
+  }
+
+
+  /**
+   * Useful for determining the minimum achieved replication factor across
+   * all shards involved in processing an update request, typically used
+   * for gauging the replication factor of a batch.
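+   * <p>
+   * A sketch of one possible usage; the collection name and min_rf value are
+   * illustrative, and {@code client} is assumed to be a CloudSolrClient whose
+   * default collection is "collection1":
+   * <pre>{@code
+   *   SolrInputDocument doc = new SolrInputDocument();
+   *   doc.addField("id", "1");
+   *
+   *   UpdateRequest req = new UpdateRequest();
+   *   req.add(doc);
+   *   ModifiableSolrParams params = new ModifiableSolrParams();
+   *   params.set(UpdateRequest.MIN_REPFACT, "2"); // ask shards to report the achieved rf
+   *   req.setParams(params);
+   *
+   *   NamedList<Object> rsp = client.request(req);
+   *   int minAchieved = client.getMinAchievedReplicationFactor("collection1", rsp);
+   * }</pre>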
+   */
+  @SuppressWarnings("rawtypes")
+  public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
+    // it's probably already on the top-level header set by condense
+    NamedList header = (NamedList)resp.get("responseHeader");
+    Integer achRf = (Integer)header.get(UpdateRequest.REPFACT);
+    if (achRf != null)
+      return achRf.intValue();
+
+    // not on the top-level header, walk the shard route tree
+    Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp);
+    for (Integer rf : shardRf.values()) {
+      if (achRf == null || rf < achRf) {
+        achRf = rf;
+      }
+    }    
+    return (achRf != null) ? achRf.intValue() : -1;
+  }
+  
+  /**
+   * Walks the NamedList response after performing an update request looking for
+   * the replication factor that was achieved in each shard involved in the request.
+   * For single doc updates, there will be only one shard in the return value. 
+   */
+  @SuppressWarnings("rawtypes")
+  public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
+    connect();
+    
+    Map<String,Integer> results = new HashMap<String,Integer>();
+    if (resp instanceof CloudSolrClient.RouteResponse) {
+      NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
+      ClusterState clusterState = zkStateReader.getClusterState();     
+      Map<String,String> leaders = new HashMap<String,String>();
+      for (Slice slice : clusterState.getActiveSlices(collection)) {
+        Replica leader = slice.getLeader();
+        if (leader != null) {
+          ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+          String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
+          leaders.put(leaderUrl, slice.getName());
+          String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
+          leaders.put(altLeaderUrl, slice.getName());
+        }
+      }
+      
+      Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
+      while (routeIter.hasNext()) {
+        Map.Entry<String,Object> next = routeIter.next();
+        String host = next.getKey();
+        NamedList hostResp = (NamedList)next.getValue();
+        Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
+        if (rf != null) {
+          String shard = leaders.get(host);
+          if (shard == null) {
+            if (host.endsWith("/"))
+              shard = leaders.get(host.substring(0,host.length()-1));
+            if (shard == null) {
+              shard = host;
+            }
+          }
+          results.put(shard, rf);
+        }
+      }
+    }    
+    return results;
+  }
+}