You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/02/12 18:59:42 UTC

svn commit: r909537 - in /lucene/solr/branches/cloud/src: common/org/apache/solr/common/cloud/ java/org/apache/solr/cloud/ java/org/apache/solr/handler/component/ solrj/org/apache/solr/client/solrj/impl/ test/org/apache/solr/cloud/

Author: markrmiller
Date: Fri Feb 12 17:59:41 2010
New Revision: 909537

URL: http://svn.apache.org/viewvc?rev=909537&view=rev
Log:
more reorganization for solrj

Added:
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java
      - copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java
      - copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
      - copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java
      - copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java
      - copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java
    lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java
      - copied, changed from r909510, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
Removed:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java
    lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java

Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -25,6 +25,10 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.solr.cloud.Slice;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkNodeProps;
+import org.apache.solr.cloud.ZooKeeperException;
 import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -72,12 +76,12 @@
     Map<String,Map<String,Slice>> collectionStates;
     if (!onlyLiveNodes) {
       List<String> collections = zkClient.getChildren(
-          ZkController.COLLECTIONS_ZKNODE, null);
+          ZkStateReader.COLLECTIONS_ZKNODE, null);
 
       collectionStates = new HashMap<String,Map<String,Slice>>();
       for (String collection : collections) {
-        String shardIdPaths = ZkController.COLLECTIONS_ZKNODE + "/"
-            + collection + ZkController.SHARDS_ZKNODE;
+        String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/"
+            + collection + ZkStateReader.SHARDS_ZKNODE;
         List<String> shardIdNames;
         try {
           shardIdNames = zkClient.getChildren(shardIdPaths, null);
@@ -137,7 +141,7 @@
   }
   
   private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    List<String> liveNodes = zkClient.getChildren(ZkController.NODES_ZKNODE, null);
+    List<String> liveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null);
     Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
     liveNodesSet.addAll(liveNodes);
 

Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -21,7 +21,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.solr.cloud.SolrZkClient.OnReconnect;
+import org.apache.solr.cloud.ZkClientConnectionStrategy;
+import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;

Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -23,6 +23,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.solr.cloud.ZkClientConnectionStrategy;
+import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Added: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java?rev=909537&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java (added)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java Fri Feb 12 17:59:41 2010
@@ -0,0 +1,5 @@
+package org.apache.solr.common.cloud;
+
+public interface OnReconnect {
+  public void command();
+}

Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -24,6 +24,8 @@
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.solr.cloud.ZkClientConnectionStrategy;
+import org.apache.solr.cloud.ZooKeeperException;
 import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
 import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.CreateMode;
@@ -44,10 +46,6 @@
  */
 public class SolrZkClient {
   static final String NEWL = System.getProperty("line.separator");
-  
-  public static interface OnReconnect {
-    public void command();
-  }
 
   static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 5000;
 
@@ -483,5 +481,9 @@
   void updateKeeper(SolrZooKeeper keeper) {
    this.keeper = keeper;
   }
+  
+  public SolrZooKeeper getSolrZooKeeper() {
+    return keeper;
+  }
 
 }

Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
 
 import java.io.IOException;
 
@@ -14,7 +14,7 @@
     // TODO Auto-generated constructor stub
   }
   
-  protected ClientCnxn getConnection() {
+  public ClientCnxn getConnection() {
     return cnxn;
   }
 

Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java (from r909510, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java&r1=909510&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -20,21 +20,30 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.solr.cloud.SolrZkClient.OnReconnect;
+import org.apache.solr.cloud.Slice;
+import org.apache.solr.cloud.ZooKeeperException;
 import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZkStateReader {
   private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
   
+  public static final String COLLECTIONS_ZKNODE = "/collections";
+  public static final String SHARDS_ZKNODE = "/shards";
+  public static final String LIVE_NODES_ZKNODE = "/live_nodes";
+  
   private volatile CloudState cloudState  = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
   
   private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
@@ -45,11 +54,14 @@
 
   private SolrZkClient zkClient;
   
+  private boolean closeClient = false;
+  
   public ZkStateReader(SolrZkClient zkClient) {
     this.zkClient = zkClient;
   }
   
   public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
+    closeClient = true;
     zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
         // on reconnect, reload cloud info
         new OnReconnect() {
@@ -149,6 +161,114 @@
 
   }
   
+  public void addShardZkNodeWatches() throws KeeperException, InterruptedException {
+    CloudState cloudState = getCloudState();
+    Set<String> knownCollections = cloudState.getCollections();
+    
+    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+
+    for(final String collection : collections) {
+      if(!knownCollections.contains(collection)) {
+        log.info("Found new collection:" + collection);
+        Watcher watcher = new Watcher() {
+          public void process(WatchedEvent event) {
+            log.info("Detected changed ShardId in collection:" + collection);
+            try {
+              addShardsWatches(collection);
+              updateCloudState(false);
+            } catch (KeeperException e) {
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            }
+          }
+        };
+        boolean madeWatch = true;
+        String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
+            + SHARDS_ZKNODE;
+        for (int i = 0; i < 5; i++) {
+          try {
+            zkClient.getChildren(shardZkNode, watcher);
+          } catch (KeeperException.NoNodeException e) {
+            // most likely, the collections node has been created, but not the
+            // shards node yet -- pause and try again
+            madeWatch = false;
+            if (i == 4) {
+              log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
+              break;
+            }
+            Thread.sleep(100);
+          }
+          if (madeWatch) {
+            log.info("Made shard watch:" + shardZkNode);
+            break;
+          }
+        }
+      }
+    }
+  }
+  
+  public void addShardsWatches(final String collection) throws KeeperException,
+      InterruptedException {
+    if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
+      List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
+          + collection + SHARDS_ZKNODE, null);
+      CloudState cloudState = getCloudState();
+      Set<String> knownShardIds;
+      Map<String,Slice> slices = cloudState.getSlices(collection);
+      if (slices != null) {
+        knownShardIds = slices.keySet();
+      } else {
+        knownShardIds = new HashSet<String>(0);
+      }
+      for (final String shardId : shardIds) {
+        if (!knownShardIds.contains(shardId)) {
+          zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
+              + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
+
+            public void process(WatchedEvent event) {
+              log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
+              try {
+                updateCloudState(false);
+              } catch (KeeperException e) {
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              } catch (InterruptedException e) {
+                // Restore the interrupted status
+                Thread.currentThread().interrupt();
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              } catch (IOException e) {
+                log.error("", e);
+                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                    "", e);
+              }
+            }
+          });
+        }
+      }
+    }
+  }
+  
+  public void addShardsWatches() throws KeeperException, InterruptedException {
+    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+    for (final String collection : collections) {
+      addShardsWatches(collection);
+    }
+  }
+  
   /**
    * @return information about the cluster from ZooKeeper
    */
@@ -159,4 +279,18 @@
   public Object getUpdateLock() {
     return this;
   }
+
+  public void close() {
+    if (closeClient) {
+      try {
+        zkClient.close();
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      }
+    }
+  }
 }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java Fri Feb 12 17:59:41 2010
@@ -1,11 +1,7 @@
 package org.apache.solr.cloud;
 
-import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.SolrParams;
 
-import java.util.Collections;
-import java.util.HashMap;
-
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java Fri Feb 12 17:59:41 2010
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.solr.common.cloud.SolrZooKeeper;
 import org.apache.zookeeper.Watcher;
 
 /**

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Fri Feb 12 17:59:41 2010
@@ -20,18 +20,17 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.solr.cloud.SolrZkClient.OnReconnect;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -63,10 +62,8 @@
 
 
   // package private for tests
-  static final String SHARDS_ZKNODE = "/shards";
+
   static final String CONFIGS_ZKNODE = "/configs";
-  static final String COLLECTIONS_ZKNODE = "/collections";
-  static final String NODES_ZKNODE = "/live_nodes";
 
   public static final String URL_PROP = "url";
   public static final String NODE_NAME = "node_name";
@@ -85,10 +82,6 @@
   private String localHost;
 
   private String hostName;
-  
-
-
-  private boolean readonly;  // temporary hack to enable reuse in SolrJ client
 
   /**
    * @param zkServerAddress ZooKeeper server host address
@@ -108,7 +101,6 @@
     this.localHostPort = locaHostPort;
     this.localHostContext = localHostContext;
     this.localHost = localHost;
-    this.readonly = localHostPort==null;
 
     zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
         // on reconnect, reload cloud info
@@ -150,7 +142,7 @@
    */
   private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
 
-    String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE + "/" + shardId;
+    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
     
     try {
       
@@ -164,7 +156,7 @@
         
         // nocommit - scrutinize
         // ping that there is a new shardId
-        zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
 
       }
     } catch (KeeperException e) {
@@ -280,7 +272,7 @@
       
       // makes nodes zkNode
       try {
-        zkClient.makePath(NODES_ZKNODE);
+        zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
       } catch (KeeperException e) {
         // its okay if another beats us creating the node
         if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -314,7 +306,7 @@
   private void createEphemeralLiveNode() throws KeeperException,
       InterruptedException {
     String nodeName = getNodeName();
-    String nodePath = NODES_ZKNODE + "/" + nodeName;
+    String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:" + nodePath);
     Watcher liveNodeWatcher = new Watcher() {
 
@@ -347,26 +339,24 @@
       
     };
     try {
-      if (!readonly) {
-        boolean nodeDeleted = true;
-        try {
-          // we attempt a delete in the case of a quick server bounce -
-          // if there was not a graceful shutdown, the node may exist
-          // until expiration timeout - so a node won't be created here because
-          // it exists, but eventually the node will be removed. So delete
-          // in case it exists and create a new node.
-          zkClient.delete(nodePath, -1);
-        } catch (KeeperException.NoNodeException e) {
-          // fine if there is nothing to delete
-          nodeDeleted = false;
-        }
-        if (nodeDeleted) {
-          log
-              .info("Found a previous node that still exists while trying to register a new live node "
-                  + nodePath + " - removing existing node to create another.");
-        }
-        zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+      boolean nodeDeleted = true;
+      try {
+        // we attempt a delete in the case of a quick server bounce -
+        // if there was not a graceful shutdown, the node may exist
+        // until expiration timeout - so a node won't be created here because
+        // it exists, but eventually the node will be removed. So delete
+        // in case it exists and create a new node.
+        zkClient.delete(nodePath, -1);
+      } catch (KeeperException.NoNodeException e) {
+        // fine if there is nothing to delete
+        nodeDeleted = false;
+      }
+      if (nodeDeleted) {
+        log
+            .info("Found a previous node that still exists while trying to register a new live node "
+                + nodePath + " - removing existing node to create another.");
       }
+      zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
     } catch (KeeperException e) {
       // its okay if the node already exists
       if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -374,15 +364,13 @@
       }
       System.out.println("NODE ALREADY EXISTS");
     }
-    zkClient.getChildren(NODES_ZKNODE, liveNodeWatcher);
+    zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
   }
   
   public String getNodeName() {
     return hostName + ":" + localHostPort + "_"+ localHostContext;
   }
 
-
-
   /**
    * @param path
    * @return
@@ -406,7 +394,7 @@
 
     String configName = null;
 
-    String path = COLLECTIONS_ZKNODE + "/" + collection;
+    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
     if (log.isInfoEnabled()) {
       log.info("Load collection config from:" + path);
     }
@@ -444,7 +432,7 @@
     
     String collection = cloudDesc.getCollectionName();
     
-    String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
+    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
 
     boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
     
@@ -469,14 +457,14 @@
     if(shardZkNodeAlreadyExists && forcePropsUpdate) {
       zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
       // tell everyone to update cloud info
-      zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+      zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
     } else {
       addZkShardsNode(cloudDesc.getShardId(), collection);
       try {
         zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
             CreateMode.PERSISTENT);
         // tell everyone to update cloud info
-        zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
       } catch (KeeperException e) {
         // its okay if the node already exists
         if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -536,12 +524,12 @@
 
   private void setUpCollectionsNode() throws KeeperException, InterruptedException {
     try {
-      if (!zkClient.exists(COLLECTIONS_ZKNODE) && !readonly) {
+      if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
         if (log.isInfoEnabled()) {
-          log.info("creating zk collections node:" + COLLECTIONS_ZKNODE);
+          log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
         }
         // makes collections zkNode if it doesn't exist
-        zkClient.makePath(COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
       }
     } catch (KeeperException e) {
       // its okay if another beats us creating the node
@@ -559,13 +547,13 @@
     }
     
     log.info("Start watching collections zk node for changes");
-    zkClient.getChildren(COLLECTIONS_ZKNODE, new Watcher(){
+    zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
 
       public void process(WatchedEvent event) {
           try {
             log.info("Detected a new or removed collection");
             synchronized (zkStateReader.getUpdateLock()) {
-              addShardZkNodeWatches();
+              zkStateReader.addShardZkNodeWatches();
               zkStateReader.updateCloudState(false);
             }
             // re-watch
@@ -588,7 +576,7 @@
 
       }});
     
-    zkClient.exists(COLLECTIONS_ZKNODE, new Watcher(){
+    zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
 
       public void process(WatchedEvent event) {
         if(event.getType() !=  EventType.NodeDataChanged) {
@@ -597,10 +585,10 @@
         log.info("Notified of CloudState change");
         try {
           synchronized (zkStateReader.getUpdateLock()) {
-            addShardZkNodeWatches();
+            zkStateReader.addShardZkNodeWatches();
             zkStateReader.updateCloudState(false);
           }
-          zkClient.exists(COLLECTIONS_ZKNODE, this);
+          zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, this);
         } catch (KeeperException e) {
           log.error("", e);
           throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -619,120 +607,12 @@
         
       }});
   }
-  
-  public void addShardZkNodeWatches() throws KeeperException, InterruptedException {
-    CloudState cloudState = getCloudState();
-    Set<String> knownCollections = cloudState.getCollections();
-    
-    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
-
-    for(final String collection : collections) {
-      if(!knownCollections.contains(collection)) {
-        log.info("Found new collection:" + collection);
-        Watcher watcher = new Watcher() {
-          public void process(WatchedEvent event) {
-            log.info("Detected changed ShardId in collection:" + collection);
-            try {
-              addShardsWatches(collection);
-              zkStateReader.updateCloudState(false);
-            } catch (KeeperException e) {
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (IOException e) {
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            }
-          }
-        };
-        boolean madeWatch = true;
-        String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
-            + SHARDS_ZKNODE;
-        for (int i = 0; i < 5; i++) {
-          try {
-            zkClient.getChildren(shardZkNode, watcher);
-          } catch (KeeperException.NoNodeException e) {
-            // most likely, the collections node has been created, but not the
-            // shards node yet -- pause and try again
-            madeWatch = false;
-            if (i == 4) {
-              log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
-              break;
-            }
-            Thread.sleep(100);
-          }
-          if (madeWatch) {
-            log.info("Made shard watch:" + shardZkNode);
-            break;
-          }
-        }
-      }
-    }
-  }
-  
-  public void addShardsWatches(final String collection) throws KeeperException,
-      InterruptedException {
-    if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
-      List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
-          + collection + SHARDS_ZKNODE, null);
-      CloudState cloudState = getCloudState();
-      Set<String> knownShardIds;
-      Map<String,Slice> slices = cloudState.getSlices(collection);
-      if (slices != null) {
-        knownShardIds = slices.keySet();
-      } else {
-        knownShardIds = new HashSet<String>(0);
-      }
-      for (final String shardId : shardIds) {
-        if (!knownShardIds.contains(shardId)) {
-          zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
-              + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
-
-            public void process(WatchedEvent event) {
-              log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
-              try {
-                zkStateReader.updateCloudState(false);
-              } catch (KeeperException e) {
-                log.error("", e);
-                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                    "", e);
-              } catch (InterruptedException e) {
-                // Restore the interrupted status
-                Thread.currentThread().interrupt();
-                log.error("", e);
-                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                    "", e);
-              } catch (IOException e) {
-                log.error("", e);
-                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                    "", e);
-              }
-            }
-          });
-        }
-      }
-    }
-  }
-  
-  public void addShardsWatches() throws KeeperException, InterruptedException {
-    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
-    for (final String collection : collections) {
-      addShardsWatches(collection);
-    }
-  }
 
   public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
     String collection = cd.getCollectionName();
     
     log.info("Check for collection zkNode:" + collection);
-    String collectionPath = COLLECTIONS_ZKNODE + "/" + collection;
+    String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
     
     try {
       if(!zkClient.exists(collectionPath)) {
@@ -800,7 +680,7 @@
           zkClient.makePath(collectionPath, collectionProps.store(), CreateMode.PERSISTENT, null, true);
          
           // ping that there is a new collection
-          zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+          zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
         } catch (KeeperException e) {
           // its okay if the node already exists
           if (e.code() != KeeperException.Code.NODEEXISTS) {

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java Fri Feb 12 17:59:41 2010
@@ -27,6 +27,7 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;

Modified: lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java Fri Feb 12 17:59:41 2010
@@ -1,20 +1,31 @@
 package org.apache.solr.client.solrj.impl;
 
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.cloud.*;
+import org.apache.solr.cloud.Slice;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkNodeProps;
+import org.apache.solr.cloud.ZooKeeperException;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.NamedList;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.*;
-import java.util.concurrent.TimeoutException;
-
 public class CloudSolrServer extends SolrServer {
-  private volatile ZkController zkController;
+  private volatile ZkStateReader zkStateReader;
   private String zkHost; // the zk server address
   private int zkConnectTimeout = 10000;
   private int zkClientTimeout = 10000;
@@ -61,14 +72,15 @@
    * @throws InterruptedException
    */
   public void connect() {
-    if (zkController != null) return;
+    if (zkStateReader != null) return;
     synchronized(this) {
-      if (zkController != null) return;
+      if (zkStateReader != null) return;
       try {
-        ZkController zk = new ZkController(zkHost, zkConnectTimeout, zkClientTimeout, null, null, null);
+        ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
+        // nocommit : deal with other watches
         zk.addShardZkNodeWatches();
-        zk.getZkStateReader().updateCloudState(true);
-        zkController = zk;
+        zk.updateCloudState(true);
+        zkStateReader = zk;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -89,7 +101,7 @@
   public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
     connect();
 
-    CloudState cloudState = zkController.getCloudState();
+    CloudState cloudState = zkStateReader.getCloudState();
 
     String collection = request.getParams().get("collection", defaultCollection);
 
@@ -126,11 +138,11 @@
   }
 
   public void close() {
-    if (zkController != null) {
+    if (zkStateReader != null) {
       synchronized(this) {
-        if (zkController != null)
-          zkController.close();
-        zkController = null;
+        if (zkStateReader!= null)
+          zkStateReader.close();
+        zkStateReader = null;
       }
     }
   }

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Fri Feb 12 17:59:41 2010
@@ -21,6 +21,7 @@
 import java.util.HashSet;
 
 import org.apache.solr.BaseDistributedSearchTestCase;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.util.TestHarness;
 

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Fri Feb 12 17:59:41 2010
@@ -19,6 +19,7 @@
 
 import java.io.File;
 
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.util.AbstractSolrTestCase;
 import org.apache.solr.util.TestHarness;

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Fri Feb 12 17:59:41 2010
@@ -23,6 +23,8 @@
 
 import junit.framework.TestCase;
 
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
@@ -185,7 +187,7 @@
 
     // quickly kill / start client
 
-    container2.getZkController().getZkClient().keeper.getConnection()
+    container2.getZkController().getZkClient().getSolrZooKeeper().getConnection()
         .disconnect();
     container2.shutdown();
 

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Fri Feb 12 17:59:41 2010
@@ -23,6 +23,9 @@
 
 import junit.framework.TestCase;
 
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 
@@ -79,7 +82,7 @@
       zkController = new ZkController(AbstractZkTestCase.ZOO_KEEPER_ADDRESS,
           TIMEOUT, 1000, "localhost", "8983", "/solr");
  
-      zkController.updateCloudState(true);
+      zkController.getZkStateReader().updateCloudState(true);
       CloudState cloudInfo = zkController.getCloudState();
       Map<String,Slice> slices = cloudInfo.getSlices("collection1");
       assertNotNull(slices);
@@ -141,7 +144,7 @@
       
       ZkNodeProps props = new ZkNodeProps();
       props.put("configName", actualConfigName);
-      zkClient.makePath(ZkController.COLLECTIONS_ZKNODE + "/" + COLLECTION_NAME , props.store(), CreateMode.PERSISTENT);
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION_NAME , props.store(), CreateMode.PERSISTENT);
 
       if (DEBUG) {
         zkClient.printLayoutToStdOut();

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java Fri Feb 12 17:59:41 2010
@@ -21,6 +21,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;