You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2014/06/19 18:25:36 UTC

svn commit: r1603938 [5/5] - in /lucene/dev/branches/solr-5473: ./ dev-tools/ lucene/ lucene/analysis/ lucene/analysis/common/ lucene/analysis/common/src/java/org/apache/lucene/analysis/hunspell/ lucene/analysis/common/src/test/org/apache/lucene/analys...

Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java Thu Jun 19 16:25:31 2014
@@ -86,7 +86,7 @@ public class AssignTest extends SolrTest
     collectionStates.put(cname, docCollection);
     
     Set<String> liveNodes = new HashSet<>();
-    ClusterState state = new ClusterState(-1,liveNodes, collectionStates);
+    ClusterState state = new ClusterState(-1,liveNodes, collectionStates, ClusterStateTest.getMockZkStateReader(collectionStates.keySet()));
     String nodeName = Assign.assignNode("collection1", state);
     
     assertEquals("core_node2", nodeName);

Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java Thu Jun 19 16:25:31 2014
@@ -62,10 +62,10 @@ public class ClusterStateTest extends So
     collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));
     ZkStateReader zkStateReaderMock = getMockZkStateReader(collectionStates.keySet());
     
-    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates,zkStateReaderMock);
     byte[] bytes = ZkStateReader.toJSON(clusterState);
     // System.out.println("#################### " + new String(bytes));
-    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes,zkStateReaderMock,null);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
@@ -73,13 +73,13 @@ public class ClusterStateTest extends So
     assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1"));
     assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2"));
 
-    loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes);
+    loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes, getMockZkStateReader(Collections.<String>emptySet()),null );
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
     assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size());
 
-    loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes);
+    loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes,getMockZkStateReader(Collections.<String>emptySet()),null);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
@@ -89,6 +89,13 @@ public class ClusterStateTest extends So
   public static ZkStateReader getMockZkStateReader(final Set<String> collections) {
     ZkStateReader mock = createMock(ZkStateReader.class);
     EasyMock.reset(mock);
+    mock.getAllCollections();
+    EasyMock.expectLastCall().andAnswer(new IAnswer<Set<String>>() {
+      @Override
+      public Set<String> answer() throws Throwable {
+        return collections;
+      }
+    }).anyTimes();
     EasyMock.replay(mock);
 
     return mock;

Added: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java?rev=1603938&view=auto
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java (added)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java Thu Jun 19 16:25:31 2014
@@ -0,0 +1,119 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase {
+  private CloudSolrServer client;
+
+  @BeforeClass
+  public static void beforeThisClass2() throws Exception {
+
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    System.setProperty("numShards", Integer.toString(sliceCount));
+    System.setProperty("solr.xml.persist", "true");
+    client = createCloudClient(null);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    client.shutdown();
+  }
+
+  protected String getSolrXml() {
+    return "solr-no-core.xml";
+  }
+
+  public ExternalCollectionsTest() {
+    fixShardCount = true;
+
+    sliceCount = 2;
+    shardCount = 4;
+
+    checkCreatedVsState = false;
+  }
+
+
+  @Override
+  public void doTest() throws Exception {
+    testZkNodeLocation();
+  }
+
+
+  boolean externalColl = false;
+
+  @Override
+  protected int getStateFormat() {
+    return externalColl ? 2:1;
+  }
+
+  private void testZkNodeLocation() throws Exception{
+    externalColl=true;
+
+    String collectionName = "myExternColl";
+
+    createCollection(collectionName, client, 2, 2);
+
+    waitForRecoveriesToFinish(collectionName, false);
+    assertTrue("does not exist collection state externally",
+        cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
+    Stat stat = new Stat();
+    byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true);
+    DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName);
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() );
+    assertTrue("DocCllection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1);
+
+
+    // remove collection
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.DELETE.toString());
+    params.set("name", collectionName);
+    QueryRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+    if (client == null) {
+      client = createCloudClient(null);
+    }
+
+    client.request(request);
+
+    checkForMissingCollection(collectionName);
+    assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
+
+  }
+}
+
+
+

Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java Thu Jun 19 16:25:31 2014
@@ -50,9 +50,9 @@ public class SliceStateTest extends Solr
     collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
 
     ZkStateReader mockZkStateReader = ClusterStateTest.getMockZkStateReader(collectionStates.keySet());
-    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates, mockZkStateReader);
     byte[] bytes = ZkStateReader.toJSON(clusterState);
-    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes, mockZkStateReader,null);
 
     assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState());
   }

Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java Thu Jun 19 16:25:31 2014
@@ -30,6 +30,7 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -39,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
 
 @Slow
@@ -181,8 +181,22 @@ public class SliceStateUpdateTest extend
     assertEquals("shard1", slices.get("shard1").getName());
     assertEquals("inactive", slices.get("shard1").getState());
 
-    container1.getZkController().getOverseerElector().getContext().cancelElection();
-    container2.getZkController().getOverseerElector().getContext().cancelElection();
+
+    cancelElection(container1);
+    cancelElection(container2);
+  }
+
+  /**
+   * Cancels the overseer election of {@code cc}, after polling up to 10 times (20 ms apart) for the
+   * election context's {@code leaderSeqPath} to become non-null.
+   * NOTE(review): presumably cancelElection() needs leaderSeqPath to be set - confirm.
+   */
+  private void cancelElection(CoreContainer cc) throws InterruptedException, KeeperException {
+    // bounded wait: the original loop never incremented its counter and could spin forever
+    for (int attempt = 0; attempt < 10; attempt++) {
+      if (cc.getZkController().getOverseerElector().getContext().leaderSeqPath != null) {
+        break;
+      }
+      Thread.sleep(20);
+    }
+    cc.getZkController().getOverseerElector().getContext().cancelElection();
   }
 
   private void closeThread(OverseerThread updaterThread) {

Modified: lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml (original)
+++ lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml Thu Jun 19 16:25:31 2014
@@ -1106,9 +1106,8 @@
     <!-- Thai -->
     <fieldType name="text_th" class="solr.TextField" positionIncrementGap="100">
       <analyzer> 
-        <tokenizer class="solr.StandardTokenizerFactory"/>
+        <tokenizer class="solr.ThaiTokenizerFactory"/>
         <filter class="solr.LowerCaseFilterFactory"/>
-        <filter class="solr.ThaiWordFilterFactory"/>
         <filter class="solr.StopFilterFactory" ignoreCase="true" words="lang/stopwords_th.txt" />
       </analyzer>
     </fieldType>

Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Thu Jun 19 16:25:31 2014
@@ -18,7 +18,9 @@ package org.apache.solr.client.solrj.imp
  */
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.MalformedURLException;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -30,13 +32,16 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.http.NoHttpResponseException;
 import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
@@ -67,6 +72,8 @@ import org.apache.solr.common.util.Named
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * SolrJ client class to communicate with SolrCloud.
@@ -79,6 +86,8 @@ import org.apache.zookeeper.KeeperExcept
  * with {@link #setIdField(String)}.
  */
 public class CloudSolrServer extends SolrServer {
+  private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class);
+
   private volatile ZkStateReader zkStateReader;
   private String zkHost; // the zk server address
   private int zkConnectTimeout = 10000;
@@ -87,6 +96,8 @@ public class CloudSolrServer extends Sol
   private final LBHttpSolrServer lbServer;
   private final boolean shutdownLBHttpSolrServer;
   private HttpClient myClient;
+  //no of times collection state to be reloaded if stale state error is received
+  private static final int MAX_STALE_RETRIES = 5;
   Random rand = new Random();
   
   private final boolean updatesToLeaders;
@@ -95,6 +106,7 @@ public class CloudSolrServer extends Sol
       .newCachedThreadPool(new SolrjNamedThreadFactory(
           "CloudSolrServer ThreadPool"));
   private String idField = "id";
+  public static final String STATE_VERSION = "_stateVer_";
   private final Set<String> NON_ROUTABLE_PARAMS;
   {
     NON_ROUTABLE_PARAMS = new HashSet<>();
@@ -112,8 +124,36 @@ public class CloudSolrServer extends Sol
     // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
 
   }
+  private volatile long timeToLive = 60* 1000L;
+
+
+  /**
+   * Client-side cache of collection state keyed by collection name. Its get() treats entries older
+   * than {@code timeToLive} milliseconds as absent and evicts them.
+   */
+  protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
+    @Override
+    public ExpiringCachedDocCollection get(Object key) {
+      ExpiringCachedDocCollection val = super.get(key);
+      if (val == null) return null;
+      if (val.isExpired(timeToLive)) {
+        // conditional remove: never evict a fresh entry another thread stored after our read
+        super.remove(key, val);
+        return null;
+      }
+      return val;
+    }
+
+  };
 
+  /**
+   * Pairs a cached {@link DocCollection} with the time (System.currentTimeMillis()) at which it was
+   * cached, so callers can drop entries older than a time-to-live. Static because it never uses
+   * the enclosing CloudSolrServer instance.
+   */
+  static class ExpiringCachedDocCollection {
+    DocCollection cached;
+    long cachedAt;

+    ExpiringCachedDocCollection(DocCollection cached) {
+      this.cached = cached;
+      this.cachedAt = System.currentTimeMillis();
+    }
+
+    /**
+     * @param timeToLive maximum age in milliseconds
+     * @return true if more than {@code timeToLive} ms have passed since this entry was created
+     */
+    boolean isExpired(long timeToLive) {
+      return (System.currentTimeMillis() - cachedAt) > timeToLive;
+    }
+  }
 
   /**
    * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@@ -127,6 +167,8 @@ public class CloudSolrServer extends Sol
       this.lbServer.setParser(new BinaryResponseParser());
       this.updatesToLeaders = true;
       shutdownLBHttpSolrServer = true;
+      setupStateVerParamOnQueryString(lbServer);
+
   }
   
   public CloudSolrServer(String zkHost, boolean updatesToLeaders)
@@ -138,6 +180,15 @@ public class CloudSolrServer extends Sol
     this.lbServer.setParser(new BinaryResponseParser());
     this.updatesToLeaders = updatesToLeaders;
     shutdownLBHttpSolrServer = true;
+    setupStateVerParamOnQueryString(lbServer);
+  }
+
+  /**Sets the cache ttl for DocCollection Objects cached  . This is only applicable for collections which are persisted outside of clusterstate.json
+   * @param seconds ttl value in seconds
+   */
+  public void setCollectionCacheTTl(int seconds){
+    assert seconds > 0;
+    timeToLive = seconds*1000L;
   }
 
   /**
@@ -150,6 +201,7 @@ public class CloudSolrServer extends Sol
     this.lbServer = lbServer;
     this.updatesToLeaders = true;
     shutdownLBHttpSolrServer = false;
+    setupStateVerParamOnQueryString(lbServer);
   }
   
   /**
@@ -163,8 +215,24 @@ public class CloudSolrServer extends Sol
     this.lbServer = lbServer;
     this.updatesToLeaders = updatesToLeaders;
     shutdownLBHttpSolrServer = false;
+    setupStateVerParamOnQueryString(lbServer);
+
   }
   
+  /**
+   * Used internally to set up the {@value #STATE_VERSION} param so that it is sent in the query
+   * string of requests coming from this instance.
+   *
+   * @param lbServer the load-balancing server whose query-string parameter set is extended
+   */
+  protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) {
+    // setup the stateVer param to be passed in the query string of every request
+    Set<String> queryStringParams = lbServer.getQueryParams();
+    if (queryStringParams == null) {
+      queryStringParams = new HashSet<>(2);
+      lbServer.setQueryParams(queryStringParams);
+    }
+    // use the shared constant so this stays in sync with the param set by requestWithRetryOnStaleState
+    queryStringParams.add(STATE_VERSION);
+  }
+
   public ResponseParser getParser() {
     return lbServer.getParser();
   }
@@ -238,8 +306,7 @@ public class CloudSolrServer extends Sol
         if (zkStateReader == null) {
           ZkStateReader zk = null;
           try {
-            zk = new ZkStateReader(zkHost, zkClientTimeout,
-                zkConnectTimeout);
+            zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
             zk.createClusterStateWatchersAndUpdate();
             zkStateReader = zk;
           } catch (InterruptedException e) {
@@ -302,7 +369,7 @@ public class CloudSolrServer extends Sol
       }
     }
 
-    DocCollection col = clusterState.getCollection(collection);
+    DocCollection col = getDocCollection(clusterState, collection);
 
     DocRouter router = col.getRouter();
     
@@ -519,7 +586,146 @@ public class CloudSolrServer extends Sol
   }
 
   @Override
-  public NamedList<Object> request(SolrRequest request)
+  public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
+    // target collection: the request's "collection" param, falling back to the default collection
+    final SolrParams reqParams = request.getParams();
+    final String targetCollection = reqParams == null
+        ? getDefaultCollection()
+        : reqParams.get("collection", getDefaultCollection());
+    return requestWithRetryOnStaleState(request, 0, targetCollection);
+  }
+
+  /**
+   * Sends {@code request}, retrying (at most {@code MAX_STALE_RETRIES} times) when the failure shows
+   * that the cached state of one of the requested external collections was stale.
+   * <p>
+   * As this class doesn't watch external collections on the client side,
+   * there's a chance that the request will fail due to cached stale state,
+   * which means the state must be refreshed from ZK and retried.
+   *
+   * @param request    the request to send
+   * @param retryCount number of retries already made for this request
+   * @param collection the target collection name(s), as resolved by getCollectionList, or null
+   * @return the server response
+   */
+  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
+      throws SolrServerException, IOException {
+
+    connect(); // important to call this before you start working with the ZkStateReader
+
+    // build up a _stateVer_ param to pass to the server containing all of the
+    // external collection state versions involved in this request, which allows
+    // the server to notify us that our cached state for one or more of the external
+    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
+    String stateVerParam = null;
+    // never null, so the retry logic in the catch block can safely call isEmpty()
+    List<DocCollection> requestedExternalCollections = new ArrayList<>();
+    if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
+      ClusterState clusterState = getZkStateReader().getClusterState();
+      Set<String> requestedCollectionNames = getCollectionList(clusterState, collection);
+
+      StringBuilder stateVerParamBuilder = null;
+      for (String requestedCollection : requestedCollectionNames) {
+        // track the version of state we're using on the client side using the _stateVer_ param
+        DocCollection coll = getDocCollection(clusterState, requestedCollection);
+        if (coll == null) {
+          continue; // no state known for it, so there is no version to report
+        }
+        if (coll.getStateFormat() > 1) {
+          requestedExternalCollections.add(coll);
+
+          if (stateVerParamBuilder == null) {
+            stateVerParamBuilder = new StringBuilder();
+          } else {
+            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+          }
+
+          stateVerParamBuilder.append(coll.getName()).append(":").append(coll.getZNodeVersion());
+        }
+      }
+
+      if (stateVerParamBuilder != null) {
+        stateVerParam = stateVerParamBuilder.toString();
+      }
+    }
+
+    if (request.getParams() instanceof ModifiableSolrParams) {
+      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+      if (stateVerParam != null) {
+        params.set(STATE_VERSION, stateVerParam);
+      } else {
+        params.remove(STATE_VERSION);
+      }
+    } // else: ??? how to set this ???
+
+    NamedList<Object> resp = null;
+    try {
+      resp = sendRequest(request);
+    } catch (Exception exc) {
+
+      Throwable rootCause = SolrException.getRootCause(exc);
+      // don't do retry support for admin requests or if the request doesn't have a collection specified
+      if (collection == null || request.getPath().startsWith("/admin")) {
+        throw propagateRequestFailure(exc);
+      }
+
+      int errorCode = (rootCause instanceof SolrException) ?
+          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+      log.error("Request to collection {} failed due to ("+errorCode+
+          ") {}, retry? "+retryCount, collection, rootCause.toString());
+
+      boolean wasCommError =
+          (rootCause instanceof ConnectException ||
+              rootCause instanceof ConnectTimeoutException ||
+              rootCause instanceof NoHttpResponseException ||
+              rootCause instanceof SocketException);
+
+      boolean stateWasStale = false;
+      if (retryCount < MAX_STALE_RETRIES  &&
+          !requestedExternalCollections.isEmpty() &&
+          SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
+      {
+        // cached state for one or more external collections was stale
+        // re-issue request using updated state
+        stateWasStale = true;
+
+        // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
+        for (DocCollection ext : requestedExternalCollections) {
+          collectionStateCache.remove(ext.getName());
+        }
+      }
+
+      // if we experienced a communication error, it's worth checking the state
+      // with ZK just to make sure the node we're trying to hit is still part of the collection
+      if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedExternalCollections.isEmpty() && wasCommError) {
+        for (DocCollection ext : requestedExternalCollections) {
+          DocCollection latestStateFromZk = getZkStateReader().getCollection(ext.getName());
+          // guard against the reader not finding the collection (presumably deleted meanwhile)
+          if (latestStateFromZk != null && latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+            // looks like we couldn't reach the server because the state was stale == retry
+            stateWasStale = true;
+            // we just pulled state from ZK, so update the cache so that the retry uses it
+            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+          }
+        }
+      }
+
+      // if the state was stale, then we retry the request once with new state pulled from Zk
+      if (stateWasStale) {
+        log.warn("Re-trying request to  collection(s) "+collection+" after stale state error from server.");
+        resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
+      } else {
+        throw propagateRequestFailure(exc);
+      }
+    }
+
+    return resp;
+  }
+
+  /**
+   * Maps a failure from {@link #sendRequest(SolrRequest)} onto this class's declared exceptions.
+   * IOExceptions and RuntimeExceptions are rethrown unchanged, and a SolrServerException is returned
+   * as is. Anything else is returned wrapped in a SolrServerException. Intended usage:
+   * {@code throw propagateRequestFailure(exc);}
+   */
+  private static SolrServerException propagateRequestFailure(Exception exc) throws IOException {
+    if (exc instanceof SolrServerException) {
+      return (SolrServerException) exc;
+    }
+    if (exc instanceof IOException) {
+      throw (IOException) exc;
+    }
+    if (exc instanceof RuntimeException) {
+      throw (RuntimeException) exc;
+    }
+    return new SolrServerException(exc);
+  }
+
+  protected NamedList<Object> sendRequest(SolrRequest request)
       throws SolrServerException, IOException {
     connect();
     
@@ -579,7 +785,7 @@ public class CloudSolrServer extends Sol
       // add it to the Map of slices.
       Map<String,Slice> slices = new HashMap<>();
       for (String collectionName : collectionsList) {
-        DocCollection col = clusterState.getCollection(collectionName);
+        DocCollection col = getDocCollection(clusterState, collectionName);
         Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
         ClientUtils.addSlices(slices, collectionName, routeSlices, true);
       }
@@ -671,14 +877,17 @@ public class CloudSolrServer extends Sol
         Aliases aliases = zkStateReader.getAliases();
         String alias = aliases.getCollectionAlias(collectionName);
         if (alias != null) {
-          List<String> aliasList = StrUtils.splitSmart(alias, ",", true); 
+          List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
           collectionsList.addAll(aliasList);
           continue;
         }
-        
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+
+        DocCollection docCollection = getDocCollection(clusterState, collection);
+        if (docCollection == null) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+        }
       }
-      
+
       collectionsList.add(collectionName);
     }
     return collectionsList;
@@ -715,6 +924,28 @@ public class CloudSolrServer extends Sol
     return updatesToLeaders;
   }
 
+  /**
+   * Returns the state of {@code collection}, preferring an unexpired entry from
+   * {@code collectionStateCache} and otherwise reading it from {@code clusterState}.
+   * Only external collections (state format &gt; 1) are put in the cache, which matches the
+   * documented scope of {@link #setCollectionCacheTTl(int)}.
+   *
+   * @param clusterState state to read from on a cache miss
+   * @param collection   collection name; may be null
+   * @return the collection state, or null if {@code collection} is null or unknown
+   */
+  protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
+    if (collection == null) return null; // ConcurrentHashMap rejects null keys
+    ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
+    if (cachedState != null && cachedState.cached != null) {
+      return cachedState.cached;
+    }
+
+    DocCollection col = clusterState.getCollectionOrNull(collection);
+    if (col == null) return null;
+    if (collectionStateCache != null && col.getStateFormat() > 1) {
+      collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
+    }
+    return col;
+  }
+
+  /**
+   * Extension point to allow sub-classes to override the ZkStateReader this class uses internally.
+   */
+  protected ZkStateReader createZkStateReader(String zkHost, int zkClientTimeout, int zkConnectTimeout)
+      throws InterruptedException, TimeoutException, IOException, KeeperException {
+    ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
+    zk.createClusterStateWatchersAndUpdate();
+    return zk;
+  }
+
   /**
    * Useful for determining the minimum achieved replication factor across
    * all shards involved in processing an update request, typically useful

Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Thu Jun 19 16:25:31 2014
@@ -46,36 +46,25 @@ public class ClusterState implements JSO
   
   private final Map<String, DocCollection> collectionStates;  // Map<collectionName, Map<sliceName,Slice>>
   private Set<String> liveNodes;
+  private final ZkStateReader stateReader;
 
-
-  /**
-   * Use this constr when ClusterState is meant for publication.
-   * 
-   * hashCode and equals will only depend on liveNodes and not clusterStateVersion.
-   */
-  @Deprecated
-  public ClusterState(Set<String> liveNodes,
-      Map<String, DocCollection> collectionStates) {
-    this(null, liveNodes, collectionStates);
-  }
-
-
-  
   /**
    * Use this constr when ClusterState is meant for consumption.
+   *
+   * @param zkClusterStateVersion the cluster state version to record (an Integer, so presumably may be null)
+   * @param liveNodes             copied into a new set
+   * @param collectionStates      copied into a new insertion-ordered map
+   * @param stateReader           used to look up collections not held in {@code collectionStates};
+   *                              must not be null (checked only when assertions are enabled)
    */
   public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
-      Map<String, DocCollection> collectionStates) {
+      Map<String, DocCollection> collectionStates, ZkStateReader stateReader) {
+    assert stateReader != null;
     this.zkClusterStateVersion = zkClusterStateVersion;
     this.liveNodes = new HashSet<>(liveNodes.size());
     this.liveNodes.addAll(liveNodes);
     this.collectionStates = new LinkedHashMap<>(collectionStates.size());
     this.collectionStates.putAll(collectionStates);
+    this.stateReader = stateReader;

   }
 
   public ClusterState copyWith(Map<String,DocCollection> modified){
-    ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates);
+    ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates,stateReader);
     for (Entry<String, DocCollection> e : modified.entrySet()) {
       DocCollection c = e.getValue();
       if(c == null) {
@@ -98,17 +87,10 @@ public class ClusterState implements JSO
     if (slice == null) return null;
     return slice.getLeader();
   }
-  private Replica getReplica(DocCollection coll, String replicaName) {
-    if (coll == null) return null;
-    for (Slice slice : coll.getSlices()) {
-      Replica replica = slice.getReplica(replicaName);
-      if (replica != null) return replica;
-    }
-    return null;
-  }
 
   public boolean hasCollection(String coll) {
-    return  collectionStates.containsKey(coll) ;
+    if (collectionStates.containsKey(coll)) return true;
+    return stateReader.getAllCollections().contains(coll);
   }
 
   /**
@@ -116,8 +98,9 @@ public class ClusterState implements JSO
    * If the slice is known, do not use this method.
    * coreNodeName is the same as replicaName
    */
-  public Replica getReplica(final String collection, final String coreNodeName) {
-    return getReplica(collectionStates.get(collection), coreNodeName);
+  public Replica getReplica(final String collection, final String coreNodeName, boolean cachedOnly) {
+    DocCollection coll = stateReader.getCollection(collection,cachedOnly);
+    return coll == null? null: coll.getReplica(coreNodeName);
   }
 
   /**
@@ -153,6 +136,35 @@ public class ClusterState implements JSO
     return coll.getActiveSlices();
   }
 
+  /**
+   * Get the {@code DocCollection} object if available. This method will
+   * never hit ZooKeeper and attempt to fetch collection from locally available
+   * state only.
+   *
+   * @param collection the name of the collection
+   * @return the {@link org.apache.solr.common.cloud.DocCollection} or null if not found
+   */
+  private DocCollection getCachedCollection(String collection) {
+    DocCollection c = collectionStates.get(collection);
+    if (c != null)  return c;
+    if (!stateReader.getAllCollections().contains(collection)) return null;
+    return stateReader.getCollection(collection, true); // return from cache
+  }
+
+  /** expert internal use only
+   * Gets the replica from caches by the core name (assuming the slice is unknown) or null if replica is not found.
+   * If the slice is known, do not use this method.
+   * coreNodeName is the same as replicaName
+   */
+  public Replica getCachedReplica(String collectionName, String coreNodeName) {
+    DocCollection c = getCachedCollection(collectionName);
+    if (c == null) return null;
+    for (Slice slice : c.getSlices()) {
+      Replica replica = slice.getReplica(coreNodeName);
+      if (replica != null) return replica;
+    }
+    return null;
+  }
 
   /**
    * Get the named DocCollection object, or throw an exception if it doesn't exist.
@@ -163,23 +175,26 @@ public class ClusterState implements JSO
     return coll;
   }
 
-
   public DocCollection getCollectionOrNull(String coll) {
-    return collectionStates.get(coll);
+    DocCollection c = collectionStates.get(coll);
+    if (c != null) return c;
+    if (!stateReader.getAllCollections().contains(coll)) return null;
+    return stateReader.getCollection(coll);
   }
 
   /**
    * Get collection names.
    */
   public Set<String> getCollections() {
-    return collectionStates.keySet();
+    return stateReader.getAllCollections();
   }
 
   /**
+   * For internal use only
    * @return Map&lt;collectionName, Map&lt;sliceName,Slice&gt;&gt;
    */
-  public Map<String, DocCollection> getCollectionStates() {
-    return Collections.unmodifiableMap(collectionStates);
+  Map<String, DocCollection> getCollectionStates() {
+    return collectionStates;
   }
 
   /**
@@ -238,7 +253,7 @@ public class ClusterState implements JSO
     Stat stat = new Stat();
     byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
         null, stat, true);
-    return load(stat.getVersion(), state, liveNodes);
+    return load(stat.getVersion(), state, liveNodes, stateReader, ZkStateReader.CLUSTER_STATE);
   }
   
  
@@ -250,25 +265,34 @@ public class ClusterState implements JSO
    * @param version zk version of the clusterstate.json file (bytes)
    * @param bytes clusterstate.json as a byte array
    * @param liveNodes list of live nodes
+   * @param stateReader The ZkStateReader for this clusterstate
+   * @param znode the znode from which this data is read from
    * @return the ClusterState
    */
-  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
+  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, ZkStateReader stateReader, String znode) {
     // System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
     if (bytes == null || bytes.length == 0) {
-      return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
+      return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap(),stateReader);
     }
     Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
     Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
       String collectionName = entry.getKey();
-      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
+      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
       collections.put(collectionName, coll);
     }
 
     // System.out.println("######## ClusterState.load result:" + collections);
-    return new ClusterState( version, liveNodes, collections);
+    return new ClusterState( version, liveNodes, collections,stateReader);
   }
 
+  /**
+   * @deprecated use {@link #load(Integer, byte[], Set, ZkStateReader, String)}
+   */
+  @Deprecated
+  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes){
+    return load(version == null ? -1: version, bytes, liveNodes,null,null);
+  }
 
   public static Aliases load(byte[] bytes) {
     if (bytes == null || bytes.length == 0) {
@@ -279,7 +303,7 @@ public class ClusterState implements JSO
     return new Aliases(aliasMap);
   }
 
-  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version) {
+  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
     Map<String,Object> props;
     Map<String,Slice> slices;
 
@@ -306,7 +330,7 @@ public class ClusterState implements JSO
       router = DocRouter.getDocRouter((String) routerProps.get("name"));
     }
 
-    return new DocCollection(name, slices, props, router, version);
+    return new DocCollection(name, slices, props, router, version,znode);
   }
 
   private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
@@ -334,7 +358,7 @@ public class ClusterState implements JSO
    * 
    * @return null if ClusterState was created for publication, not consumption
    */
-  public Integer getZkClusterStateVersion() {
+  public Integer getZnodeVersion() {
     return zkClusterStateVersion;
   }
 
@@ -364,6 +388,9 @@ public class ClusterState implements JSO
   }
 
 
+  public ZkStateReader getStateReader(){
+    return stateReader;
+  }
 
   /**
    * Internal API used only by ZkStateReader
@@ -372,9 +399,5 @@ public class ClusterState implements JSO
     this.liveNodes = liveNodes;
   }
 
-  public DocCollection getCommonCollection(String name){
-    return collectionStates.get(name);
-
-  }
 
 }

Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Thu Jun 19 16:25:31 2014
@@ -21,7 +21,6 @@ import org.noggit.JSONUtil;
 import org.noggit.JSONWriter;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -33,15 +32,17 @@ import java.util.Map;
 public class DocCollection extends ZkNodeProps {
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
-  private int version;
+  public static final String STATE_FORMAT = "stateFormat";
+  private final int znodeVersion;
 
   private final String name;
   private final Map<String, Slice> slices;
   private final Map<String, Slice> activeSlices;
   private final DocRouter router;
+  private final String znode;
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    this(name, slices, props, router, -1);
+    this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE);
   }
 
   /**
@@ -49,9 +50,9 @@ public class DocCollection extends ZkNod
    * @param slices The logical shards of the collection.  This is used directly and a copy is not made.
    * @param props  The properties of the slice.  This is used directly and a copy is not made.
    */
-  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
+  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
     super( props==null ? props = new HashMap<String,Object>() : props);
-    this.version = zkVersion;
+    this.znodeVersion = zkVersion;
     this.name = name;
 
     this.slices = slices;
@@ -65,10 +66,16 @@ public class DocCollection extends ZkNod
         this.activeSlices.put(slice.getKey(), slice.getValue());
     }
     this.router = router;
-
+    this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
     assert name != null && slices != null;
   }
 
+  /** Returns a copy of this collection backed by the given slices (used directly, not copied); the
+   * name, properties, router, znode version and znode of this collection are carried over. */
+  public DocCollection copyWith(Map<String, Slice> slices){
+    return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
+  }
+
 
   /**
    * Return collection name.
@@ -110,9 +117,19 @@ public class DocCollection extends ZkNod
     return activeSlices;
   }
 
-  public int getVersion(){
-    return version;
+  /** @return the version of the znode this state was read from (-1 when constructed without one) */
+  public int getZNodeVersion(){
+    return znodeVersion;
+  }
 
+  /** @return 1 if this collection is stored in {@link ZkStateReader#CLUSTER_STATE}, otherwise 2 */
+  public int getStateFormat(){
+    return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1:2;
+  }
+
+  /** @return the znode holding this collection's state ({@link ZkStateReader#CLUSTER_STATE} if none was given) */
+  public String getZNode(){
+    return znode;
   }
 
 
@@ -132,4 +149,14 @@ public class DocCollection extends ZkNod
     all.put(SHARDS, slices);
     jsonWriter.write(all);
   }
+
+  /** Returns the replica named {@code coreNodeName} from whichever slice of this collection
+   * contains it, or null if no slice does. */
+  public Replica getReplica(String coreNodeName) {
+    for (Slice slice : slices.values()) {
+      Replica replica = slice.getReplica(coreNodeName);
+      if (replica != null) return replica;
+    }
+    return null;
+  }
 }

Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Thu Jun 19 16:25:31 2014
@@ -68,6 +68,12 @@ public class SolrZkClient {
 
   private ConnectionManager connManager;
 
+  // Per-instance sequence number, apparently a debugging aid for tracing client creation/leaks.
+  // NOTE: instcount++ is not atomic, so numbers may repeat if clients are constructed concurrently.
+  static int instcount = 0;
+
+  public int inst = instcount++;
+
   private volatile SolrZooKeeper keeper;
   
   private ZkCmdExecutor zkCmdExecutor;

Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Thu Jun 19 16:25:31 2014
@@ -98,10 +98,16 @@ public class ZkStateReader {
   public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
 
   public static final String SHARD_LEADERS_ZKNODE = "leaders";
+  private final Set<String> watchedCollections = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
 
+  /** Latest known state of each collection in {@link #watchedCollections},
+   * refreshed by the ZooKeeper watches registered in addZkWatch.
+   */
+  private final Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
+  private volatile Set<String> allCollections = Collections.emptySet();
+
 
-  
   //
   // convenience methods... should these go somewhere else?
   //
@@ -162,7 +168,8 @@ public class ZkStateReader {
           log.info("path={} {}={} specified config exists in ZooKeeper",
               new Object[] {path, CONFIGNAME_PROP, configName});
         }
-
+      } else  {
+        throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
       }
     }
     catch (KeeperException e) {
@@ -251,22 +258,21 @@ public class ZkStateReader {
     return aliases;
   }
 
-  /*public Boolean checkValid(String coll, int version){
+  public Boolean checkValid(String coll, int version){
     DocCollection collection = clusterState.getCollectionOrNull(coll);
     if(collection ==null) return null;
-    if(collection.getVersion() < version){
-      log.info("server older than client {}<{}",collection.getVersion(),version);
-      DocCollection nu = getExternCollectionFresh(this, coll);
-      if(nu.getVersion()> collection.getVersion()){
-        updateExternCollection(nu);
+    if(collection.getZNodeVersion() < version){
+      log.info("server older than client {}<{}",collection.getZNodeVersion(),version);
+      DocCollection nu = getCollectionLive(this, coll);
+      if(nu != null && nu.getZNodeVersion()> collection.getZNodeVersion()){
+        updateWatchedCollection(nu);
         collection = nu;
       }
     }
-    if(collection.getVersion() == version) return Boolean.TRUE;
-    log.info("wrong version from client {}!={} ",version, collection.getVersion());
+    if(collection.getZNodeVersion() == version) return Boolean.TRUE;
+    log.info("wrong version from client {}!={} ",version, collection.getZNodeVersion());
     return Boolean.FALSE;
-
-  }*/
+  }
   
   public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
       InterruptedException {
@@ -299,10 +305,11 @@ public class ZkStateReader {
               byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
                   true);
               Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
-              ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln);
+              ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this, null);
               // update volatile
               ZkStateReader.this.clusterState = clusterState;
 
+              updateCollectionNames();
 //              HashSet<String> all = new HashSet<>(colls);;
 //              all.addAll(clusterState.getAllInternalCollections());
 //              all.remove(null);
@@ -377,6 +384,7 @@ public class ZkStateReader {
       liveNodeSet.addAll(liveNodes);
       ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
       this.clusterState = clusterState;
+      updateCollectionNames();
 
       zkClient.exists(ALIASES,
           new Watcher() {
@@ -422,6 +430,40 @@ public class ZkStateReader {
           }, true);
     }
     updateAliases();
+    //on reconnect of SolrZkClient re-add watchers for the watched external collections
+    synchronized (this) {
+      for (String watchedCollection : watchedCollections) {
+        addZkWatch(watchedCollection);
+      }
+    }
+  }
+
+  public void updateCollectionNames() throws KeeperException, InterruptedException {
+    Set<String> colls = getExternColls();
+    colls.addAll(clusterState.getCollectionStates().keySet());
+    allCollections = Collections.unmodifiableSet(colls);
+  }
+
+  private Set<String> getExternColls() throws KeeperException, InterruptedException {
+    List<String> children = null;
+    try {
+      children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
+    } catch (KeeperException.NoNodeException e) {
+      log.warn("Error fetching collection names");
+
+      return new HashSet<>();
+    }
+    if (children == null || children.isEmpty()) return new HashSet<>();
+    HashSet<String> result = new HashSet<>(children.size());
+
+    for (String c : children) {
+      try {
+        if (zkClient.exists(getCollectionPath(c), true)) result.add(c);
+      } catch (KeeperException e) { // InterruptedException propagates (declared) instead of being swallowed
+        log.warn("Error checking external collections", e);
+      }
+    }
+    return result;
   }
 
 
@@ -449,6 +491,7 @@ public class ZkStateReader {
           clusterState.setLiveNodes(liveNodesSet);
         }
         this.clusterState = clusterState;
+        updateCollectionNames();
       }
 
     } else {
@@ -507,9 +550,13 @@ public class ZkStateReader {
         }
       }, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
     }
-
+    synchronized (this) {
+      for (String watchedCollection : watchedCollections) {
+        setWatchedCollectionState(watchedCollection, getCollectionLive(this, watchedCollection));
+      }
+    }
   }
-   
+
   /**
    * @return information about the cluster from ZooKeeper
    */
@@ -632,6 +679,9 @@ public class ZkStateReader {
   public SolrZkClient getZkClient() {
     return zkClient;
   }
+  public Set<String> getAllCollections(){
+    return allCollections;
+  }
 
   public void updateAliases() throws KeeperException, InterruptedException {
     byte[] data = zkClient.getData(ALIASES, null, null, true);
@@ -678,4 +728,171 @@ public class ZkStateReader {
     }
   }
 
+  public void updateWatchedCollection(DocCollection c) {
+    if(watchedCollections.contains(c.getName())){
+      watchedCollectionStates.put(c.getName(), c);
+      log.info("Updated DocCollection {} to zk version {}", c.getName(), c.getZNodeVersion());
+    }
+  }
+
+  /** Records the latest state of a watched collection. ConcurrentHashMap rejects null values, so a
+   * null state (e.g. state.json missing or not containing the collection) evicts the entry instead. */
+  private void setWatchedCollectionState(String coll, DocCollection state) {
+    if (state == null) {
+      watchedCollectionStates.remove(coll);
+    } else {
+      watchedCollectionStates.put(coll, state);
+    }
+  }
+
+  /**
+   * <b>Advance usage</b>
+   * This method can be used to fetch a collection object and control whether it hits
+   * the cache only or if information can be looked up from ZooKeeper.
+   *
+   * @param coll the collection name
+   * @param cachedCopyOnly whether to fetch data from cache only or if hitting Zookeeper is acceptable
+   * @return the {@link org.apache.solr.common.cloud.DocCollection}
+   */
+  public DocCollection getCollection(String coll, boolean cachedCopyOnly) {
+    // look up once: a watcher may swap clusterState between two separate reads
+    DocCollection inClusterState = clusterState.getCollectionStates().get(coll);
+    if (inClusterState != null) {
+      //this collection resides in clusterstate.json. So it's always up-to-date
+      return inClusterState;
+    }
+    if (watchedCollections.contains(coll) || cachedCopyOnly) {
+      DocCollection c = watchedCollectionStates.get(coll);
+      if (c != null || cachedCopyOnly) return c;
+    }
+    return getCollectionLive(this, coll);
+  }
+  // this is only set by Overseer not to be set by others and only set inside the Overseer node. If Overseer has
+  // unfinished external collections which are yet to be persisted to ZK
+  // this map is populated and this class can use that information
+  public Map ephemeralCollectionData;
+
+  public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
+    String collectionPath = getCollectionPath(coll);
+    if(zkStateReader.ephemeralCollectionData !=null ){
+      ClusterState cs = (ClusterState) zkStateReader.ephemeralCollectionData.get(collectionPath);
+      if(cs !=null) {
+        return  cs.getCollectionStates().get(coll);
+      }
+    }
+    try {
+      if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
+      Stat stat = new Stat();
+      byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
+      ClusterState state = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), zkStateReader, collectionPath);
+      return state.getCollectionStates().get(coll);
+    } catch (KeeperException.NoNodeException e) {
+      log.warn("No node available : " + collectionPath, e);
+      return null;
+    } catch (KeeperException e) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
+    }
+  }
+
+  public DocCollection getCollection(String coll) {
+    return getCollection(coll, false);
+  }
+
+  public static String getCollectionPath(String coll) {
+    return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
+  }
+
+  public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
+    synchronized (this){
+      if(watchedCollections.contains(coll)) return;
+      else {
+        watchedCollections.add(coll);
+      }
+      addZkWatch(coll);
+    }
+
+  }
+
+  private void addZkWatch(final String coll) throws KeeperException, InterruptedException {
+    log.info("addZkWatch {}", coll);
+    final String fullpath = getCollectionPath(coll);
+    synchronized (getUpdateLock()){
+
+      cmdExecutor.ensureExists(fullpath, zkClient);
+      log.info("Updating collection state at {} from ZooKeeper... ",fullpath);
+
+      Watcher watcher = new Watcher() {
+
+        @Override
+        public void process(WatchedEvent event) {
+          // session events are not change events,
+          // and do not remove the watcher
+          if (EventType.None.equals(event.getType())) {
+            return;
+          }
+          log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
+          try {
+
+            // delayed approach
+            // ZkStateReader.this.updateClusterState(false, false);
+            synchronized (ZkStateReader.this.getUpdateLock()) {
+              if(!watchedCollections.contains(coll)) {
+                log.info("Unwatched collection {}",coll);
+                return;
+              }
+              // remake watch
+              final Watcher thisWatch = this;
+              Stat stat = new Stat();
+              byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
+
+              if(data == null || data.length ==0){
+                log.warn("No value set for collection state : {}", coll);
+                return;
+
+              }
+              ClusterState clusterState = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(),ZkStateReader.this,fullpath);
+              // update volatile
+
+              DocCollection newState = clusterState.getCollectionStates().get(coll);
+              setWatchedCollectionState(coll, newState);
+              log.info("Updating data for {} to ver {} ", coll , newState == null ? null : newState.getZNodeVersion());
+
+            }
+          } catch (KeeperException e) {
+            if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+              return;
+            }
+            log.error("Unwatched collection :"+coll , e);
+            throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
+                "", e);
+
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Unwatched collection :"+coll , e);
+            return;
+          }
+        }
+
+      };
+      zkClient.exists(fullpath, watcher, true);
+    }
+
+    setWatchedCollectionState(coll, getCollectionLive(this, coll));
+  }
+
+  /**This is not a public API. Only used by ZkController */
+  public void removeZKWatch(final String coll){
+    synchronized (this){
+      watchedCollections.remove(coll);
+    }
+  }
+
+
+
+
 }

Modified: lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java Thu Jun 19 16:25:31 2014
@@ -20,7 +20,6 @@ package org.apache.solr.client.solrj.imp
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
 import org.apache.solr.cloud.AbstractZkTestCase;
-import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -51,7 +49,6 @@ import org.apache.solr.common.cloud.DocC
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -62,7 +59,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,6 +121,7 @@ public class CloudSolrServerTest extends
   @Override
   public void doTest() throws Exception {
     allTests();
+    stateVersionParamTest();
   }
 
   private void allTests() throws Exception {
@@ -346,7 +343,77 @@ public class CloudSolrServerTest extends
     SolrInputDocument doc = getDoc(fields);
     indexDoc(doc);
   }
-  
+
+  private void stateVersionParamTest() throws Exception {
+    CloudSolrServer client = createCloudClient(null);
+    try {
+      String collectionName = "checkStateVerCol";
+      createCollection(collectionName, client, 2, 2);
+      waitForRecoveriesToFinish(collectionName, false);
+      DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
+      Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
+
+      HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName);
+
+
+      SolrQuery q = new SolrQuery().setQuery("*:*");
+
+      log.info("should work query, result {}", httpSolrServer.query(q));
+      //no problem
+      q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
+      log.info("2nd query , result {}", httpSolrServer.query(q));
+      //no error yet good
+
+      q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error
+
+      HttpSolrServer.RemoteSolrException sse = null;
+      try {
+        httpSolrServer.query(q);
+        log.info("expected query error");
+      } catch (HttpSolrServer.RemoteSolrException e) {
+        sse = e;
+      }
+      httpSolrServer.shutdown();
+      assertNotNull(sse);
+      assertEquals(" Error code should be ", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
+
+      //now send the request to another node that does not serve the collection
+
+      Set<String> allNodesOfColl = new HashSet<>();
+      for (Slice slice : coll.getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
+        }
+      }
+      String theNode = null;
+      for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
+        String n = client.getZkStateReader().getBaseUrlForNodeName(s);
+        if(!allNodesOfColl.contains(n)){
+          theNode = n;
+          break;
+        }
+      }
+      log.info("thenode which does not serve this collection{} ",theNode);
+      assertNotNull(theNode);
+      httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName);
+
+      q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
+      sse = null; // reset so the assertion below checks this request, not the previous one
+      try {
+        httpSolrServer.query(q);
+        log.info("error was expected");
+      } catch (HttpSolrServer.RemoteSolrException e) {
+        sse = e;
+      }
+      httpSolrServer.shutdown();
+      assertNotNull(sse);
+      assertEquals(" Error code should be ", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
+    } finally {
+      client.shutdown();
+    }
+
+  }
+
   public void testShutdown() throws MalformedURLException {
     CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332");
     try {

Modified: lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java (original)
+++ lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java Thu Jun 19 16:25:31 2014
@@ -62,4 +62,4 @@ public class MockTokenFilterFactory exte
   public MockTokenFilter create(TokenStream stream) {
     return new MockTokenFilter(stream, filter);
   }
-}
\ No newline at end of file
+}

Modified: lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Thu Jun 19 16:25:31 2014
@@ -338,6 +338,19 @@ public abstract class AbstractFullDistri
     return createJettys(numJettys, false);
   }
 
+  protected int defaultStateFormat = 1 + random().nextInt(2);
+
+  protected int getStateFormat()  {
+    String stateFormat = System.getProperty("tests.solr.stateFormat", null);
+    if (stateFormat != null)  {
+      if ("2".equals(stateFormat)) {
+        return defaultStateFormat = 2;
+      } else if ("1".equals(stateFormat))  {
+        return defaultStateFormat = 1;
+      }
+    }
+    return defaultStateFormat; // random
+  }
 
   /**
    * @param checkCreatedVsState
@@ -350,6 +363,17 @@ public abstract class AbstractFullDistri
     List<SolrServer> clients = new ArrayList<>();
     StringBuilder sb = new StringBuilder();
 
+    if(getStateFormat() == 2) {
+      log.info("Creating collection1 with stateFormat=2");
+      SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
+      try {
+        Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(ZkNodeProps.makeMap(
+            Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION,
+            "name", DEFAULT_COLLECTION, "numShards", String.valueOf(sliceCount),
+            DocCollection.STATE_FORMAT, getStateFormat())));
+      } finally { zkClient.close(); } // close even if enqueueing the create message fails
+    }
+
     for (int i = 1; i <= numJettys; i++) {
       if (sb.length() > 0) sb.append(',');
       int cnt = this.jettyIntCntr.incrementAndGet();
@@ -1489,6 +1513,10 @@ public abstract class AbstractFullDistri
       collectionInfos.put(collectionName, list);
     }
     params.set("name", collectionName);
+    if (getStateFormat() == 2) {
+      log.info("Creating collection with stateFormat=2: " + collectionName);
+      params.set(DocCollection.STATE_FORMAT, "2");
+    }
     SolrRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");