Posted to commits@lucene.apache.org by ma...@apache.org on 2011/10/15 00:36:36 UTC

svn commit: r1183538 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/client/solrj/impl/ solrj/src/java/org/apache/sol...

Author: markrmiller
Date: Fri Oct 14 22:36:35 2011
New Revision: 1183538

URL: http://svn.apache.org/viewvc?rev=1183538&view=rev
Log:
initial work for SOLR-2765: Shard/Node states and SOLR-2821: Improve how cluster state is managed in ZooKeeper.
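
As a minimal sketch of the update pattern this commit moves toward (the class and
method names below are illustrative only and not part of the commit; the real loop
is added to ZkController.addToZk further down, using SolrZkClient): the cluster
state now lives in a single serialized /clusterstate node and is updated with a
version-checked read-modify-write loop that retries on conflicting writes.

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    // Illustrative sketch only: optimistic, version-checked update of one state node,
    // mirroring the retry loop this commit adds to ZkController.addToZk.
    public class ClusterStateUpdateSketch {

      interface StateMutator {
        byte[] apply(byte[] currentState); // e.g. deserialize, add a slice, re-serialize
      }

      static void updateClusterState(ZooKeeper zk, String path, StateMutator mutator)
          throws KeeperException, InterruptedException {
        // NOTE: the commit's version carries a TODO not to retry forever.
        while (true) {
          Stat stat = new Stat();
          byte[] current = zk.getData(path, false, stat); // latest bytes plus version
          byte[] updated = mutator.apply(current);
          try {
            zk.setData(path, updated, stat.getVersion()); // conditional write
            return;                                       // success
          } catch (KeeperException.BadVersionException e) {
            // another node updated the state first; re-read and retry
          }
        }
      }
    }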

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Fri Oct 14 22:36:35 2011
@@ -17,12 +17,15 @@ package org.apache.solr.cloud;
  * the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.zookeeper.KeeperException;
@@ -51,18 +54,23 @@ public class AssignShard {
     lock.lock();
     String returnShardId = null;
     try {
-      // lets read the current shards - we want to read straight from zk, and we
-      // assume we have some kind
+      // lets read the current shards - we want to read straight from zk (we
+      // need the absolute latest info), and we assume we have some kind
       // of collection level lock
-      String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
-          + ZkStateReader.SHARDS_ZKNODE;
       
-      List<String> shardIdNames = client.getChildren(shardIdPaths, null);
+      // TODO: this made a lot more sense when the cluster state was on multiple nodes
+      // and it was just a single getChildren read.
+
+      CloudState state = CloudState.load(client.getData(ZkStateReader.CLUSTER_STATE, null, null));
+      Map<String, Slice> sliceMap = state.getSlices(collection);
       
-      if (shardIdNames.size() == 0) {
+      if (sliceMap == null) {
         return "shard1";
       }
       
+      List<String> shardIdNames = new ArrayList<String>(sliceMap.keySet());
+
+      
       if (shardIdNames.size() < slices) {
         return "shard" + (shardIdNames.size() + 1);
       }
@@ -70,7 +78,7 @@ public class AssignShard {
       // else figure out which shard needs more replicas
       final Map<String,Integer> map = new HashMap<String,Integer>();
       for (String shardId : shardIdNames) {
-        int cnt = client.getChildren(shardIdPaths + "/" + shardId, null).size();
+    	int cnt = sliceMap.get(shardId).getShards().size();
         map.put(shardId, cnt);
       }
 

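For readability, here is the assignment decision above consolidated into one method.
This is a sketch, not the committed code: it assumes the Slice map for the collection
has already been loaded from /clusterstate, the name pickShardId is made up, and the
final selection step (not shown in the hunk) is assumed from the "figure out which
shard needs more replicas" comment to mean "pick the slice with the fewest replicas".

    import java.util.Map;
    import org.apache.solr.common.cloud.Slice;

    // Illustrative sketch of AssignShard's decision after this change: work from the
    // deserialized cluster state instead of reading shard children from ZooKeeper.
    public class AssignShardSketch {
      static String pickShardId(Map<String, Slice> sliceMap, int slices) {
        if (sliceMap == null) {
          return "shard1";                        // no state for this collection yet
        }
        if (sliceMap.size() < slices) {
          return "shard" + (sliceMap.size() + 1); // still creating new slices
        }
        // otherwise pick the slice that needs more replicas (assumed: fewest replicas)
        String best = null;
        int fewest = Integer.MAX_VALUE;
        for (Map.Entry<String, Slice> entry : sliceMap.entrySet()) {
          int cnt = entry.getValue().getShards().size();
          if (cnt < fewest) {
            fewest = cnt;
            best = entry.getKey();
          }
        }
        return best;
      }
    }
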
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Fri Oct 14 22:36:35 2011
@@ -24,7 +24,8 @@ public class CloudDescriptor {
   private String shardId;
   private String collectionName;
   private SolrParams params;
-
+  private String roles;
+  
   public void setShardId(String shardId) {
     this.shardId = shardId;
   }
@@ -41,6 +42,14 @@ public class CloudDescriptor {
     this.collectionName = collectionName;
   }
 
+  public String getRoles(){
+	  return roles;
+  }
+  
+  public void setRoles(String roles){
+	  this.roles = roles;
+  }
+  
   /** Optional parameters that can change how a core is created. */
   public SolrParams getParams() {
     return params;

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Oct 14 22:36:35 2011
@@ -19,9 +19,12 @@ package org.apache.solr.cloud;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
@@ -30,6 +33,7 @@ import java.util.regex.Pattern;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -38,8 +42,8 @@ import org.apache.solr.common.params.Sol
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,10 +138,10 @@ public final class ZkController {
           public void command() {
             try {
               // we need to create all of our lost watches
-              zkStateReader.makeCollectionsNodeWatches();
-              zkStateReader.makeShardsWatches(true);
+//              zkStateReader.makeCollectionsNodeWatches();
+//              zkStateReader.makeShardsWatches(true);
               createEphemeralLiveNode();
-              zkStateReader.updateCloudState(false);
+              zkStateReader.createClusterStateWatchersAndUpdate();
               
               // re register all descriptors
               List<CoreDescriptor> descriptors = registerOnReconnect
@@ -175,47 +179,6 @@ public final class ZkController {
   }
 
   /**
-   * Adds the /collection/shards/shards_id node as well as the /collections/leader_elect/shards_id node.
-   * 
-   * @param shardId
-   * @param collection
-   * @throws IOException
-   * @throws InterruptedException 
-   * @throws KeeperException 
-   */
-  private void addZkShardsNode(String shardId, String collection)
-      throws IOException, InterruptedException, KeeperException {
-    
-    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
-        + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
-    
-    
-    try {
-      
-      // shards node
-      if (!zkClient.exists(shardsZkPath)) {
-        if (log.isInfoEnabled()) {
-          log.info("creating zk shards node:" + shardsZkPath);
-        }
-        // makes shards zkNode if it doesn't exist
-        zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
-        
-      }
-    } catch (KeeperException e) {
-      // its okay if another beats us creating the node
-      if (e.code() != KeeperException.Code.NODEEXISTS) {
-        throw e;
-      }
-    }
-    
-    leaderElector.setupForSlice(shardId, collection);
-    
-    // TODO: consider how these notifications are being done
-    // ping that there is a new shardId
-    zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[]) null);
-  }
-
-  /**
    * Closes the underlying ZooKeeper client.
    */
   public void close() {
@@ -331,7 +294,7 @@ public final class ZkController {
       
       createEphemeralLiveNode();
       setUpCollectionsNode();
-      zkStateReader.makeCollectionsNodeWatches();
+      zkStateReader.createClusterStateWatchersAndUpdate();
       
     } catch (IOException e) {
       log.error("", e);
@@ -356,47 +319,7 @@ public final class ZkController {
     String nodeName = getNodeName();
     String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:" + nodePath);
-    Watcher liveNodeWatcher = new Watcher() {
-
-      public void process(WatchedEvent event) {
-        try {
-          log.info("Updating live nodes:" + zkClient);
-          try {
-            zkStateReader.updateLiveNodes();
-          } finally {
-            // re-make watch
-
-            String path = event.getPath();
-            if(path == null) {
-              // on shutdown, it appears this can trigger with a null path
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            zkClient.getChildren(event.getPath(), this);
-          }
-        } catch (KeeperException e) {
-          if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-            return;
-          }
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (InterruptedException e) {
-          // Restore the interrupted status
-          Thread.currentThread().interrupt();
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (IOException e) {
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        }
-        
-      }
-      
-    };
+   
     try {
       boolean nodeDeleted = true;
       try {
@@ -422,15 +345,7 @@ public final class ZkController {
       if (e.code() != KeeperException.Code.NODEEXISTS) {
         throw e;
       }
-    }
-    zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
-    try {
-      zkStateReader.updateLiveNodes();
-    } catch (IOException e) {
-      log.error("", e);
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-          "", e);
-    }
+    }    
   }
   
   public String getNodeName() {
@@ -504,44 +419,17 @@ public final class ZkController {
       shardId = assignShard.assignShard(collection, numShards);
       cloudDesc.setShardId(shardId);
     }
-    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
-
-    boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
-    
-    if (log.isInfoEnabled()) {
-      log.info("Register shard - core:" + coreName + " address:"
-          + shardUrl);
-    }
-
-    ZkNodeProps props = getShardZkProps(shardUrl);
-
-    byte[] bytes = props.store();
     
     String shardZkNodeName = getNodeName() + "_" + coreName;
 
-    if(shardZkNodeAlreadyExists) {
-      zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
-      // tell everyone to update cloud info
-      zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-    } else {
-      addZkShardsNode(shardId, collection);
-      try {
-        log.info("create node:" + shardZkNodeName);
-        zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
-            CreateMode.PERSISTENT);
-        
-        // tell everyone to update cloud info
-        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-      } catch (KeeperException e) {
-        // its okay if the node already exists
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          throw e;
-        }
-        // for some reason the shard already exists, though it didn't when we
-        // started registration - just continue
-        
+    if (log.isInfoEnabled()) {
+        log.info("Register shard - core:" + coreName + " address:"
+            + shardUrl);
       }
-    }
+    
+    leaderElector.setupForSlice(shardId, collection);
+    
+    ZkNodeProps props = addToZk(cloudDesc, shardUrl, shardZkNodeName);
     
     // leader election
     doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
@@ -549,11 +437,71 @@ public final class ZkController {
   }
 
 
-  private ZkNodeProps getShardZkProps(String shardUrl) {
+  ZkNodeProps addToZk(final CloudDescriptor cloudDesc, String shardUrl,
+      final String shardZkNodeName)
+      throws KeeperException, InterruptedException,
+      UnsupportedEncodingException, IOException {
     ZkNodeProps props = new ZkNodeProps();
     props.put(ZkStateReader.URL_PROP, shardUrl);
     
     props.put(ZkStateReader.NODE_NAME, getNodeName());
+    
+    props.put("roles", cloudDesc.getRoles());
+
+    Map<String, ZkNodeProps> shardProps = new HashMap<String, ZkNodeProps>();
+    shardProps.put(shardZkNodeName, props);
+		Slice slice = new Slice(cloudDesc.getShardId(), shardProps);
+		
+		boolean persisted = false;
+		Stat stat = zkClient.exists(ZkStateReader.CLUSTER_STATE, null);
+		if (stat == null) {
+			log.info("/clusterstate does not exist, attempting to create");
+			try {
+				CloudState state = new CloudState();
+
+				state.addSlice(cloudDesc.getCollectionName(), slice);
+
+				zkClient.create(ZkStateReader.CLUSTER_STATE,
+						CloudState.store(state), Ids.OPEN_ACL_UNSAFE,
+						CreateMode.PERSISTENT);
+				persisted = true;
+				log.info("/clusterstate created");
+			} catch (KeeperException e) {
+				if (e.code() != Code.NODEEXISTS) {
+					// If this node exists, no big deal
+					throw e;
+				}
+			}
+		}
+		if (!persisted) {
+			stat = new Stat();
+			boolean updated = false;
+			
+			// TODO: we don't want to retry forever
+			// give up at some point
+			while (!updated) {
+
+				byte[] data = zkClient.getData(ZkStateReader.CLUSTER_STATE,
+						null, stat);
+				log.info("Attempting to update /clusterstate version "
+						+ stat.getVersion());
+				CloudState state = CloudState.load(data);
+
+				state.addSlice(cloudDesc.getCollectionName(), slice);
+
+				try {
+					zkClient.setData(ZkStateReader.CLUSTER_STATE,
+							CloudState.store(state), stat.getVersion());
+					updated = true;
+				} catch (KeeperException e) {
+					if (e.code() != Code.BADVERSION) {
+						throw e;
+					}
+					log.info("Failed to update /clusterstate, retrying");
+				}
+
+			}
+		}
     return props;
   }
 
@@ -724,21 +672,6 @@ public final class ZkController {
               throw e;
             }
           }
-          try {
-            // shards node
-            if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
-                + ZkStateReader.SHARDS_ZKNODE)) {
-              // makes shards zkNode if it doesn't exist
-              zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
-                  + ZkStateReader.SHARDS_ZKNODE);
-            }
-          } catch (KeeperException e) {
-            // its okay if another beats us creating the node
-            if (e.code() != KeeperException.Code.NODEEXISTS) {
-              throw e;
-            }
-          }
-          
          
           // ping that there is a new collection
           zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Oct 14 22:36:35 2011
@@ -411,6 +411,10 @@ public class CoreContainer 
           if (opt != null) {
             p.getCloudDescriptor().setCollectionName(opt);
           }
+          opt = DOMUtil.getAttr(node, "roles", null);
+          if(opt != null){
+        	  p.getCloudDescriptor().setRoles(opt);
+          }
         }
         opt = DOMUtil.getAttr(node, "properties", null);
         if (opt != null) {
@@ -431,29 +435,6 @@ public class CoreContainer 
         SolrException.logOnce(log,null,ex);
       }
     }
-    
-    if(zkController != null) {
-      try {
-        synchronized (zkController.getZkStateReader().getUpdateLock()) {
-          zkController.getZkStateReader().makeShardZkNodeWatches(false);
-          zkController.getZkStateReader().updateCloudState(true);
-        }
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      } catch (KeeperException e) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      } catch (IOException e) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      }
-    }
   }
 
   private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Fri Oct 14 22:36:35 2011
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -50,17 +49,6 @@ public class CloudStateUpdateTest extend
 
   private static final boolean VERBOSE = false;
 
-  private static final String URL1 = "http://localhost:3133/solr/core0";
-  private static final String URL3 = "http://localhost:3133/solr/core1";
-  private static final String URL2 = "http://localhost:3123/solr/core1";
-  private static final String URL4 = "http://localhost:3123/solr/core4";
-  private static final String SHARD4 = "localhost:3123_solr_core4";
-  private static final String SHARD3 = "localhost:3123_solr_core3";
-  private static final String SHARD2 = "localhost:3123_solr_core2";
-  private static final String SHARD1 = "localhost:3123_solr_core1";
-  
-  private static final int TIMEOUT = 10000;
-
   protected ZkTestServer zkServer;
 
   protected String zkDir;
@@ -138,77 +126,6 @@ public class CloudStateUpdateTest extend
     log.info("####SETUP_END " + getName());
     
   }
-  
-  @Test
-  public void testIncrementalUpdate() throws Exception {
-    System.setProperty("CLOUD_UPDATE_DELAY", "1");
-    String zkDir = dataDir.getAbsolutePath() + File.separator
-        + "zookeeper/server1/data";
-    ZkTestServer server = null;
-    SolrZkClient zkClient = null;
-    ZkController zkController = null;
-    
-    server = new ZkTestServer(zkDir);
-    server.run();
-    try {
-      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
-      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-      
-      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-      String shardsPath1 = "/collections/collection1/shards/shardid1";
-      String shardsPath2 = "/collections/collection1/shards/shardid2";
-      zkClient.makePath(shardsPath1);
-      zkClient.makePath(shardsPath2);
-      
-      addShardToZk(zkClient, shardsPath1, SHARD1, URL1);
-      addShardToZk(zkClient, shardsPath1, SHARD2, URL2);
-      addShardToZk(zkClient, shardsPath2, SHARD3, URL3);
-      
-      removeShardFromZk(server.getZkAddress(), zkClient, shardsPath1);
-      
-      zkController = new ZkController(server.getZkAddress(), TIMEOUT, 1000,
-          "localhost", "8983", "solr", 3, new CurrentCoreDescriptorProvider() {
-            
-            @Override
-            public List<CoreDescriptor> getCurrentDescriptors() {
-              // unused
-              return null;
-            }
-          });
-      
-      zkController.getZkStateReader().updateCloudState(true);
-      CloudState cloudInfo = zkController.getCloudState();
-      Map<String,Slice> slices = cloudInfo.getSlices("collection1");
-      assertFalse(slices.containsKey("shardid1"));
-      
-      zkClient.makePath(shardsPath1);
-      addShardToZk(zkClient, shardsPath1, SHARD1, URL1);
-      
-      zkController.getZkStateReader().updateCloudState(true);
-      cloudInfo = zkController.getCloudState();
-      slices = cloudInfo.getSlices("collection1");
-      assertTrue(slices.containsKey("shardid1"));
-      
-      updateUrl(zkClient, shardsPath1, SHARD1, "fake");
-      
-      addShardToZk(zkClient, shardsPath2, SHARD4, URL4);
-      
-      zkController.getZkStateReader().updateCloudState(true);
-      cloudInfo = zkController.getCloudState();
-      String url = cloudInfo.getSlices("collection1").get("shardid1").getShards().get(SHARD1).get("url");
-      
-      // because of incremental update, we don't expect to find the new 'fake'
-      // url - instead we should still
-      // be using the original url - the correct way to update this would be to
-      // remove the whole node and readd it
-      assertEquals(URL1, url);
-      
-    } finally {
-      server.shutdown();
-      zkClient.close();
-      zkController.close();
-    }
-  }
 
   @Test
   public void testCoreRegistration() throws Exception {
@@ -323,37 +240,6 @@ public class CloudStateUpdateTest extend
     System.clearProperty("CLOUD_UPDATE_DELAY");
     SolrConfig.severeErrors.clear();
   }
-
-  private void addShardToZk(SolrZkClient zkClient, String shardsPath,
-      String zkNodeName, String url) throws IOException,
-      KeeperException, InterruptedException {
-
-    ZkNodeProps props = new ZkNodeProps();
-    props.put(ZkStateReader.URL_PROP, url);
-    props.put(ZkStateReader.NODE_NAME, zkNodeName);
-    byte[] bytes = props.store();
-
-    zkClient
-        .create(shardsPath + "/" + zkNodeName, bytes, CreateMode.PERSISTENT);
-  }
-  
-  private void updateUrl(SolrZkClient zkClient, String shardsPath,
-      String zkNodeName, String url) throws IOException,
-      KeeperException, InterruptedException {
-
-    ZkNodeProps props = new ZkNodeProps();
-    props.put(ZkStateReader.URL_PROP, url);
-    props.put(ZkStateReader.NODE_NAME, zkNodeName);
-    byte[] bytes = props.store();
-
-    zkClient
-        .setData(shardsPath + "/" + zkNodeName, bytes);
-  }
-  
-  private void removeShardFromZk(String zkHost, SolrZkClient zkClient, String shardsPath) throws Exception {
-
-    AbstractZkTestCase.tryCleanPath(zkHost, shardsPath);
-  }
   
   private void printLayout(String zkHost) throws Exception {
     SolrZkClient zkClient = new SolrZkClient(

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Fri Oct 14 22:36:35 2011
@@ -38,7 +38,7 @@ import org.junit.BeforeClass;
 public class FullDistributedZkTest extends AbstractDistributedZkTestCase {
   
   private static final String DEFAULT_COLLECTION = "collection1";
-  private static final boolean DEBUG = false;
+  private static final boolean DEBUG = true;
   String t1="a_t";
   String i1="a_si";
   String nint = "n_i";
@@ -141,12 +141,7 @@ public class FullDistributedZkTest exten
    */
   @Override
   public void doTest() throws Exception {
-    printLayout();
-    // make sure 'shard1' was auto-assigned
-    SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT);
-    assertTrue("shard1 was not found in zk layout", zkClient.exists("/solr/collections/collection1/shards/shard1"));
-    zkClient.close();
-    
+
     del("*:*");
     indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
             ,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Fri Oct 14 22:36:35 2011
@@ -64,86 +64,6 @@ public class ZkControllerTest extends So
   }
 
   @Test
-  public void testReadShards() throws Exception {
-    String zkDir = dataDir.getAbsolutePath() + File.separator
-        + "zookeeper/server1/data";
-    ZkTestServer server = null;
-    SolrZkClient zkClient = null;
-    ZkController zkController = null;
-    try {
-      server = new ZkTestServer(zkDir);
-      server.run();
-      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
-      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-
-      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-      String shardsPath = "/collections/collection1/shards/shardid1";
-      zkClient.makePath(shardsPath);
-
-      addShardToZk(zkClient, shardsPath, SHARD1, URL1);
-      addShardToZk(zkClient, shardsPath, SHARD2, URL2);
-      addShardToZk(zkClient, shardsPath, SHARD3, URL3);
-
-      if (DEBUG) {
-        zkClient.printLayoutToStdOut();
-      }
-
-      zkController = new ZkController(server.getZkAddress(),
-          TIMEOUT, 1000, "localhost", "8983", "solr", 3, new CurrentCoreDescriptorProvider() {
-            
-            @Override
-            public List<CoreDescriptor> getCurrentDescriptors() {
-              // do nothing
-              return null;
-            }
-          });
- 
-      zkController.getZkStateReader().updateCloudState(true);
-      CloudState cloudInfo = zkController.getCloudState();
-      Map<String,Slice> slices = cloudInfo.getSlices("collection1");
-      assertNotNull(slices);
-
-      for (Slice slice : slices.values()) {
-        Map<String,ZkNodeProps> shards = slice.getShards();
-        if (DEBUG) {
-          for (String shardName : shards.keySet()) {
-            ZkNodeProps props = shards.get(shardName);
-            System.out.println("shard:" + shardName);
-            System.out.println("props:" + props.toString());
-          }
-        }
-        assertNotNull(shards.get(SHARD1));
-        assertNotNull(shards.get(SHARD2));
-        assertNotNull(shards.get(SHARD3));
-
-        ZkNodeProps props = shards.get(SHARD1);
-        assertEquals(URL1, props.get(ZkStateReader.URL_PROP));
-        assertEquals(TEST_NODE_NAME, props.get(ZkStateReader.NODE_NAME));
-
-        props = shards.get(SHARD2);
-        assertEquals(URL2, props.get(ZkStateReader.URL_PROP));
-        assertEquals(TEST_NODE_NAME, props.get(ZkStateReader.NODE_NAME));
-
-        props = shards.get(SHARD3);
-        assertEquals(URL3, props.get(ZkStateReader.URL_PROP));
-        assertEquals(TEST_NODE_NAME, props.get(ZkStateReader.NODE_NAME));
-
-      }
-
-    } finally {
-      if (zkClient != null) {
-        zkClient.close();
-      }
-      if (zkController != null) {
-        zkController.close();
-      }
-      if (server != null) {
-        server.shutdown();
-      }
-    }
-  }
-
-  @Test
   public void testReadConfigName() throws Exception {
     String zkDir = dataDir.getAbsolutePath() + File.separator
         + "zookeeper/server1/data";

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Fri Oct 14 22:36:35 2011
@@ -93,9 +93,7 @@ public class CloudSolrServer extends Sol
       if (zkStateReader != null) return;
       try {
         ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
-        zk.makeCollectionsNodeWatches();
-        zk.makeShardZkNodeWatches(false);
-        zk.updateCloudState(true);
+        zk.createClusterStateWatchersAndUpdate();
         zkStateReader = zk;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Fri Oct 14 22:36:35 2011
@@ -17,7 +17,13 @@ package org.apache.solr.common.cloud;
  * limitations under the License.
  */
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,141 +31,206 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.zookeeper.KeeperException;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.common.util.XMLErrorLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
 
 // immutable
 public class CloudState {
-  protected static Logger log = LoggerFactory.getLogger(CloudState.class);
-  
-  private final Map<String,Map<String,Slice>> collectionStates;
-  private final Set<String> liveNodes;
-  
-  public CloudState(Set<String> liveNodes, Map<String,Map<String,Slice>> collectionStates) {
-    this.liveNodes = liveNodes;
-    this.collectionStates = collectionStates;
-  }
-  
-  public Map<String,Slice> getSlices(String collection) {
-    Map<String,Slice> collectionState = collectionStates.get(collection);
-    if(collectionState == null) {
-      return null;
-    }
-    return Collections.unmodifiableMap(collectionState);
-  }
-  
-  public Set<String> getCollections() {
-    return Collections.unmodifiableSet(collectionStates.keySet());
-  }
-  
-  public Map<String,Map<String,Slice>> getCollectionStates() {
-    return Collections.unmodifiableMap(collectionStates);
-  }
-  
-  public Set<String> getLiveNodes() {
-    return Collections.unmodifiableSet(liveNodes);
-  }
-  
-  public boolean liveNodesContain(String name) {
-    return liveNodes.contains(name);
-  }
-  
-  public static CloudState buildCloudState(SolrZkClient zkClient, CloudState oldCloudState, boolean onlyLiveNodes) throws KeeperException, InterruptedException, IOException {
-    Map<String,Map<String,Slice>> collectionStates;
-    if (!onlyLiveNodes) {
-      List<String> collections = zkClient.getChildren(
-          ZkStateReader.COLLECTIONS_ZKNODE, null);
-
-      collectionStates = new HashMap<String,Map<String,Slice>>();
-      for (String collection : collections) {
-        String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/"
-            + collection + ZkStateReader.SHARDS_ZKNODE;
-        List<String> shardIdNames;
-        try {
-          shardIdNames = zkClient.getChildren(shardIdPaths, null);
-        } catch (KeeperException.NoNodeException e) {
-          // node is not valid currently
-          continue;
-        }
-        Map<String,Slice> slices = new HashMap<String,Slice>();
-        for (String shardIdZkPath : shardIdNames) {
-          Slice oldSlice = null;
-          if (oldCloudState.getCollectionStates().containsKey(collection)
-              && oldCloudState.getCollectionStates().get(collection)
-                  .containsKey(shardIdZkPath)) {
-            oldSlice = oldCloudState.getCollectionStates().get(collection)
-                .get(shardIdZkPath);
-          }
-          
-          Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
-              + "/" + shardIdZkPath, oldSlice);
-          Slice slice = new Slice(shardIdZkPath, shardsMap);
-          slices.put(shardIdZkPath, slice);
-        }
-        collectionStates.put(collection, slices);
-      }
-    } else {
-      collectionStates = oldCloudState.getCollectionStates();
-    }
-    
-    CloudState cloudInfo = new CloudState(getLiveNodes(zkClient), collectionStates);
-    
-    return cloudInfo;
-  }
-  
-  /**
-   * @param zkClient
-   * @param shardsZkPath
-   * @return
-   * @throws KeeperException
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath, Slice oldSlice)
-      throws KeeperException, InterruptedException, IOException {
-
-    Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
-
-    if (zkClient.exists(shardsZkPath, null) == null) {
-      throw new IllegalStateException("Cannot find zk shards node that should exist:"
-          + shardsZkPath);
-    }
-
-    List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
-    
-    for (String shardPath : shardZkPaths) {
-      ZkNodeProps props;
-      if (oldSlice != null && oldSlice.getShards().containsKey(shardPath)) {
-        props = oldSlice.getShards().get(shardPath);
-      } else {
-        byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
-            null);
-        
-        props = new ZkNodeProps();
-        props.load(data);
-      }
-      
-      shardNameToProps.put(shardPath, props);
-    }
+	protected static Logger log = LoggerFactory.getLogger(CloudState.class);
+	private static final XMLErrorLogger xmllog = new XMLErrorLogger(log);
+	private Map<String, Map<String, Slice>> collectionStates;
+	private Set<String> liveNodes;
+
+	public CloudState() {
+		this.liveNodes = new HashSet<String>();
+		this.collectionStates = new HashMap<String, Map<String, Slice>>(0);
+	}
+
+	public CloudState(Set<String> liveNodes,
+			Map<String, Map<String, Slice>> collectionStates) {
+		this.liveNodes = liveNodes;
+		this.collectionStates = collectionStates;
+	}
+
+	public Slice getSlice(String collection, String slice) {
+		if (collectionStates.containsKey(collection)
+				&& collectionStates.get(collection).containsKey(slice))
+			return collectionStates.get(collection).get(slice);
+		return null;
+	}
+
+	public void addSlice(String collection, Slice slice) {
+		if (!collectionStates.containsKey(collection)) {
+			log.info("New collection");
+			collectionStates.put(collection, new HashMap<String, Slice>());
+		}
+		if (!collectionStates.get(collection).containsKey(slice.getName())) {
+			log.info("New slice: " + slice.getName());
+			collectionStates.get(collection).put(slice.getName(), slice);
+		} else {
+			log.info("Updating existing slice");
+			
+			Map<String, ZkNodeProps> shards = new HashMap<String, ZkNodeProps>();
+			
+			Slice existingSlice = collectionStates.get(collection).get(slice.getName());
+			shards.putAll(existingSlice.getShards());
+			shards.putAll(slice.getShards());
+			Slice updatedSlice = new Slice(slice.getName(), shards);
+			collectionStates.get(collection).put(slice.getName(), updatedSlice);
+		}
+	}
+
+	public Map<String, Slice> getSlices(String collection) {
+		if(!collectionStates.containsKey(collection))
+			return null;
+		return Collections.unmodifiableMap(collectionStates.get(collection));
+	}
+
+	public Set<String> getCollections() {
+		return Collections.unmodifiableSet(collectionStates.keySet());
+	}
+
+	public Map<String, Map<String, Slice>> getCollectionStates() {
+		return Collections.unmodifiableMap(collectionStates);
+	}
+
+	public Set<String> getLiveNodes() {
+		return Collections.unmodifiableSet(liveNodes);
+	}
+
+	public void setLiveNodes(Set<String> liveNodes) {
+		this.liveNodes = liveNodes;
+	}
+
+	public boolean liveNodesContain(String name) {
+		return liveNodes.contains(name);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("live nodes:" + liveNodes);
+		sb.append(" collections:" + collectionStates);
+		return sb.toString();
+	}
+
+	public static CloudState load(byte[] state) {
+		// TODO this should throw some exception instead of eating them
+		CloudState cloudState = new CloudState();
+		if(state != null && state.length > 0) {
+			InputSource is = new InputSource(new ByteArrayInputStream(state));
+			DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+	
+			try {
+				DocumentBuilder db = dbf.newDocumentBuilder();
+	
+				db.setErrorHandler(xmllog);
+				Document doc = db.parse(is);
+	
+				Element root = doc.getDocumentElement();
+	
+				NodeList collectionStates = root.getChildNodes();
+				for (int x = 0; x < collectionStates.getLength(); x++) {
+					Node collectionState = collectionStates.item(x);
+					String collectionName = collectionState.getAttributes()
+							.getNamedItem("name").getNodeValue();
+					NodeList slices = collectionState.getChildNodes();
+					for (int y = 0; y < slices.getLength(); y++) {
+						Node slice = slices.item(y);
+						Node sliceName = slice.getAttributes().getNamedItem("name");
+						
+						NodeList shardsNodeList = slice.getChildNodes();
+						Map<String, ZkNodeProps> shards = new HashMap<String, ZkNodeProps>();
+						for (int z = 0; z < shardsNodeList.getLength(); z++) {
+							Node shard = shardsNodeList.item(z);
+							String shardName = shard.getAttributes()
+									.getNamedItem("name").getNodeValue();
+							NodeList propsList = shard.getChildNodes();
+							ZkNodeProps props = new ZkNodeProps();
+							
+							for (int i = 0; i < propsList.getLength(); i++) {
+								Node prop = propsList.item(i);
+								String propName = prop.getAttributes()
+										.getNamedItem("name").getNodeValue();
+								String propValue = prop.getTextContent();
+								props.put(propName, propValue);
+							}
+							shards.put(shardName, props);
+						}
+						Slice s = new Slice(sliceName.getNodeValue(), shards);
+						cloudState.addSlice(collectionName, s);
+					}
+				}
+			} catch (SAXException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} catch (IOException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} catch (ParserConfigurationException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} finally {
+				// some XML parsers are broken and don't close the byte stream (but
+				// they should according to spec)
+				IOUtils.closeQuietly(is.getByteStream());
+			}
+		}
+		return cloudState;
+	}
+
+	public static byte[] store(CloudState state)
+			throws UnsupportedEncodingException, IOException {
+		StringWriter stringWriter = new StringWriter();
+		Writer w = new BufferedWriter(stringWriter);
+		w.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n");
+		w.write("<clusterstate>");
+		Map<String, Map<String, Slice>> collectionStates = state
+				.getCollectionStates();
+		for (String collectionName : collectionStates.keySet()) {
+			w.write("<collectionstate name=\"" + collectionName + "\">");
+			Map<String, Slice> collection = collectionStates
+					.get(collectionName);
+			for (String sliceName : collection.keySet()) {
+				w.write("<shard name=\"" + sliceName + "\">");
+				Slice slice = collection.get(sliceName);
+				Map<String, ZkNodeProps> shards = slice.getShards();
+				for (String shardName : shards.keySet()) {
+					w.write("<replica name=\"" + shardName + "\">");
+					ZkNodeProps props = shards.get(shardName);
+					for (String propName : props.keySet()) {
+						w.write("<str name=\"" + propName + "\">"
+								+ props.get(propName) + "</str>");
+					}
+					w.write("</replica>");
+
+				}
+				w.write("</shard>");
+			}
+			w.write("</collectionstate>");
+		}
+		w.write("</clusterstate>");
+		w.flush();
+		w.close();
+		return stringWriter.toString().getBytes("UTF-8");
 
-    return Collections.unmodifiableMap(shardNameToProps);
-  }
-  
-  private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    List<String> liveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null);
+	}
+
+  public void setLiveNodes(List<String> liveNodes) {
     Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
     liveNodesSet.addAll(liveNodes);
-
-    return liveNodesSet;
-  }
-  
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("live nodes:" + liveNodes);
-    sb.append(" collections:" + collectionStates);
-    return sb.toString();
+    this.liveNodes = liveNodesSet;
   }
-
 }
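
As a quick usage sketch of the new serialization methods above (the URL, node name,
and class name here are made-up example values; the XML shape in the comment follows
store() exactly as written in this diff):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.solr.common.cloud.CloudState;
    import org.apache.solr.common.cloud.Slice;
    import org.apache.solr.common.cloud.ZkNodeProps;
    import org.apache.solr.common.cloud.ZkStateReader;

    // Illustrative round trip through CloudState.store()/load().
    public class CloudStateRoundTripSketch {
      public static void main(String[] args) throws Exception {
        ZkNodeProps props = new ZkNodeProps();
        props.put(ZkStateReader.URL_PROP, "http://localhost:8983/solr/");  // example value
        props.put(ZkStateReader.NODE_NAME, "localhost:8983_solr");         // example value

        Map<String, ZkNodeProps> replicas = new HashMap<String, ZkNodeProps>();
        replicas.put("localhost:8983_solr_collection1", props);

        CloudState state = new CloudState();
        state.addSlice("collection1", new Slice("shard1", replicas));

        // store() emits XML shaped like:
        // <clusterstate><collectionstate name="collection1"><shard name="shard1">
        //   <replica name="..."><str name="url">...</str>...</replica>
        // </shard></collectionstate></clusterstate>
        byte[] bytes = CloudState.store(state);
        System.out.println(new String(bytes, "UTF-8"));

        // load() parses the same bytes back into an equivalent in-memory state
        CloudState reloaded = CloudState.load(bytes);
        System.out.println(reloaded.getSlices("collection1").keySet());    // [shard1]
      }
    }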

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Oct 14 22:36:35 2011
@@ -18,11 +18,7 @@ package org.apache.solr.common.cloud;
  */
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -30,10 +26,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +39,12 @@ public class ZkStateReader {
   public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String URL_PROP = "url";
   public static final String NODE_NAME = "node_name";
-  public static final String SHARDS_ZKNODE = "/shards";
+  public static final String ROLES_PROP = "roles";
   public static final String LIVE_NODES_ZKNODE = "/live_nodes";
+  public static final String CLUSTER_STATE = "/clusterstate";
   
-  private volatile CloudState cloudState  = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
-  
+  private volatile CloudState cloudState = new CloudState();
+
   private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
 
   public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
@@ -81,9 +78,7 @@ public class ZkStateReader {
 
           public void command() {
             try {
-              makeCollectionsNodeWatches();
-              makeShardsWatches(true);
-              updateCloudState(false);
+            	ZkStateReader.this.createClusterStateWatchersAndUpdate();
             } catch (KeeperException e) {
               log.error("", e);
               throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -94,10 +89,6 @@ public class ZkStateReader {
               log.error("", e);
               throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                   "", e);
-            } catch (IOException e) {
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
             }
 
           }
@@ -116,24 +107,131 @@ public class ZkStateReader {
     updateCloudState(true, true);
   }
   
-  // load and publish a new CollectionInfo
-  private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
-      IOException {
+  public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
+      InterruptedException {
+    // We need to fetch the current cluster state and the set of live nodes
+    
+    if (!zkClient.exists(CLUSTER_STATE)) {
+      try {
+        zkClient.create(CLUSTER_STATE, null, CreateMode.PERSISTENT);
+      } catch (KeeperException e) {
+        // if someone beats us to creating this ignore it
+        if (e.code() != KeeperException.Code.NODEEXISTS) {
+          throw e;
+        }
+      }
+    }
+    
+    CloudState clusterState;
     
+    log.info("Updating cluster state from ZooKeeper... ");
+    byte[] data = zkClient.getData(CLUSTER_STATE, new Watcher() {
+      
+      @Override
+      public void process(WatchedEvent event) {
+        log.info("A cluster state change has occurred");
+        try {
+          byte[] data = zkClient.getData(CLUSTER_STATE, this, null);
+          // delayed approach
+          // ZkStateReader.this.updateCloudState(false, false);
+          synchronized (ZkStateReader.this.getUpdateLock()) {
+            CloudState clusterState = CloudState.load(data);
+            clusterState.setLiveNodes(ZkStateReader.this.cloudState
+                .getLiveNodes());
+            // update volatile
+            cloudState = clusterState;
+          }
+        } catch (KeeperException e) {
+          if (e.code() == KeeperException.Code.SESSIONEXPIRED
+              || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+            return;
+          }
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } /*
+           * catch(IOException e){ log.error("", e); throw new
+           * ZooKeeperException( SolrException.ErrorCode.SERVER_ERROR, "", e); }
+           */
+      }
+      
+    }, null);
+    
+    clusterState = CloudState.load(data);
+    
+    List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
+        new Watcher() {
+          
+          @Override
+          public void process(WatchedEvent event) {
+            log.info("Updating live nodes");
+            try {
+              // delayed approach
+              // ZkStateReader.this.updateCloudState(false, true);
+              synchronized (ZkStateReader.this.getUpdateLock()) {
+                List<String> liveNodes = zkClient.getChildren(
+                    LIVE_NODES_ZKNODE, this);
+                ZkStateReader.this.cloudState.setLiveNodes(liveNodes);
+              }
+            } catch (KeeperException e) {
+              if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                return;
+              }
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            }
+          }
+          
+        });
+    
+    clusterState.setLiveNodes(liveNodes);
+    this.cloudState = clusterState;
+    
+  }
+  
+  
+  // load and publish a new CollectionInfo
+  private synchronized void updateCloudState(boolean immediate,
+      final boolean onlyLiveNodes) throws KeeperException,
+      InterruptedException, IOException {
+    log.info("Manual update of cluster state initiated");
     // build immutable CloudInfo
     
-    if(immediate) {
-      if(!onlyLiveNodes) {
+    if (immediate) {
+      CloudState clusterState;
+      if (!onlyLiveNodes) {
         log.info("Updating cloud state from ZooKeeper... ");
+        byte[] data = zkClient.getData(ZkStateReader.CLUSTER_STATE, null, null);
+        
+        clusterState = CloudState.load(data);
       } else {
         log.info("Updating live nodes from ZooKeeper... ");
+        clusterState = cloudState;
       }
-      CloudState cloudState;
-      cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
+      
+      List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null);
+      
+      clusterState.setLiveNodes(liveNodes);
       // update volatile
-      this.cloudState = cloudState;
+      this.cloudState = clusterState;
     } else {
-      if(cloudStateUpdateScheduled) {
+      if (cloudStateUpdateScheduled) {
         log.info("Cloud state update for ZooKeeper already scheduled");
         return;
       }
@@ -142,15 +240,31 @@ public class ZkStateReader {
       updateCloudExecutor.schedule(new Runnable() {
         
         public void run() {
-          log.info("Updating cloud state from ZooKeeper...");
+          log.info("Updating cluster state from ZooKeeper...");
           synchronized (getUpdateLock()) {
             cloudStateUpdateScheduled = false;
-            CloudState cloudState;
+            CloudState clusterState;
             try {
-              cloudState = CloudState.buildCloudState(zkClient,
-                  ZkStateReader.this.cloudState, onlyLiveNodes);
+              if (!onlyLiveNodes) {
+                log.info("Updating cloud state from ZooKeeper... ");
+                byte[] data = zkClient.getData(ZkStateReader.CLUSTER_STATE,
+                    null, null);
+                
+                clusterState = CloudState.load(data);
+              } else {
+                log.info("Updating live nodes from ZooKeeper... ");
+                clusterState = ZkStateReader.this.cloudState;
+              }
+              
+              List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
+                  null);
+              clusterState.setLiveNodes(liveNodes);
+              // update volatile
+              ZkStateReader.this.cloudState = clusterState;
+              
             } catch (KeeperException e) {
-              if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+              if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
                 return;
               }
@@ -163,10 +277,6 @@ public class ZkStateReader {
               log.error("", e);
               throw new ZooKeeperException(
                   SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (IOException e) {
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
             }
             // update volatile
             ZkStateReader.this.cloudState = cloudState;
@@ -174,123 +284,9 @@ public class ZkStateReader {
         }
       }, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
     }
-
-  }
-  
-  public void makeShardZkNodeWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
-    CloudState cloudState = getCloudState();
     
-    Set<String> knownCollections = cloudState.getCollections();
-    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
-
-    for(final String collection : collections) {
-      if(makeWatchesForReconnect || !knownCollections.contains(collection)) {
-        log.info("Found new collection:" + collection);
-        Watcher watcher = new Watcher() {
-          public void process(WatchedEvent event) {
-            log.info("Detected changed ShardId in collection:" + collection);
-            try {
-              makeShardsWatches(collection, false);
-              updateCloudState(false);
-            } catch (KeeperException e) {
-              if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                return;
-              }
-              log.warn("", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-              log.warn("", e);
-            } catch (IOException e) {
-              log.warn("", e);
-            }
-          }
-        };
-        boolean madeWatch = true;
-        String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
-            + SHARDS_ZKNODE;
-        for (int i = 0; i < 5; i++) {
-          try {
-            zkClient.getChildren(shardZkNode, watcher);
-          } catch (KeeperException.NoNodeException e) {
-            // most likely, the collections node has been created, but not the
-            // shards node yet -- pause and try again
-            madeWatch = false;
-            if (i == 4) {
-              log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
-              break;
-            }
-            Thread.sleep(100);
-          }
-          if (madeWatch) {
-            log.info("Made shard watch:" + shardZkNode);
-            break;
-          }
-        }
-      }
-    }
   }
-  
-  public void makeShardsWatches(final String collection, boolean makeWatchesForReconnect) throws KeeperException,
-      InterruptedException {
-    if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
-      List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
-          + collection + SHARDS_ZKNODE, null);
-      CloudState cloudState = getCloudState();
-      Set<String> knownShardIds;
-      Map<String,Slice> slices = cloudState.getSlices(collection);
-      if (slices != null) {
-        knownShardIds = slices.keySet();
-      } else {
-        knownShardIds = new HashSet<String>(0);
-      }
-      for (final String shardId : shardIds) {
-        if (makeWatchesForReconnect || !knownShardIds.contains(shardId)) {
-          zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
-              + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
-
-            public void process(WatchedEvent event) {
-              log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
-              try {
-                updateCloudState(false);
-              } catch (KeeperException e) {
-                if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                  return;
-                }
-                log.error("", e);
-                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                    "", e);
-              } catch (InterruptedException e) {
-                // Restore the interrupted status
-                Thread.currentThread().interrupt();
-                log.error("", e);
-                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                    "", e);
-              } catch (IOException e) {
-                log.error("", e);
-                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                    "", e);
-              }
-            }
-          });
-        }
-      }
-    }
-  }
-  
-  /**
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  public void makeShardsWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
-    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
-    for (final String collection : collections) {
-      makeShardsWatches(collection, makeWatchesForReconnect);
-    }
-  }
-  
+   
   /**
    * @return information about the cluster from ZooKeeper
    */
@@ -315,74 +311,13 @@ public class ZkStateReader {
       }
     }
   }
+  
+  abstract class RunnableWatcher implements Runnable {
+		Watcher watcher;
+		public RunnableWatcher(Watcher watcher){
+			this.watcher = watcher;
+		}
 
-  public void makeCollectionsNodeWatches() throws KeeperException, InterruptedException {
-    log.info("Start watching collections zk node for changes");
-    zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
-
-      public void process(WatchedEvent event) {
-          try {
-
-            log.info("Detected a new or removed collection");
-            synchronized (getUpdateLock()) {
-              makeShardZkNodeWatches(false);
-              updateCloudState(false);
-            }
-            // re-watch
-            String path = event.getPath();
-            if (path != null) {
-              zkClient.getChildren(path, this);
-            }
-          } catch (KeeperException e) {
-            if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            log.warn("", e);
-          } catch (InterruptedException e) {
-            // Restore the interrupted status
-            Thread.currentThread().interrupt();
-            log.warn("", e);
-          } catch (IOException e) {
-            log.warn("", e);
-          }
-
-      }});
-    
-    zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
-
-      public void process(WatchedEvent event) {
-        if(event.getType() !=  EventType.NodeDataChanged) {
-          return;
-        }
-        log.info("Notified of CloudState change");
-        try {
-          synchronized (getUpdateLock()) {
-            makeShardZkNodeWatches(false);
-            updateCloudState(false);
-          }
-          zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, this);
-        } catch (KeeperException e) {
-          if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-            return;
-          }
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (InterruptedException e) {
-          // Restore the interrupted status
-          Thread.currentThread().interrupt();
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (IOException e) {
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        }
-        
-      }});
-    
-  }
+	}
+  
 }