You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/11/24 19:23:03 UTC

[2/5] lucene-solr:master: SOLR-9784: Refactor CloudSolrClient to eliminate direct dependency on ZK; SOLR-9512: CloudSolrClient's cluster state cache can break direct updates to leaders

SOLR-9784: Refactor CloudSolrClient to eliminate direct dependency on ZK
SOLR-9512: CloudSolrClient's cluster state cache can break direct updates to leaders


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/5650939a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/5650939a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/5650939a

Branch: refs/heads/master
Commit: 5650939a8d41b7bad584947a2c9dcedf3774b8de
Parents: d87ffa4
Author: Noble Paul <no...@apache.org>
Authored: Fri Nov 25 00:51:38 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Fri Nov 25 00:51:38 2016 +0530

----------------------------------------------------------------------
 .../solr/client/solrj/impl/CloudSolrClient.java | 313 +++++++++++--------
 1 file changed, 191 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5650939a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 9bc4529..241e2a1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.client.solrj.impl;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.ConnectException;
@@ -37,6 +38,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -56,7 +58,6 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.ToleratedUpdateError;
-import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CollectionStatePredicate;
 import org.apache.solr.common.cloud.CollectionStateWatcher;
@@ -68,7 +69,6 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
@@ -79,7 +79,6 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -101,10 +100,7 @@ public class CloudSolrClient extends SolrClient {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private volatile ZkStateReader zkStateReader;
-  private String zkHost; // the zk server connect string
-  private int zkConnectTimeout = 10000;
-  private int zkClientTimeout = 10000;
+  private final ClusterStateProvider stateProvider;
   private volatile String defaultCollection;
   private final LBHttpSolrClient lbClient;
   private final boolean shutdownLBHttpSolrServer;
@@ -122,6 +118,7 @@ public class CloudSolrClient extends SolrClient {
           "CloudSolrClient ThreadPool"));
   private String idField = "id";
   public static final String STATE_VERSION = "_stateVer_";
+  private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds or 3 million nanos
   private final Set<String> NON_ROUTABLE_PARAMS;
   {
     NON_ROUTABLE_PARAMS = new HashSet<>();
@@ -139,12 +136,15 @@ public class CloudSolrClient extends SolrClient {
     // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
 
   }
-  private volatile long timeToLive = 60* 1000L;
   private volatile List<Object> locks = objectList(3);
 
 
-  protected final Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
+  static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
+    final AtomicLong puts = new AtomicLong();
+    final AtomicLong hits = new AtomicLong();
     final Lock evictLock = new ReentrantLock(true);
+    private volatile long timeToLive = 60 * 1000L;
+
     @Override
     public ExpiringCachedDocCollection get(Object key) {
       ExpiringCachedDocCollection val = super.get(key);
@@ -158,9 +158,16 @@ public class CloudSolrClient extends SolrClient {
         super.remove(key);
         return null;
       }
+      hits.incrementAndGet();
       return val;
     }
 
+    @Override
+    public ExpiringCachedDocCollection put(String key, ExpiringCachedDocCollection value) {
+      puts.incrementAndGet();
+      return super.put(key, value);
+    }
+
     void evictStale() {
       if(!evictLock.tryLock()) return;
       try {
@@ -174,11 +181,30 @@ public class CloudSolrClient extends SolrClient {
       }
     }
 
-  };
+  }
+
+  /**
+   * This is the time to wait to refetch the state
+   * after getting the same state version from ZK
+   * <p>
+   * secs
+   */
+  public void setRetryExpiryTime(int secs) {
+    this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
+  }
+
+  public void setSoTimeout(int timeout) {
+    lbClient.setSoTimeout(timeout);
+  }
 
+  protected final StateCache collectionStateCache = new StateCache();
   class ExpiringCachedDocCollection {
     final DocCollection cached;
-    long cachedAt;
+    final long cachedAt;
+    //This is the time at which the collection is retried and got the same old version
+    long retriedAt = -1;
+    //flag that suggests that this is potentially to be rechecked
+    boolean maybeStale = false;
 
     ExpiringCachedDocCollection(DocCollection cached) {
       this.cached = cached;
@@ -189,6 +215,21 @@ public class CloudSolrClient extends SolrClient {
       return (System.nanoTime() - cachedAt)
           > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
     }
+
+    boolean shoulRetry() {
+      if (maybeStale) {// we are not sure if it is stale so check with retry time
+        if ((retriedAt == -1 ||
+            (System.nanoTime() - retriedAt) > retryExpiryTime)) {
+          return true;// we retried a while back. and we could not get anything new.
+          //it's likely that it is not going to be available now also.
+        }
+      }
+      return false;
+    }
+
+    void setRetriedAt() {
+      retriedAt = System.nanoTime();
+    }
   }
 
   /**
@@ -215,7 +256,7 @@ public class CloudSolrClient extends SolrClient {
    */
   @Deprecated
   public CloudSolrClient(String zkHost) {
-      this.zkHost = zkHost;
+    this.stateProvider = new ZkClientClusterStateProvider(zkHost);
       this.clientIsInternal = true;
       this.myClient = HttpClientUtil.createClient(null);
       this.lbClient = new LBHttpSolrClient.Builder()
@@ -255,8 +296,8 @@ public class CloudSolrClient extends SolrClient {
    * @deprecated use {@link Builder} instead.
    */
   @Deprecated
-  public CloudSolrClient(String zkHost, HttpClient httpClient)  {
-    this.zkHost = zkHost;
+  public CloudSolrClient(String zkHost, HttpClient httpClient) {
+    this.stateProvider = new ZkClientClusterStateProvider(zkHost);
     this.clientIsInternal = httpClient == null;
     this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
     this.lbClient = createLBHttpSolrClient(myClient);
@@ -314,7 +355,7 @@ public class CloudSolrClient extends SolrClient {
    */
   @Deprecated
   public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient) {
-    this.zkHost = buildZkHostString(zkHosts, chroot);
+    this.stateProvider = new ZkClientClusterStateProvider(zkHosts, chroot);
     this.clientIsInternal = httpClient == null;
     this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
     this.lbClient = createLBHttpSolrClient(myClient);
@@ -350,7 +391,7 @@ public class CloudSolrClient extends SolrClient {
    */
   @Deprecated
   public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) {
-    this(zkHosts, chroot, httpClient, lbSolrClient, null, updatesToLeaders, false);
+    this(zkHosts, chroot, httpClient, lbSolrClient, null, updatesToLeaders, false, null);
   }
 
   /**
@@ -385,8 +426,15 @@ public class CloudSolrClient extends SolrClient {
                           LBHttpSolrClient lbSolrClient,
                           LBHttpSolrClient.Builder lbHttpSolrClientBuilder,
                           boolean updatesToLeaders,
-                          boolean directUpdatesToLeadersOnly) {
-    this.zkHost = buildZkHostString(zkHosts, chroot);
+                          boolean directUpdatesToLeadersOnly,
+                          ClusterStateProvider stateProvider
+
+  ) {
+    if (stateProvider == null) {
+      this.stateProvider = new ZkClientClusterStateProvider(zkHosts, chroot);
+    } else {
+      this.stateProvider = stateProvider;
+    }
     this.clientIsInternal = httpClient == null;
     this.shutdownLBHttpSolrServer = lbSolrClient == null;
     if(lbHttpSolrClientBuilder != null) lbSolrClient = lbHttpSolrClientBuilder.build();
@@ -424,7 +472,7 @@ public class CloudSolrClient extends SolrClient {
    */
   @Deprecated
   public CloudSolrClient(String zkHost, boolean updatesToLeaders, HttpClient httpClient) {
-    this.zkHost = zkHost;
+    this.stateProvider = new ZkClientClusterStateProvider(zkHost);
     this.clientIsInternal = httpClient == null;
     this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
     this.lbClient = new LBHttpSolrClient.Builder()
@@ -443,7 +491,7 @@ public class CloudSolrClient extends SolrClient {
    */
   public void setCollectionCacheTTl(int seconds){
     assert seconds > 0;
-    timeToLive = seconds*1000L;
+    this.collectionStateCache.timeToLive = seconds * 1000L;
   }
 
   /**
@@ -471,8 +519,8 @@ public class CloudSolrClient extends SolrClient {
    */
   @Deprecated
   public CloudSolrClient(String zkHost, LBHttpSolrClient lbClient, boolean updatesToLeaders) {
-    this.zkHost = zkHost;
     this.lbClient = lbClient;
+    this.stateProvider = new ZkClientClusterStateProvider(zkHost);
     this.updatesToLeaders = updatesToLeaders;
     this.directUpdatesToLeadersOnly = false;
     shutdownLBHttpSolrServer = false;
@@ -508,11 +556,15 @@ public class CloudSolrClient extends SolrClient {
    * @return the zkHost value used to connect to zookeeper.
    */
   public String getZkHost() {
-    return zkHost;
+    return assertZKStateProvider().zkHost;
   }
 
   public ZkStateReader getZkStateReader() {
-    return zkStateReader;
+    if (stateProvider instanceof ZkClientClusterStateProvider) {
+      ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) stateProvider;
+      return provider.zkStateReader;
+    }
+    throw new IllegalStateException("This has no Zk stateReader");
   }
 
   /**
@@ -541,12 +593,12 @@ public class CloudSolrClient extends SolrClient {
 
   /** Set the connect timeout to the zookeeper ensemble in ms */
   public void setZkConnectTimeout(int zkConnectTimeout) {
-    this.zkConnectTimeout = zkConnectTimeout;
+    assertZKStateProvider().zkConnectTimeout = zkConnectTimeout;
   }
 
   /** Set the timeout to the zookeeper ensemble in ms */
   public void setZkClientTimeout(int zkClientTimeout) {
-    this.zkClientTimeout = zkClientTimeout;
+    assertZKStateProvider().zkClientTimeout = zkClientTimeout;
   }
 
   /**
@@ -555,29 +607,7 @@ public class CloudSolrClient extends SolrClient {
    *
    */
   public void connect() {
-    if (zkStateReader == null) {
-      synchronized (this) {
-        if (zkStateReader == null) {
-          ZkStateReader zk = null;
-          try {
-            zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
-            zk.createClusterStateWatchersAndUpdate();
-            zkStateReader = zk;
-          } catch (InterruptedException e) {
-            zk.close();
-            Thread.currentThread().interrupt();
-            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-          } catch (KeeperException e) {
-            zk.close();
-            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-          } catch (Exception e) {
-            if (zk != null) zk.close();
-            // do not wrap because clients may be relying on the underlying exception being thrown
-            throw e;
-          }
-        }
-      }
-    }
+    stateProvider.connect();
   }
 
   /**
@@ -588,12 +618,12 @@ public class CloudSolrClient extends SolrClient {
    * @throws InterruptedException if the wait is interrupted
    */
   public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
-    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, zkHost);
+    log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, stateProvider);
     long timeout = System.nanoTime() + timeUnit.toNanos(duration);
     while (System.nanoTime() < timeout) {
       try {
         connect();
-        log.info("Cluster at {} ready", zkHost);
+        log.info("Cluster at {} ready", stateProvider);
         return;
       }
       catch (RuntimeException e) {
@@ -620,8 +650,16 @@ public class CloudSolrClient extends SolrClient {
    * @throws IOException if an IO error occurs
    */
   public void uploadConfig(Path configPath, String configName) throws IOException {
-    connect();
-    zkStateReader.getConfigManager().uploadConfigDir(configPath, configName);
+    stateProvider.connect();
+    assertZKStateProvider().uploadConfig(configPath, configName);
+  }
+
+  private ZkClientClusterStateProvider assertZKStateProvider() {
+    if (stateProvider instanceof ZkClientClusterStateProvider) {
+      return (ZkClientClusterStateProvider) stateProvider;
+    }
+    throw new IllegalArgumentException("This client does not use ZK");
+
   }
 
   /**
@@ -631,8 +669,7 @@ public class CloudSolrClient extends SolrClient {
    * @throws IOException  if an I/O exception occurs
    */
   public void downloadConfig(String configName, Path downloadPath) throws IOException {
-    connect();
-    zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
+    assertZKStateProvider().downloadConfig(configName, downloadPath);
   }
 
   /**
@@ -650,8 +687,8 @@ public class CloudSolrClient extends SolrClient {
    */
   public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
       throws InterruptedException, TimeoutException {
-    connect();
-    zkStateReader.waitForState(collection, wait, unit, predicate);
+    stateProvider.connect();
+    assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
   }
 
   /**
@@ -665,11 +702,11 @@ public class CloudSolrClient extends SolrClient {
    * @param watcher    a watcher that will be called when the state changes
    */
   public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
-    connect();
-    zkStateReader.registerCollectionStateWatcher(collection, watcher);
+    stateProvider.connect();
+    assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
   }
 
-  private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException {
+  private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
     UpdateRequest updateRequest = (UpdateRequest) request;
     ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
     ModifiableSolrParams routableParams = new ModifiableSolrParams();
@@ -689,15 +726,9 @@ public class CloudSolrClient extends SolrClient {
 
 
     //Check to see if the collection is an alias.
-    Aliases aliases = zkStateReader.getAliases();
-    if(aliases != null) {
-      Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
-      if(collectionAliases != null && collectionAliases.containsKey(collection)) {
-        collection = collectionAliases.get(collection);
-      }
-    }
+    collection = stateProvider.getCollectionName(collection);
 
-    DocCollection col = getDocCollection(clusterState, collection,null);
+    DocCollection col = getDocCollection(collection, null);
 
     DocRouter router = col.getRouter();
     
@@ -1018,12 +1049,12 @@ public class CloudSolrClient extends SolrClient {
     List<DocCollection> requestedCollections = null;
     boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
     if (collection != null &&  !isAdmin) { // don't do _stateVer_ checking for admin requests
-      Set<String> requestedCollectionNames = getCollectionNames(getZkStateReader().getClusterState(), collection);
+      Set<String> requestedCollectionNames = getCollectionNames(collection);
 
       StringBuilder stateVerParamBuilder = null;
       for (String requestedCollection : requestedCollectionNames) {
         // track the version of state we're using on the client side using the _stateVer_ param
-        DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null);
+        DocCollection coll = getDocCollection(requestedCollection, null);
         int collVer = coll.getZNodeVersion();
         if (coll.getStateFormat()>1) {
           if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
@@ -1064,7 +1095,7 @@ public class CloudSolrClient extends SolrClient {
         Map invalidStates = (Map) o;
         for (Object invalidEntries : invalidStates.entrySet()) {
           Map.Entry e = (Map.Entry) invalidEntries;
-          getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue());
+          getDocCollection((String) e.getKey(), (Integer) e.getValue());
         }
 
       }
@@ -1097,6 +1128,26 @@ public class CloudSolrClient extends SolrClient {
               rootCause instanceof NoHttpResponseException ||
               rootCause instanceof SocketException);
 
+      if (wasCommError) {
+        // it was a communication error. it is likely that
+        // the node to which the request to be sent is down . So , expire the state
+        // so that the next attempt would fetch the fresh state
+        // just re-read state for all of them, if it has not been retired
+        // in retryExpiryTime time
+        for (DocCollection ext : requestedCollections) {
+          ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
+          if (cacheEntry == null) continue;
+          cacheEntry.maybeStale = true;
+        }
+        if (retryCount < MAX_STALE_RETRIES) {//if it is a communication error , we must try again
+          //may be, we have a stale version of the collection state
+          // and we could not get any information from the server
+          //it is probably not worth trying again and again because
+          // the state would not have been updated
+          return requestWithRetryOnStaleState(request, retryCount + 1, collection);
+        }
+      }
+
       boolean stateWasStale = false;
       if (retryCount < MAX_STALE_RETRIES  &&
           requestedCollections != null    &&
@@ -1121,7 +1172,7 @@ public class CloudSolrClient extends SolrClient {
           !requestedCollections.isEmpty() &&
           wasCommError) {
         for (DocCollection ext : requestedCollections) {
-          DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null);
+          DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
           if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
             // looks like we couldn't reach the server because the state was stale == retry
             stateWasStale = true;
@@ -1158,15 +1209,13 @@ public class CloudSolrClient extends SolrClient {
   protected NamedList<Object> sendRequest(SolrRequest request, String collection)
       throws SolrServerException, IOException {
     connect();
-    
-    ClusterState clusterState = zkStateReader.getClusterState();
-    
+
     boolean sendToLeaders = false;
     List<String> replicas = null;
     
     if (request instanceof IsUpdateRequest) {
       if (request instanceof UpdateRequest) {
-        NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection, clusterState);
+        NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
         if (response != null) {
           return response;
         }
@@ -1181,9 +1230,10 @@ public class CloudSolrClient extends SolrClient {
     }
     List<String> theUrlList = new ArrayList<>();
     if (ADMIN_PATHS.contains(request.getPath())) {
-      Set<String> liveNodes = clusterState.getLiveNodes();
+      Set<String> liveNodes = stateProvider.liveNodes();
       for (String liveNode : liveNodes) {
-        theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode));
+        theUrlList.add(ZkStateReader.getBaseUrlForNodeName(liveNode,
+            (String) stateProvider.getClusterProperties().getOrDefault(ZkStateReader.URL_SCHEME,"http")));
       }
     } else {
       
@@ -1191,8 +1241,8 @@ public class CloudSolrClient extends SolrClient {
         throw new SolrServerException(
             "No collection param specified on request and no default collection has been set.");
       }
-      
-      Set<String> collectionNames = getCollectionNames(clusterState, collection);
+
+      Set<String> collectionNames = getCollectionNames(collection);
       if (collectionNames.size() == 0) {
         throw new SolrException(ErrorCode.BAD_REQUEST,
             "Could not find collection: " + collection);
@@ -1209,11 +1259,11 @@ public class CloudSolrClient extends SolrClient {
       // add it to the Map of slices.
       Map<String,Slice> slices = new HashMap<>();
       for (String collectionName : collectionNames) {
-        DocCollection col = getDocCollection(clusterState, collectionName, null);
+        DocCollection col = getDocCollection(collectionName, null);
         Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
         ClientUtils.addSlices(slices, collectionName, routeSlices, true);
       }
-      Set<String> liveNodes = clusterState.getLiveNodes();
+      Set<String> liveNodes = stateProvider.liveNodes();
 
       List<String> leaderUrlList = null;
       List<String> urlList = null;
@@ -1289,16 +1339,14 @@ public class CloudSolrClient extends SolrClient {
     return rsp.getResponse();
   }
 
-  private Set<String> getCollectionNames(ClusterState clusterState,
-                                         String collection) {
+  Set<String> getCollectionNames(String collection) {
     // Extract each comma separated collection name and store in a List.
     List<String> rawCollectionsList = StrUtils.splitSmart(collection, ",", true);
     Set<String> collectionNames = new HashSet<>();
     // validate collections
     for (String collectionName : rawCollectionsList) {
-      if (!clusterState.hasCollection(collectionName)) {
-        Aliases aliases = zkStateReader.getAliases();
-        String alias = aliases.getCollectionAlias(collectionName);
+      if (stateProvider.getState(collectionName) == null) {
+        String alias = stateProvider.getAlias(collection);
         if (alias != null) {
           List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
           collectionNames.addAll(aliasList);
@@ -1315,13 +1363,7 @@ public class CloudSolrClient extends SolrClient {
 
   @Override
   public void close() throws IOException {
-    if (zkStateReader != null) {
-      synchronized(this) {
-        if (zkStateReader!= null)
-          zkStateReader.close();
-        zkStateReader = null;
-      }
-    }
+    stateProvider.close();
     
     if (shutdownLBHttpSolrServer) {
       lbClient.close();
@@ -1367,15 +1409,17 @@ public class CloudSolrClient extends SolrClient {
   }
 
 
-  protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException {
+  protected DocCollection getDocCollection(String collection, Integer expectedVersion) throws SolrException {
+    if (expectedVersion == null) expectedVersion = -1;
     if (collection == null) return null;
-    DocCollection col = getFromCache(collection);
+    ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
+    DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
     if (col != null) {
-      if (expectedVersion == null) return col;
-      if (expectedVersion.intValue() == col.getZNodeVersion()) return col;
+      if (expectedVersion <= col.getZNodeVersion()
+          && !cacheEntry.shoulRetry()) return col;
     }
 
-    ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection);
+    ClusterState.CollectionRef ref = getCollectionRef(collection);
     if (ref == null) {
       //no such collection exists
       return null;
@@ -1386,30 +1430,34 @@ public class CloudSolrClient extends SolrClient {
     }
     List locks = this.locks;
     final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
+    DocCollection fetchedCol = null;
     synchronized (lock) {
-      //we have waited for sometime just check once again
-      col = getFromCache(collection);
+      /*we have waited for sometime just check once again*/
+      cacheEntry = collectionStateCache.get(collection);
+      col = cacheEntry == null ? null : cacheEntry.cached;
       if (col != null) {
-        if (expectedVersion == null) return col;
-        if (expectedVersion.intValue() == col.getZNodeVersion()) {
-          return col;
-        } else {
-          collectionStateCache.remove(collection);
-        }
+        if (expectedVersion <= col.getZNodeVersion()
+            && !cacheEntry.shoulRetry()) return col;
+      }
+      // We are going to fetch a new version
+      // we MUST try to get a new version
+      fetchedCol = ref.get();//this is a call to ZK
+      if (fetchedCol == null) return null;// this collection no more exists
+      if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
+        cacheEntry.setRetriedAt();//we retried and found that it is the same version
+        cacheEntry.maybeStale = false;
+      } else {
+        if (fetchedCol.getStateFormat() > 1)
+          collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
       }
-      col = ref.get();//this is a call to ZK
+      return fetchedCol;
     }
-    if (col == null) return null;
-    if (col.getStateFormat() > 1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
-    return col;
   }
 
-  private DocCollection getFromCache(String c){
-    ExpiringCachedDocCollection cachedState = collectionStateCache.get(c);
-    return cachedState != null ? cachedState.cached : null;
+  ClusterState.CollectionRef getCollectionRef(String collection) {
+    return stateProvider.getState(collection);
   }
 
-
   /**
    * Useful for determining the minimum achieved replication factor across
    * all shards involved in processing an update request, typically useful
@@ -1445,9 +1493,9 @@ public class CloudSolrClient extends SolrClient {
     Map<String,Integer> results = new HashMap<String,Integer>();
     if (resp instanceof CloudSolrClient.RouteResponse) {
       NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
-      ClusterState clusterState = zkStateReader.getClusterState();     
+      DocCollection coll = getDocCollection(collection, null);
       Map<String,String> leaders = new HashMap<String,String>();
-      for (Slice slice : clusterState.getActiveSlices(collection)) {
+      for (Slice slice : coll.getActiveSlices()) {
         Replica leader = slice.getLeader();
         if (leader != null) {
           ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
@@ -1484,10 +1532,6 @@ public class CloudSolrClient extends SolrClient {
     this.lbClient.setConnectionTimeout(timeout); 
   }
 
-  public void setSoTimeout(int timeout) {
-    this.lbClient.setSoTimeout(timeout);
-  }
-
   private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
     final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
     final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
@@ -1564,7 +1608,9 @@ public class CloudSolrClient extends SolrClient {
     private LBHttpSolrClient.Builder lbClientBuilder;
     private boolean shardLeadersOnly;
     private boolean directUpdatesToLeadersOnly;
-    
+    private ClusterStateProvider stateProvider;
+
+
     public Builder() {
       this.zkHosts = new ArrayList();
       this.shardLeadersOnly = true;
@@ -1666,12 +1712,35 @@ public class CloudSolrClient extends SolrClient {
       return this;
     }
 
+    public Builder withClusterStateProvider(ClusterStateProvider stateProvider) {
+      this.stateProvider = stateProvider;
+      return this;
+    }
+
     /**
      * Create a {@link CloudSolrClient} based on the provided configuration.
      */
     public CloudSolrClient build() {
+      if (stateProvider == null) {
+        stateProvider = new ZkClientClusterStateProvider(zkHosts, zkChroot);
+      }
       return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, lbClientBuilder,
-          shardLeadersOnly, directUpdatesToLeadersOnly);
+          shardLeadersOnly, directUpdatesToLeadersOnly, stateProvider);
     }
   }
+
+  interface ClusterStateProvider extends Closeable {
+
+    ClusterState.CollectionRef getState(String collection);
+
+    Set<String> liveNodes();
+
+    String getAlias(String collection);
+
+    String getCollectionName(String name);
+
+    Map<String, Object> getClusterProperties();
+
+    void connect();
+  }
 }