You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/07 16:07:21 UTC

svn commit: r1694692 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/cloud/overseer/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ core/src/test/org/apache/solr/clou...

Author: shalin
Date: Fri Aug  7 14:07:21 2015
New Revision: 1694692

URL: http://svn.apache.org/r1694692
Log:
SOLR-5756: A utility Collection API to move a collection from stateFormat=1 to stateFormat=2
SOLR-7870: Write a test which asserts that requests to stateFormat=2 collection succeed on a node even after all local replicas of that collection have been removed

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Aug  7 14:07:21 2015
@@ -93,6 +93,13 @@ Detailed Change List
 New Features
 ----------------------
 
+* SOLR-5756: A utility Collection API to move a collection from shared clusterstate.json (stateFormat=1,
+  default until 4.x) to the per-collection state.json stored in ZooKeeper (stateFormat=2,
+  default since 5.0) seamlessly without any application down-time.
+  Example:
+  http://localhost:8983/solr/admin/collections?action=MIGRATESTATEFORMAT&collection=<collection_name>
+  (Noble Paul, Scott Blum, shalin)
+
 Bug Fixes
 ----------------------
 
@@ -104,6 +111,10 @@ Other Changes
 
 * SOLR-7831: Start Scripts: Allow a configurable stack size [-Xss] (Steve Davids via Mark Miller)
 
+* SOLR-7870: Write a test which asserts that requests to a stateFormat=2 collection succeed on a node
+  even after all local replicas of that collection have been removed.
+  (Scott Blum via shalin)
+
 ==================  5.3.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Aug  7 14:07:21 2015
@@ -359,6 +359,8 @@ public class Overseer implements Closeab
           case MODIFYCOLLECTION:
             CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties());
             return new CollectionMutator(reader).modifyCollection(clusterState,message);
+          case MIGRATESTATEFORMAT:
+            return new ClusterStateMutator(reader).migrateStateFormat(clusterState, message);
           default:
             throw new RuntimeException("unknown operation:" + operation
                 + " contents:" + message.getProperties());

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Fri Aug  7 14:07:21 2015
@@ -90,7 +90,6 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.util.Utils.makeMap;
 import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
@@ -113,10 +112,12 @@ import static org.apache.solr.common.par
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.util.StrUtils.formatString;
+import static org.apache.solr.common.util.Utils.makeMap;
 
 
 public class OverseerCollectionMessageHandler implements OverseerMessageHandler {
@@ -273,6 +274,9 @@ public class OverseerCollectionMessageHa
         case MODIFYCOLLECTION:
           overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
           break;
+        case MIGRATESTATEFORMAT:
+          migrateStateFormat(message, results);
+          break;
         default:
           throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
               + operation);
@@ -807,6 +811,36 @@ public class OverseerCollectionMessageHa
     }
   }
 
+  private void migrateStateFormat(ZkNodeProps message, NamedList results)
+      throws KeeperException, InterruptedException {
+    final String collectionName = message.getStr(COLLECTION_PROP);
+
+    // wait for a while until the state format changes
+    long now = System.nanoTime();
+    long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+    boolean firstLoop = true;
+    while (System.nanoTime() < timeout) {
+      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+      if (collection == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
+      }
+      if (collection.getStateFormat() == 2) {
+        // Done.
+        results.add("success", new SimpleOrderedMap<>());
+        return;
+      }
+
+      if (firstLoop) {
+        // Actually queue the migration command.
+        firstLoop = false;
+        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
+        Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+      }
+      Thread.sleep(100);
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
+  }
+
   private void createAlias(Aliases aliases, ZkNodeProps message) {
     String aliasName = message.getStr(NAME);
     String collections = message.getStr("collections");

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Aug  7 14:07:21 2015
@@ -1520,8 +1520,8 @@ public final class ZkController {
 
       publish(cd, Replica.State.DOWN, false, true);
       DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
-      if (collection != null && collection.getStateFormat() > 1) {
-        log.info("Registering watch for external collection {}", cd.getCloudDescriptor().getCollectionName());
+      if (collection != null) {
+        log.info("Registering watch for collection {}", cd.getCloudDescriptor().getCollectionName());
         zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
       }
     } catch (KeeperException e) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java Fri Aug  7 14:07:21 2015
@@ -184,5 +184,16 @@ public class ClusterStateMutator {
     }
     return null;
   }
+
+  public ZkWriteCommand migrateStateFormat(ClusterState clusterState, ZkNodeProps message) {
+    final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+    if (!CollectionMutator.checkKeyExistence(message, ZkStateReader.COLLECTION_PROP)) return ZkStateWriter.NO_OP;
+    DocCollection coll = clusterState.getCollectionOrNull(collection);
+    if (coll == null || coll.getStateFormat() == 2) return ZkStateWriter.NO_OP;
+
+    return new ZkWriteCommand(coll.getName(),
+        new DocCollection(coll.getName(), coll.getSlicesMap(), coll.getProperties(), coll.getRouter(), 0,
+            ZkStateReader.getCollectionPath(collection)));
+  }
 }
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Aug  7 14:07:21 2015
@@ -700,6 +700,13 @@ public class CollectionsHandler extends
         verifyRuleParams(h.coreContainer, m);
         return m;
       }
+    },
+    MIGRATESTATEFORMAT_OP(MIGRATESTATEFORMAT) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+          throws Exception {
+        return req.getParams().required().getAll(null, COLLECTION_PROP);
+      }
     };
     CollectionAction action;
     long timeOut;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java Fri Aug  7 14:07:21 2015
@@ -18,12 +18,21 @@ package org.apache.solr.cloud;
  */
 
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
 import com.google.common.collect.Lists;
+import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
@@ -36,13 +45,6 @@ import org.apache.solr.common.util.Named
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
 
 public class TestCollectionAPI extends ReplicaPropertiesBase {
@@ -77,6 +79,7 @@ public class TestCollectionAPI extends R
     clusterStatusRolesTest();
     replicaPropTest();
     clusterStatusZNodeVersion();
+    testClusterStateMigration();
   }
 
   private void clusterStatusWithCollectionAndShard() throws IOException, SolrServerException {
@@ -591,6 +594,41 @@ public class TestCollectionAPI extends R
     }
   }
 
+  private void testClusterStateMigration() throws Exception {
+    try (CloudSolrClient client = createCloudClient(null)) {
+      client.connect();
+
+      new CollectionAdminRequest.Create()
+          .setCollectionName("testClusterStateMigration")
+          .setNumShards(1)
+          .setReplicationFactor(1)
+          .setConfigName("conf1")
+          .setStateFormat(1)
+          .process(client);
+
+      waitForRecoveriesToFinish("testClusterStateMigration", true);
+
+      assertEquals(1, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
+
+      for (int i = 0; i < 10; i++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "id_" + i);
+        client.add("testClusterStateMigration", doc);
+      }
+      client.commit("testClusterStateMigration");
+
+      new CollectionAdminRequest.MigrateClusterState()
+          .setCollectionName("testClusterStateMigration")
+          .process(client);
+
+      client.getZkStateReader().updateClusterState();
+
+      assertEquals(2, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
+
+      QueryResponse response = client.query("testClusterStateMigration", new SolrQuery("*:*"));
+      assertEquals(10, response.getResults().getNumFound());
+    }
+  }
 
   // Expects the map will have keys, but blank values.
   private Map<String, String> getProps(CloudSolrClient client, String collectionName, String replicaName, String... props)

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java?rev=1694692&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java Fri Aug  7 14:07:21 2015
@@ -0,0 +1,170 @@
+package org.apache.solr.cloud.overseer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.OverseerTest;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
+
+public class ZkStateReaderTest extends SolrTestCaseJ4 {
+
+  /** Uses explicit refresh to ensure latest changes are visible. */
+  public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
+    testStateFormatUpdate(true);
+  }
+
+  /** ZkStateReader should automatically pick up changes based on ZK watches. */
+  public void testStateFormatUpdateWithTimeDelay() throws Exception {
+    testStateFormatUpdate(false);
+  }
+
+  public void testStateFormatUpdate(boolean explicitRefresh) throws Exception {
+    String zkDir = createTempDir("testStateFormatUpdate").toFile().getAbsolutePath();
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+      int trackedStateVersion = reader.getClusterState().getZkClusterStateVersion();
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+      {
+        // create new collection with stateFormat = 1
+        DocCollection stateV1 = new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
+        writer.enqueueUpdate(reader.getClusterState(), c1, null);
+        writer.writePendingUpdates();
+
+        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
+        assertNotNull(map.get("c1"));
+        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
+        assertFalse(exists);
+
+        trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
+
+        DocCollection collection = reader.getClusterState().getCollection("c1");
+        assertEquals(1, collection.getStateFormat());
+      }
+
+
+      {
+        // Now update the collection to stateFormat = 2
+        DocCollection stateV2 = new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
+        ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
+        writer.enqueueUpdate(reader.getClusterState(), c2, null);
+        writer.writePendingUpdates();
+
+        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
+        assertNull(map.get("c1"));
+        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
+        assertTrue(exists);
+
+        trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
+
+        DocCollection collection = reader.getClusterState().getCollection("c1");
+        assertEquals(2, collection.getStateFormat());
+      }
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+
+    }
+  }
+
+  public void testExternalCollectionWatchedNotWatched() throws Exception{
+    String zkDir = createTempDir("testExternalCollectionWatchedNotWatched").toFile().getAbsolutePath();
+    ZkTestServer server = new ZkTestServer(zkDir);
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+      // create new collection with stateFormat = 2
+      ZkWriteCommand c1 = new ZkWriteCommand("c1",
+          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
+      writer.enqueueUpdate(reader.getClusterState(), c1, null);
+      writer.writePendingUpdates();
+      refresh(reader, 0, true);
+
+      assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+      reader.addCollectionWatch("c1");
+      assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+      reader.removeZKWatch("c1");
+      assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
+
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+    }
+  }
+
+  private static int refresh(ZkStateReader reader, int trackedStateVersion, boolean explicitRefresh) throws KeeperException, InterruptedException {
+    if (explicitRefresh) {
+      reader.updateClusterState();
+      return reader.getClusterState().getZkClusterStateVersion();
+    }
+    for (int i = 0; i < 100; ++i) {
+      // Loop until we observe the change.
+      int newStateVersion = reader.getClusterState().getZkClusterStateVersion();
+      if (newStateVersion > trackedStateVersion) {
+        return newStateVersion;
+      }
+      Thread.sleep(100);
+    }
+    throw new AssertionError("Did not observe expected update");
+  }
+}

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java Fri Aug  7 14:07:21 2015
@@ -120,6 +120,47 @@ public class ZkStateWriterTest extends S
     }
   }
 
+  public void testSingleLegacyCollection() throws Exception {
+    String zkDir = createTempDir("testSingleLegacyCollection").toFile().getAbsolutePath();
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+      // create new collection with stateFormat = 1
+      ZkWriteCommand c1 = new ZkWriteCommand("c1",
+          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+
+      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
+      writer.writePendingUpdates();
+
+      Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
+      assertNotNull(map.get("c1"));
+      boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
+      assertFalse(exists);
+
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+
+    }
+  }
+
   public void testSingleExternalCollection() throws Exception{
     String zkDir = createTempDir("testSingleExternalCollection").toFile().getAbsolutePath();
 

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Fri Aug  7 14:07:21 2015
@@ -1033,6 +1033,29 @@ public abstract class CollectionAdminReq
       return this;
     }
   }
+
+  // MIGRATESTATEFORMAT request
+  public static class MigrateClusterState extends CollectionShardAdminRequest<MigrateClusterState> {
+
+    public MigrateClusterState() {
+      this.action = CollectionAction.MIGRATESTATEFORMAT;
+    }
+
+    @Override
+    public MigrateClusterState setShardName(String shard) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getShardName() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected MigrateClusterState getThis() {
+      return this;
+    }
+  }
   
   // BALANCESHARDUNIQUE request
   public static class BalanceShardUnique extends CollectionAdminRequest<BalanceShardUnique> {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Fri Aug  7 14:07:21 2015
@@ -335,7 +335,9 @@ public class ClusterState implements JSO
    * The version of clusterstate.json in ZooKeeper.
    * 
    * @return null if ClusterState was created for publication, not consumption
+   * @deprecated true cluster state spans many ZK nodes, stop depending on the version number of the shared node!
    */
+  @Deprecated
   public Integer getZkClusterStateVersion() {
     return znodeVersion;
   }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Aug  7 14:07:21 2015
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingExcept
 import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -29,8 +30,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
@@ -50,6 +49,8 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.EMPTY_MAP;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableSet;
 import static org.apache.solr.common.util.Utils.fromJSON;
 
@@ -95,7 +96,8 @@ public class ZkStateReader implements Cl
   public static final String LEGACY_CLOUD = "legacyCloud";
 
   public static final String URL_SCHEME = "urlScheme";
-  
+
+  /** A view of the current state of all collections; combines all the different state sources into a single view. */
   protected volatile ClusterState clusterState;
 
   private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
@@ -106,12 +108,21 @@ public class ZkStateReader implements Cl
   public static final String SHARD_LEADERS_ZKNODE = "leaders";
   public static final String ELECTION_NODE = "election";
 
-  private final Set<String> watchedCollections = new HashSet<String>();
+  /** Collections we actively care about, and will try to keep watch on. */
+  private final Set<String> interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
-  /**These are collections which are actively watched by this  instance .
-   *
-   */
-  private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
+  /** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */
+  private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
+  /** Last seen ZK version of clusterstate.json. */
+  private int legacyClusterStateVersion = 0;
+
+  /** Collections with format2 state.json, "interesting" and actively watched. */
+  private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
+
+  /** Collections with format2 state.json, not "interesting" and not actively watched. */
+  private volatile Map<String, ClusterState.CollectionRef> lazyCollectionStates = new HashMap<>();
+
+  private volatile Set<String> liveNodes = emptySet();
 
   private final ZkConfigManager configManager;
 
@@ -185,8 +196,6 @@ public class ZkStateReader implements Cl
   
   private final boolean closeClient;
 
-  private final ZkCmdExecutor cmdExecutor;
-
   private volatile Aliases aliases = new Aliases();
 
   private volatile boolean closed = false;
@@ -197,7 +206,6 @@ public class ZkStateReader implements Cl
 
   public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) {
     this.zkClient = zkClient;
-    this.cmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
     this.configManager = new ZkConfigManager(zkClient);
     this.closeClient = false;
     this.securityNodeListener = securityNodeListener;
@@ -225,7 +233,6 @@ public class ZkStateReader implements Cl
             }
           }
         });
-    this.cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
     this.configManager = new ZkConfigManager(zkClient);
     this.closeClient = true;
     this.securityNodeListener = null;
@@ -234,15 +241,32 @@ public class ZkStateReader implements Cl
   public ZkConfigManager getConfigManager() {
     return configManager;
   }
-  
-  // load and publish a new CollectionInfo
+
+  /**
+   * Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive.
+   */
   public void updateClusterState() throws KeeperException, InterruptedException {
-    updateClusterState(false);
+    synchronized (getUpdateLock()) {
+      if (clusterState == null) {
+        // Never initialized, just run normal initialization.
+        createClusterStateWatchersAndUpdate();
+        return;
+      }
+      // No need to set watchers because we should already have watchers registered for everything.
+      refreshLegacyClusterState(null);
+      for (String coll : watchedCollectionStates.keySet()) {
+        DocCollection newState = fetchCollectionState(coll, null);
+        updateWatchedCollection(coll, newState);
+      }
+      refreshLazyFormat2Collections(true);
+      refreshLiveNodes(null);
+      constructState();
+    }
   }
-  
-  // load and publish a new CollectionInfo
+
+  /** Refresh the set of live nodes. */
   public void updateLiveNodes() throws KeeperException, InterruptedException {
-    updateClusterState(true);
+    refreshLiveNodes(null);
   }
   
   public Aliases getAliases() {
@@ -257,7 +281,7 @@ public class ZkStateReader implements Cl
       DocCollection nu = getCollectionLive(this, coll);
       if (nu == null) return -1 ;
       if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
-        updateWatchedCollection(nu);
+        updateWatchedCollection(coll, nu);
         collection = nu;
       }
     }
@@ -274,105 +298,23 @@ public class ZkStateReader implements Cl
   public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
       InterruptedException {
     // We need to fetch the current cluster state and the set of live nodes
-    
-    synchronized (getUpdateLock()) {
 
-      log.info("Updating cluster state from ZooKeeper... ");
-      
-      Stat stat = zkClient.exists(CLUSTER_STATE, new Watcher() {
-        
-        @Override
-        public void process(WatchedEvent event) {
-          // session events are not change events,
-          // and do not remove the watcher
-          if (EventType.None.equals(event.getType())) {
-            return;
-          }
-          log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event) , ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
-          try {
-            
-            // delayed approach
-            // ZkStateReader.this.updateClusterState(false, false);
-            synchronized (ZkStateReader.this.getUpdateLock()) {
-              // remake watch
-              final Watcher thisWatch = this;
-              Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
-              // update volatile
-              ZkStateReader.this.clusterState = constructState(ln, thisWatch);
-            }
-            log.info("Updated cluster state version to " + ZkStateReader.this.clusterState.getZkClusterStateVersion());
-          } catch (KeeperException e) {
-            if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            log.error("", e);
-            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                "", e);
-          } catch (InterruptedException e) {
-            // Restore the interrupted status
-            Thread.currentThread().interrupt();
-            log.warn("", e);
-            return;
-          }
-        }
-        
-      }, true);
+    log.info("Updating cluster state from ZooKeeper... ");
 
-      if (stat == null)
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-            "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
+    // Sanity check ZK structure.
+    if (!zkClient.exists(CLUSTER_STATE, true)) {
+      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+              "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
     }
-   
-    
-    synchronized (ZkStateReader.this.getUpdateLock()) {
-      List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
-          new Watcher() {
-            
-            @Override
-            public void process(WatchedEvent event) {
-              // session events are not change events,
-              // and do not remove the watcher
-              if (EventType.None.equals(event.getType())) {
-                return;
-              }
-              try {
-                // delayed approach
-                // ZkStateReader.this.updateClusterState(false, true);
-                synchronized (ZkStateReader.this.getUpdateLock()) {
-                  List<String> liveNodes = zkClient.getChildren(
-                      LIVE_NODES_ZKNODE, this, true);
-                  log.debug("Updating live nodes... ({})", liveNodes.size());
-                  Set<String> liveNodesSet = new HashSet<>();
-                  liveNodesSet.addAll(liveNodes);
 
-                  ClusterState clusterState =  ZkStateReader.this.clusterState;
+    // on reconnect of SolrZkClient force refresh and re-add watches.
+    refreshLegacyClusterState(new LegacyClusterStateWatcher());
+    refreshStateFormat2Collections();
+    refreshLazyFormat2Collections(true);
+    refreshLiveNodes(new LiveNodeWatcher());
 
-                  clusterState.setLiveNodes(liveNodesSet);
-                }
-              } catch (KeeperException e) {
-                if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                    || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                  return;
-                }
-                log.error("", e);
-                throw new ZooKeeperException(
-                    SolrException.ErrorCode.SERVER_ERROR, "", e);
-              } catch (InterruptedException e) {
-                // Restore the interrupted status
-                Thread.currentThread().interrupt();
-                log.warn("", e);
-                return;
-              }
-            }
-            
-          }, true);
-    
-      Set<String> liveNodeSet = new HashSet<>();
-      liveNodeSet.addAll(liveNodes);
-      this.clusterState = constructState(liveNodeSet, null);
+    synchronized (ZkStateReader.this.getUpdateLock()) {
+      constructState();
 
       zkClient.exists(ALIASES,
           new Watcher() {
@@ -418,22 +360,16 @@ public class ZkStateReader implements Cl
           }, true);
     }
     updateAliases();
-    //on reconnect of SolrZkClient re-add watchers for the watched external collections
-    synchronized (this) {
-      for (String watchedCollection : watchedCollections) {
-        addZkWatch(watchedCollection);
-      }
-    }
+
     if (securityNodeListener != null) {
-      addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH,new Callable<Pair<byte[], Stat>>(){
+      addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH, new Callable<Pair<byte[], Stat>>() {
         @Override
         public void call(Pair<byte[], Stat> pair) {
           ConfigData cd = new ConfigData();
-          cd.data = pair.getKey() == null || pair.getKey() .length == 0 ? EMPTY_MAP : Utils.getDeepCopy((Map) fromJSON(pair.getKey()), 4, false);
+          cd.data = pair.getKey() == null || pair.getKey().length == 0 ? EMPTY_MAP : Utils.getDeepCopy((Map) fromJSON(pair.getKey()), 4, false);
           cd.version = pair.getValue() == null ? -1 : pair.getValue().getVersion();
           securityData = cd;
           securityNodeListener.run();
-
         }
       });
     }
@@ -486,89 +422,160 @@ public class ZkStateReader implements Cl
 
         }, true);
   }
-  private ClusterState constructState(Set<String> ln, Watcher watcher) throws KeeperException, InterruptedException {
-    Stat stat = new Stat();
-    byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
-    ClusterState loadedData = ClusterState.load(stat.getVersion(), data, ln, CLUSTER_STATE);
-
-    // first load all collections in /clusterstate.json (i.e. stateFormat=1)
-    Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(loadedData.getCollectionStates());
-
-    for (String s : getStateFormat2CollectionNames()) {
-      synchronized (this) {
-        if (watchedCollections.contains(s)) {
-          DocCollection live = getCollectionLive(this, s);
-          if (live != null) {
-            updateWatchedCollection(live);
-            // if it is a watched collection, add too
-            result.put(s, new ClusterState.CollectionRef(live));
-          }
-        } else {
-          // if it is not collection, then just create a reference which can fetch
-          // the collection object just in time from ZK
-          // this is also cheap (lazy loaded) so we put it inside the synchronized
-          // block although it is not required
-          final String collName = s;
-          result.put(s, new ClusterState.CollectionRef(null) {
-            @Override
-            public DocCollection get() {
-              return getCollectionLive(ZkStateReader.this, collName);
-            }
 
-            @Override
-            public boolean isLazilyLoaded() {
-              return true;
-            }
-          });
-        }
+  /**
+   * Construct the total state view from all sources.
+   * Must hold {@link #getUpdateLock()} before calling this.
+   */
+  private void constructState() {
+    // Legacy clusterstate is authoritative, for backwards compatibility.
+    // To move a collection's state to format2, first create the new format2 state node, then remove the legacy entry.
+    Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
+
+    // Are there any interesting collections that disappeared from the legacy cluster state?
+    for (String coll : interestingCollections) {
+      if (!result.containsKey(coll) && !watchedCollectionStates.containsKey(coll)) {
+        new StateWatcher(coll).refreshAndWatch(true);
+      }
+    }
+  
+    // Add state format2 collections, but don't override legacy collection states.
+    for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
+      result.putIfAbsent(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
+    }
+
+    // Finally, add any lazy collections that aren't already accounted for.
+    for (Map.Entry<String, ClusterState.CollectionRef> entry : lazyCollectionStates.entrySet()) {
+      result.putIfAbsent(entry.getKey(), entry.getValue());
+    }
+
+    this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
+  }
+
+  /**
+   * Refresh legacy (shared) clusterstate.json
+   */
+  private void refreshLegacyClusterState(Watcher watcher)
+      throws KeeperException, InterruptedException {
+    try {
+      Stat stat = new Stat();
+      byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
+      ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
+      synchronized (getUpdateLock()) {
+        this.legacyCollectionStates = loadedData.getCollectionStates();
+        this.legacyClusterStateVersion = stat.getVersion();
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // Ignore missing legacy clusterstate.json.
+      synchronized (getUpdateLock()) {
+        this.legacyCollectionStates = emptyMap();
+        this.legacyClusterStateVersion = 0;
       }
     }
-    return new ClusterState(ln, result, stat.getVersion());
   }
 
+  /**
+   * Refresh state format2 collections.
+   */
+  private void refreshStateFormat2Collections() {
+    // It's okay if no format2 state.json exists, as long as one did not previously exist.
+    for (String coll : interestingCollections) {
+      new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll));
+    }
+  }
 
-  private Set<String> getStateFormat2CollectionNames() throws KeeperException, InterruptedException {
+  /**
+   * Search for any lazy-loadable state format2 collections.
+   */
+  private void refreshLazyFormat2Collections(boolean fullRefresh) throws KeeperException, InterruptedException {
     List<String> children = null;
     try {
       children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
     } catch (KeeperException.NoNodeException e) {
       log.warn("Error fetching collection names");
-      
-      return new HashSet<>();
+      // fall through
+    }
+    if (children == null || children.isEmpty()) {
+      synchronized (getUpdateLock()) {
+        this.lazyCollectionStates = new HashMap<>();
+      }
+      return;
     }
-    if (children == null || children.isEmpty()) return new HashSet<>();
-    HashSet<String> result = new HashSet<>(children.size(), 1.0f);
 
-    for (String c : children) {
-      try {
-        // this exists call is necessary because we only want to return
-        // those collections which have their own state.json.
-        // The getCollectionPath() calls returns the complete path to the
-        // collection's state.json
-        if (zkClient.exists(getCollectionPath(c), true)) {
-          result.add(c);
+    Map<String, ClusterState.CollectionRef> result = new HashMap<>();
+    for (String collName : children) {
+      if (interestingCollections.contains(collName)) {
+        // We will create an eager collection for any interesting collections.
+        continue;
+      }
+
+      if (!fullRefresh) {
+        // Try to use an already-created lazy collection if it's not a full refresh.
+        ClusterState.CollectionRef existing = lazyCollectionStates.get(collName);
+        if (existing != null) {
+          result.put(collName, existing);
+          continue;
         }
-      } catch (Exception e) {
-        log.warn("Error reading collections nodes", e);
+      }
+
+      ClusterState.CollectionRef lazyCollectionState = tryMakeLazyCollectionStateFormat2(collName);
+      if (lazyCollectionState != null) {
+        result.put(collName, lazyCollectionState);
       }
     }
-    return result;
-  }
 
-  // load and publish a new CollectionInfo
-  private void updateClusterState(boolean onlyLiveNodes) throws KeeperException, InterruptedException {
-    // build immutable CloudInfo
     synchronized (getUpdateLock()) {
-      List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null, true);
-      Set<String> liveNodesSet = new HashSet<>(liveNodes);
+      this.lazyCollectionStates = result;
+    }
+  }
 
-      if (!onlyLiveNodes) {
-        log.debug("Updating cloud state from ZooKeeper... ");
-        clusterState = constructState(liveNodesSet, null);
-      } else {
-        log.debug("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
-        clusterState = this.clusterState;
-        clusterState.setLiveNodes(liveNodesSet);
+  private ClusterState.CollectionRef tryMakeLazyCollectionStateFormat2(final String collName) {
+    boolean exists = false;
+    try {
+      exists = zkClient.exists(getCollectionPath(collName), true);
+    } catch (Exception e) {
+      log.warn("Error reading collections nodes", e);
+    }
+    if (!exists) {
+      return null;
+    }
+
+    // Create a lazy reference which fetches the collection object
+    // from ZK just in time, on every call to get().
+    return new ClusterState.CollectionRef(null) {
+      @Override
+      public DocCollection get() {
+        return getCollectionLive(ZkStateReader.this, collName);
+      }
+
+      @Override
+      public boolean isLazilyLoaded() {
+        return true;
+      }
+
+      @Override
+      public String toString() {
+        return "lazy DocCollection(" + collName + ")";
+      }
+    };
+  }
+
+  /**
+   * Refresh live_nodes.
+   */
+  private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
+    Set<String> newLiveNodes;
+    try {
+      List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
+      log.debug("Updating live nodes from ZooKeeper... ({})", nodeList.size());
+      newLiveNodes = new HashSet<>(nodeList);
+    } catch (KeeperException.NoNodeException e) {
+      newLiveNodes = emptySet();
+    }
+    synchronized (getUpdateLock()) {
+      this.liveNodes = newLiveNodes;
+      if (clusterState != null) {
+        clusterState.setLiveNodes(newLiveNodes);
       }
     }
   }
@@ -591,14 +598,6 @@ public class ZkStateReader implements Cl
     }
   }
   
-  abstract class RunnableWatcher implements Runnable {
-    Watcher watcher;
-    public RunnableWatcher(Watcher watcher){
-      this.watcher = watcher;
-    }
-
-  }
-  
   public String getLeaderUrl(String collection, String shard, int timeout)
       throws InterruptedException, KeeperException {
     ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection,
@@ -833,19 +832,152 @@ public class ZkStateReader implements Cl
     }
   }
 
+  /** Watches a single collection's format2 state.json. */
+  class StateWatcher implements Watcher {
+    private final String coll;
+
+    StateWatcher(String coll) {
+      this.coll = coll;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      if (!interestingCollections.contains(coll)) {
+        // This collection is no longer interesting, stop watching.
+        log.info("Uninteresting collection {}", coll);
+        return;
+      }
+
+      // session events are not change events,
+      // and do not remove the watcher
+      if (EventType.None.equals(event.getType())) {
+        return;
+      }
+
+      log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})",
+              (event), coll, ZkStateReader.this.clusterState == null ? 0
+                      : ZkStateReader.this.clusterState.getLiveNodes().size());
+
+      refreshAndWatch(true);
+      synchronized (getUpdateLock()) {
+        constructState();
+      }
+    }
+
+    /**
+     * Refresh collection state from ZK and leave a watch for future changes.
+     * As a side effect, updates {@link #watchedCollectionStates} with the results of the
+     * refresh; {@link #clusterState} only changes once the caller runs {@code constructState()}.
+     *
+     * @param expectExists if true, log a warning if no state node exists
+     */
+    public void refreshAndWatch(boolean expectExists) {
+      try {
+        DocCollection newState = fetchCollectionState(coll, this);
+        updateWatchedCollection(coll, newState);
+      } catch (KeeperException.NoNodeException e) {
+        if (expectExists) {
+          log.warn("State node vanished for collection: " + coll, e);
+        }
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+          return;
+        }
+        log.error("Unwatched collection: " + coll, e);
+        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.error("Unwatched collection :" + coll, e);
+      }
+    }
+  }
+
+  /** Watches the legacy clusterstate.json. */
+  class LegacyClusterStateWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent event) {
+      // session events are not change events,
+      // and do not remove the watcher
+      if (EventType.None.equals(event.getType())) {
+        return;
+      }
+      log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
+      refreshAndWatch();
+      synchronized (getUpdateLock()) {
+        constructState();
+      }
+    }
+
+    /** Callers need not hold {@link #getUpdateLock()}; the refresh helpers called here acquire it themselves. */
+    public void refreshAndWatch() {
+      try {
+        refreshLegacyClusterState(this);
+        // Changes to clusterstate.json signal global state changes.
+        // TODO: get rid of clusterstate.json as a signaling mechanism.
+        refreshLazyFormat2Collections(false);
+      } catch (KeeperException.NoNodeException e) {
+        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+                "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+          return;
+        }
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("", e);
+      }
+    }
+  }
+
+  /** Watches the live_nodes and syncs changes. */
+  class LiveNodeWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent event) {
+      // session events are not change events,
+      // and do not remove the watcher
+      if (EventType.None.equals(event.getType())) {
+        return;
+      }
+      log.info("A live node change: {}, has occurred - updating... (live nodes size: {})", (event), liveNodes.size());
+      refreshAndWatch();
+    }
+
+    /** Callers need not hold {@link #getUpdateLock()}; {@code refreshLiveNodes} acquires it itself. */
+    public void refreshAndWatch() {
+      try {
+        refreshLiveNodes(this);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.SESSIONEXPIRED
+            || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+          return;
+        }
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("", e);
+      }
+    }
+  }
+
   public static DocCollection getCollectionLive(ZkStateReader zkStateReader,
       String coll) {
-    String collectionPath = getCollectionPath(coll);
     try {
-      Stat stat = new Stat();
-      byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
-      ClusterState state = ClusterState.load(stat.getVersion(), data,
-          Collections.<String> emptySet(), collectionPath);
-      ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
-      return collectionRef == null ? null : collectionRef.get();
-    } catch (KeeperException.NoNodeException e) {
-      log.warn("No node available : " + collectionPath, e);
-      return null;
+      return zkStateReader.fetchCollectionState(coll, null);
     } catch (KeeperException e) {
       throw new SolrException(ErrorCode.BAD_REQUEST,
           "Could not load collection from ZK:" + coll, e);
@@ -856,113 +988,71 @@ public class ZkStateReader implements Cl
     }
   }
 
+  private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
+    String collectionPath = getCollectionPath(coll);
+    try {
+      Stat stat = new Stat();
+      byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
+      ClusterState state = ClusterState.load(stat.getVersion(), data,
+              Collections.<String>emptySet(), collectionPath);
+      ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
+      return collectionRef == null ? null : collectionRef.get();
+    } catch (KeeperException.NoNodeException e) {
+      return null;
+    }
+  }
+
   public static String getCollectionPath(String coll) {
     return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
   }
 
   public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
-    synchronized (this) {
-      if (watchedCollections.contains(coll)) return;
-      else {
-        watchedCollections.add(coll);
+    if (interestingCollections.add(coll)) {
+      log.info("addZkWatch {}", coll);
+      new StateWatcher(coll).refreshAndWatch(false);
+      synchronized (getUpdateLock()) {
+        constructState();
       }
-      addZkWatch(coll);
     }
   }
 
-  private void addZkWatch(final String coll) throws KeeperException,
-      InterruptedException {
-    log.info("addZkWatch {}", coll);
-    final String fullpath = getCollectionPath(coll);
-    synchronized (getUpdateLock()) {
-      
-      cmdExecutor.ensureExists(fullpath, zkClient);
-      log.info("Updating collection state at {} from ZooKeeper... ", fullpath);
-      
-      Watcher watcher = new Watcher() {
-        
-        @Override
-        public void process(WatchedEvent event) {
-          // session events are not change events,
-          // and do not remove the watcher
-          if (EventType.None.equals(event.getType())) {
-            return;
-          }
-          log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})",
-              (event), coll, ZkStateReader.this.clusterState == null ? 0
-                  : ZkStateReader.this.clusterState.getLiveNodes().size());
-          try {
-            
-            // delayed approach
-            // ZkStateReader.this.updateClusterState(false, false);
-            synchronized (ZkStateReader.this.getUpdateLock()) {
-              if (!watchedCollections.contains(coll)) {
-                log.info("Unwatched collection {}", coll);
-                return;
-              }
-              // remake watch
-              final Watcher thisWatch = this;
-              Stat stat = new Stat();
-              byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
-              
-              if (data == null || data.length == 0) {
-                log.warn("No value set for collection state : {}", coll);
-                return;
-                
-              }
-              ClusterState clusterState = ClusterState.load(stat.getVersion(),
-                  data, Collections.<String> emptySet(), fullpath);
-              // update volatile
-              
-              DocCollection newState = clusterState.getCollectionStates()
-                  .get(coll).get();
-              updateWatchedCollection(newState);
-              
-            }
-          } catch (KeeperException e) {
-            if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            log.error("Unwatched collection :" + coll, e);
-            throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
-            
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            log.error("Unwatched collection :" + coll, e);
-            return;
-          }
+  private void updateWatchedCollection(String coll, DocCollection newState) {
+    if (newState == null) {
+      log.info("Deleting data for {}", coll);
+      watchedCollectionStates.remove(coll);
+      return;
+    }
+
+    log.info("Updating data for {} to ver {} ", coll, newState.getZNodeVersion());
+    // CAS update loop
+    while (true) {
+      DocCollection oldState = watchedCollectionStates.get(coll);
+      if (oldState == null) {
+        if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
+          break;
+        }
+      } else {
+        if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+          // Nothing to do, someone else updated same or newer.
+          return;
         }
-        
-      };
-      zkClient.exists(fullpath, watcher, true);
-    }
-    DocCollection collection = getCollectionLive(this, coll);
-    if (collection != null) {
-      updateWatchedCollection(collection);
+        if (watchedCollectionStates.replace(coll, oldState, newState)) {
+          break;
+        }
+      }
     }
   }
   
-  private void updateWatchedCollection(DocCollection newState) {
-    watchedCollectionStates.put(newState.getName(), newState);
-    log.info("Updating data for {} to ver {} ", newState.getName(), newState.getZNodeVersion());
-    this.clusterState = clusterState.copyWith(newState.getName(), newState);
-  }
-  
   /** This is not a public API. Only used by ZkController */
-  public void removeZKWatch(final String coll) {
-    synchronized (this) {
-      watchedCollections.remove(coll);
-      watchedCollectionStates.remove(coll);
-      try {
-        updateClusterState();
-      } catch (KeeperException e) {
-        log.error("Error updating state",e);
-      } catch (InterruptedException e) {
-        log.error("Error updating state",e);
-        Thread.currentThread().interrupt();
+  public void removeZKWatch(String coll) {
+    interestingCollections.remove(coll);
+    watchedCollectionStates.remove(coll);
+    ClusterState.CollectionRef lazyCollectionStateFormat2 = tryMakeLazyCollectionStateFormat2(coll);
+    synchronized (getUpdateLock()) {
+      if (lazyCollectionStateFormat2 != null) {
+        this.lazyCollectionStates.put(coll, lazyCollectionStateFormat2);
       }
+      constructState();
     }
   }
 
@@ -979,5 +1069,4 @@ public class ZkStateReader implements Cl
 
     }
   }
-
 }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1694692&r1=1694691&r2=1694692&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Fri Aug  7 14:07:21 2015
@@ -19,6 +19,8 @@ package org.apache.solr.common.params;
 
 import java.util.Locale;
 
+import org.apache.solr.common.SolrException;
+
 public interface CollectionParams 
 {
   /** What action **/
@@ -51,7 +53,8 @@ public interface CollectionParams
     DELETEREPLICAPROP(true),
     BALANCESHARDUNIQUE(true),
     REBALANCELEADERS(true),
-    MODIFYCOLLECTION(true);
+    MODIFYCOLLECTION(true),
+    MIGRATESTATEFORMAT(true);
     
     public final boolean isWrite;