You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/01/12 15:56:49 UTC

svn commit: r898348 - in /lucene/solr/branches/cloud/src: java/org/apache/solr/cloud/ java/org/apache/solr/core/ java/org/apache/solr/handler/component/ test/org/apache/solr/cloud/

Author: markrmiller
Date: Tue Jan 12 14:56:48 2010
New Revision: 898348

URL: http://svn.apache.org/viewvc?rev=898348&view=rev
Log:
many updates and additions

Added:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java

Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=898348&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java Tue Jan 12 14:56:48 2010
@@ -0,0 +1,50 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class CloudDescriptor {
+  // nocommit : zk  (shardList and collectionName stay null until set)
+  private String shardList;
+  private String collectionName;
+  private String role = "none";
+  /** @return the role; {@code "none"} unless {@link #setRole} was called */
+  public String getRole() {
+    return role;
+  }
+  /** @param role replaces the current role (default {@code "none"}) */
+  public void setRole(String role) {
+    this.role = role;
+  }
+  /** @param shardList value later returned by {@link #getShardList()} */
+  public void setShardList(String shardList) {
+    this.shardList = shardList;
+  }
+
+  //nocommit: may be null (null if setShardList was never called)
+  public String getShardList() {
+    return shardList;
+  }
+  /** @return the collection name, or null if never set */
+  public String getCollectionName() {
+    return collectionName;
+  }
+  /** @param collectionName value later returned by {@link #getCollectionName()} */
+  public void setCollectionName(String collectionName) {
+    this.collectionName = collectionName;
+  }
+}

Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java?rev=898348&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java Tue Jan 12 14:56:48 2010
@@ -0,0 +1,34 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CloudInfo {
+  // collection name -> info; plain HashMap, so NOTE(review): populate before sharing across threads
+  private final Map<String,CollectionInfo> collectionInfos = new HashMap<String,CollectionInfo>();
+  //nocommit - registers the info for a collection, replacing any previous entry
+  public void addCollectionInfo(String collection, CollectionInfo collectionInfo) {
+    collectionInfos.put(collection, collectionInfo);
+  }
+  /** @return the info registered for {@code collection}, or null if there is none */
+  public CollectionInfo getCollectionInfo(String collection) {
+    return collectionInfos.get(collection);
+  }
+}

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java Tue Jan 12 14:56:48 2010
@@ -40,8 +40,8 @@
       .getLogger(CollectionInfo.class);
   
   static final String SHARD_LIST_PROP = "shard_list";
-
   static final String URL_PROP = "url";
+  static final String ROLE_PROP = "role";
   
   // maps shard name to the shard addresses and roles
   private final Map<String,ShardInfoList> shardNameToShardInfoList;
@@ -77,7 +77,7 @@
     HashMap<String,ShardInfoList> shardNameToShardList = new HashMap<String,ShardInfoList>();
 
     if (zkClient.exists(path, null) == null) {
-      throw new IllegalStateException("Cannot find zk node that should exist:"
+      throw new IllegalStateException("Cannot find zk shards node that should exist:"
           + path);
     }
     List<String> nodes = zkClient.getChildren(path, null);

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java Tue Jan 12 14:56:48 2010
@@ -21,6 +21,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.solr.cloud.SolrZkClient.OnReconnect;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -45,12 +46,15 @@
 
   private SolrZkClient client;
 
-  public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat) {
+  private OnReconnect onReconnect;
+
+  public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
     this.name = name;
     this.client = client;
     this.connectionStrategy = strat;
     this.zkServerAddress = zkServerAddress;
     this.zkClientTimeout = zkClientTimeout;
+    this.onReconnect = onConnect;
     reset();
   }
 
@@ -62,7 +66,8 @@
 
   public synchronized void process(WatchedEvent event) {
     if (log.isInfoEnabled()) {
-      log.info("Watcher " + this + " name:" + name + " got event " + event);
+      log.info("Watcher " + this + " name:" + name + " got event " + event
+          + " path:" + event.getPath() + " type:" + event.getType());
     }
 
     state = event.getState();
@@ -72,9 +77,6 @@
     } else if (state == KeeperState.Expired) {
       connected = false;
       log.info("Attempting to reconnect to ZooKeeper...");
-      boolean connected = true;
-
-      // nocommit : close old ZooKeeper client?
 
       try {
         connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
@@ -82,26 +84,36 @@
           public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
            waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
            client.updateKeeper(keeper);
+           if(onReconnect != null) {
+             onReconnect.command();
+           }
            ConnectionManager.this.connected = true;
           }
         });
       } catch (Exception e) {
-        // TODO Auto-generated catch block
+        // nocommit
         e.printStackTrace();
       }
 
       log.info("Connected:" + connected);
-      // nocommit: start reconnect attempts
     } else if (state == KeeperState.Disconnected) {
+      if(connected == false) {
+        // nocommit
+        System.out.println("we already know we are dc'd - why are we notified twice?");
+        return;
+      }
       connected = false;
-      // nocommit: start reconnect attempts
-      
+      // nocommit: start reconnect attempts - problem if this is shutdown related?
+
       try {
         connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
           @Override
           public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
            waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
            client.updateKeeper(keeper);
+           if(onReconnect != null) {
+             onReconnect.command();
+           }
            ConnectionManager.this.connected = true;
           }
         });
@@ -124,9 +136,8 @@
     return state;
   }
 
-  public synchronized ZooKeeper waitForConnected(long waitForConnection)
+  public synchronized void waitForConnected(long waitForConnection)
       throws InterruptedException, TimeoutException, IOException {
-    ZooKeeper keeper = new ZooKeeper(zkServerAddress, zkClientTimeout, this);
     long expire = System.currentTimeMillis() + waitForConnection;
     long left = waitForConnection;
     while (!connected && left > 0) {
@@ -136,7 +147,6 @@
     if (!connected) {
       throw new TimeoutException("Could not connect to ZooKeeper within " + waitForConnection + " ms");
     }
-    return keeper;
   }
 
   public synchronized void waitForDisconnected(long timeout)

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java Tue Jan 12 14:56:48 2010
@@ -18,24 +18,49 @@
  */
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- *
+ * nocommit : default needs backoff retry reconnection attempts
  */
 public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
 
+  private static Logger log = LoggerFactory.getLogger(DefaultConnectionStrategy.class);
+  private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+  
   @Override
   public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
     updater.update(new ZooKeeper(serverAddress, timeout, watcher));
   }
 
   @Override
-  public void reconnect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
-    updater.update(new ZooKeeper(serverAddress, timeout, watcher));
+  public void reconnect(final String serverAddress, final int zkClientTimeout,
+      final Watcher watcher, final ZkUpdate updater) throws IOException {
+    log.info("Starting reconnect to ZooKeeper attempts ...");
+    executor.scheduleAtFixedRate(new Runnable() {
+      public void run() {
+        log.info("Attempting the connect...");
+        try {
+          updater.update(new ZooKeeper(serverAddress, zkClientTimeout, watcher));
+          // nocommit
+          log.info("Reconnected to ZooKeeper");
+        } catch (Exception e) {
+          // nocommit
+          e.printStackTrace();
+          log.info("Reconnect to ZooKeeper failed");
+        }
+        executor.shutdownNow();
+        
+      }
+    }, 0, 1000, TimeUnit.MILLISECONDS); // nocommit : we actually want to do backoff retry
   }
 
 }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java Tue Jan 12 14:56:48 2010
@@ -24,9 +24,12 @@
 public final class ShardInfo {
 
   private final String url;
-  //nocommit do role based on existing ReplicationHandler role detection?
+
+  // nocommit do role based on existing ReplicationHandler role detection?
   private final Role role;
 
+  private String zkNodeName;
+
   public ShardInfo(String url) {
     this.url = url;
     role = Role.SLAVE;
@@ -44,7 +47,14 @@
   public String getUrl() {
     return url;
   }
-  
+
+  public String getZkNodeName() {
+    return zkNodeName;
+  }
+
+  public void setZkNodeName(String zkNodeName) {
+    this.zkNodeName = zkNodeName;
+  }
 
   enum Role {
     MASTER, SLAVE

Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java?rev=898348&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java Tue Jan 12 14:56:48 2010
@@ -0,0 +1,68 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// nocommit - explore handling shard changes
+// watches the shards zkNode
+class ShardsWatcher implements Watcher {
+  // log under this class (it was mistakenly registered under ZkController)
+  private static final Logger log = LoggerFactory.getLogger(ShardsWatcher.class);
+
+  // controller whose cloud info is reloaded when the shards node fires
+  private final ZkController controller;
+
+  public ShardsWatcher(ZkController controller) {
+    this.controller = controller;
+  }
+
+  /** Reloads the controller's cloud info when a watched shards-node event arrives. */
+  public void process(WatchedEvent event) {
+    // nocommit : this will be called too often as shards register themselves?
+    log.info("shard node changed, path:" + event.getPath());
+
+    try {
+      // nocommit : refresh watcher - ZooKeeper watches are one-time triggers,
+      // so until it is re-registered this watcher only fires once
+
+      // TODO: need to load whole state?
+      controller.loadCloudInfo();
+
+    } catch (KeeperException e) {
+      log.error("ZooKeeper Exception", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "ZooKeeper Exception", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    } catch (IOException e) {
+      log.error("IOException", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "IOException", e);
+    }
+
+  }
+}

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java Tue Jan 12 14:56:48 2010
@@ -25,6 +25,7 @@
 
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
+import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
@@ -45,17 +46,21 @@
  */
 public class SolrZkClient {
   static final String NEWL = System.getProperty("line.separator");
+  
+  public static interface OnReconnect {
+    public void command();
+  }
 
   static final int CONNECT_TIMEOUT = 5000;
 
-  protected static final Logger log = LoggerFactory
+  private static final Logger log = LoggerFactory
       .getLogger(SolrZkClient.class);
 
   boolean connected = false;
 
   private ConnectionManager connManager;
 
-  private volatile ZooKeeper keeper;
+  volatile ZooKeeper keeper;
   
   /**
    * @param zkServerAddress
@@ -65,7 +70,11 @@
    * @throws IOException
    */
   public SolrZkClient(String zkServerAddress, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException {
-    this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy());
+    this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
+  }
+  
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout, OnReconnect onReonnect) throws InterruptedException, TimeoutException, IOException {
+    this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect);
   }
 
   /**
@@ -77,23 +86,24 @@
    * @throws IOException
    */
   public SolrZkClient(String zkServerAddress, int zkClientTimeout,
-      ZkClientConnectionStrategy strat) throws InterruptedException,
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect) throws InterruptedException,
       TimeoutException, IOException {
-    connManager = new ConnectionManager("ZooKeeperConnection Watcher:" + zkServerAddress, this,
-        zkServerAddress, zkClientTimeout, strat);
-    strat.connect(zkServerAddress, zkClientTimeout, connManager, new ZkUpdate() {
-      @Override
-      public void update(ZooKeeper zooKeeper) {
-        if(keeper != null) {
-          try {
-            keeper.close();
-          } catch (InterruptedException e) {
-            // nocommit
+    connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+        + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
+    strat.connect(zkServerAddress, zkClientTimeout, connManager,
+        new ZkUpdate() {
+          @Override
+          public void update(ZooKeeper zooKeeper) {
+            if (keeper != null) {
+              try {
+                keeper.close();
+              } catch (InterruptedException e) {
+                // nocommit
+              }
+            }
+            keeper = zooKeeper;
           }
-        }
-        keeper = zooKeeper;
-      }
-    });
+        });
     connManager.waitForConnected(CONNECT_TIMEOUT);
   }
 
@@ -136,6 +146,17 @@
       throws KeeperException, InterruptedException {
     return keeper.exists(path, watcher);
   }
+  
+  /**
+   * @param path
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean exists(final String path)
+      throws KeeperException, InterruptedException {
+    return keeper.exists(path, null) != null;
+  }
 
   /**
    * @param path
@@ -286,7 +307,7 @@
           mode = createMode;
           bytes = data;
         }
-        create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+        keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
         // set new watch
         exists(currentPath, watcher);
       } else if (i == paths.length - 1) {
@@ -318,7 +339,7 @@
    * @throws KeeperException
    * @throws InterruptedException
    */
-  public void write(String path, byte[] data) throws KeeperException,
+  public void setData(String path, byte[] data) throws KeeperException,
       InterruptedException {
 
     makePath(path);
@@ -340,14 +361,14 @@
    * @throws KeeperException
    * @throws InterruptedException
    */
-  public void write(String path, File file) throws IOException,
+  public void setData(String path, File file) throws IOException,
       KeeperException, InterruptedException {
     if (log.isInfoEnabled()) {
       log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path);
     }
 
     String data = FileUtils.readFileToString(file);
-    write(path, data.getBytes());
+    setData(path, data.getBytes());
   }
 
   /**
@@ -419,7 +440,7 @@
    */
   void updateKeeper(ZooKeeper keeper) {
     // nocommit
-   log.info("Updating ZooKeeper instance");
+   log.info("Updating ZooKeeper instance:" + keeper);
    this.keeper = keeper;
   }
 

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Tue Jan 12 14:56:48 2010
@@ -19,6 +19,7 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
@@ -34,6 +35,7 @@
 
 import javax.xml.parsers.ParserConfigurationException;
 
+import org.apache.solr.cloud.SolrZkClient.OnReconnect;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
@@ -43,6 +45,7 @@
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,177 +57,102 @@
  * notes: loads everything on init, creates what's not there - further updates
  * are prompted with Watches.
  * 
- * TODO: handle ZooKeeper goes down / failures, Solr still runs
  */
 public final class ZkController {
-  static final String NEWL = System.getProperty("line.separator");
-  
-  private static final String COLLECTIONS_ZKNODE = "/collections/";
-
-  static final String NODE_ZKPREFIX = "/node";
-
-  private static final String SHARDS_ZKNODE = "/shards";
-
-  static final String PROPS_DESC = "NodeDesc";
-
-
-
-
-  private static final String CONFIGS_ZKNODE = "/configs/";
-
-
-  // nocommit - explore handling shard changes
-  // watches the shards zkNode
-  static class ShardsWatcher implements Watcher {
-
-    private ZkController controller;
-
-    public ShardsWatcher(ZkController controller) {
-      this.controller = controller;
-    }
-
-    public void process(WatchedEvent event) {
-      // nocommit : this will be called too often as shards register themselves
-      System.out.println("shards changed");
-
-      try {
-        // refresh watcher
-        //controller.getKeeperConnection().exists(event.getPath(), this);
-
-        // TODO: need to load whole state?
-        controller.loadCollectionInfo();
-
-      } catch (KeeperException e) {
-        log.error("", e);
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "ZooKeeper Exception", e);
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-      } catch (IOException e) {
-        log.error("", e);
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "IOException", e);
-      }
 
-    }
-
-  }
+  private static Logger log = LoggerFactory.getLogger(ZkController.class);
 
-  final ShardsWatcher SHARD_WATCHER = new ShardsWatcher(this);
+  static final String NEWL = System.getProperty("line.separator");
 
   private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
-
   private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
 
-  private static Logger log = LoggerFactory
-      .getLogger(ZkController.class);
-
-  private SolrZkClient zkClient;
+  // package private for tests
+  static final String CORE_ZKPREFIX = "/core";
+  static final String SHARDS_ZKNODE = "/shards";
+  // nocommit : ok to be public? for corecontainer access
+  public static final String CONFIGS_ZKNODE = "/configs";
+  static final String COLLECTIONS_ZKNODE = "/collections";
 
-  SolrZkClient getKeeperConnection() {
-    return zkClient;
-  }
+  static final String PROPS_DESC = "CoreDesc";
 
-  private String collectionName;
+  final ShardsWatcher shardWatcher = new ShardsWatcher(this);
 
-  private volatile CollectionInfo collectionInfo;
+  private SolrZkClient zkClient;
 
-  private String shardsZkPath;
+  private volatile CloudInfo cloudInfo;
 
   private String zkServerAddress;
 
-  private String hostPort;
-
-  private String hostContext;
-
-  private String configName;
-
-  private String zooKeeperHostName;
-
-  private String hostName;
+  private String localHostPort;
+  private String localHostContext;
+  private String localHostName;
+  private String localHost;
 
   /**
    * 
    * @param zkServerAddress ZooKeeper server host address
-   * @param collection
-   * @param hostName
-   * @param hostPort
-   * @param hostContext
    * @param zkClientTimeout
-   * @throws IOException 
-   * @throws TimeoutException 
-   * @throws InterruptedException 
+   * @param collection
+   * @param localHost
+   * @param locaHostPort
+   * @param localHostContext
+   * @throws IOException
+   * @throws TimeoutException
+   * @throws InterruptedException
    */
-  public ZkController(String zkServerAddress, String collection,
-      String hostName, String hostPort, String hostContext, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException {
+  public ZkController(String zkServerAddress, int zkClientTimeout, String localHost, String locaHostPort,
+      String localHostContext) throws InterruptedException,
+      TimeoutException, IOException {
 
-    this.collectionName = collection;
     this.zkServerAddress = zkServerAddress;
-    this.hostPort = hostPort;
-    this.hostContext = hostContext;
-    this.hostName = hostName;
-    zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout);
-
-    shardsZkPath = COLLECTIONS_ZKNODE + collectionName + SHARDS_ZKNODE;
+    this.localHostPort = locaHostPort;
+    this.localHostContext = localHostContext;
+    this.localHost = localHost;
+    zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout,
+        // on reconnect, reload cloud info
+        new OnReconnect() {
+
+          public void command() {
+            try {
+              loadCloudInfo();
+            } catch (KeeperException e) {
+              log.error("ZooKeeper Exception", e);
+              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                  "ZooKeeper Exception", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+            } catch (IOException e) {
+              log.error("", e);
+              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "",
+                  e);
+            }
 
+          }
+        });
+    
     init();
   }
 
-  private void init() {
-
-    try {
-      zooKeeperHostName = getHostAddress();
-      Matcher m = URL_POST.matcher(zooKeeperHostName);
-      if (m.matches()) {
-        String hostName = m.group(1);
-        // register host
-        zkClient.makePath(hostName);
-      } else {
-        // nocommit
-        throw new IllegalStateException("Bad host:" + zooKeeperHostName);
-      }
-
-      // build layout if not exists
-      buildZkLayoutZkNodes();
-      
-      configName = readConfigName(collectionName);
-
-      // load the state of the cloud
-      loadCollectionInfo();
-
-    } catch (IOException e) {
-      log.error("", e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Can't create ZooKeeperController", e);
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-    } catch (KeeperException e) {
-      log.error("KeeperException", e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-    }
-
-  }
-
-  public boolean configFileExists(String fileName) throws KeeperException,
-      InterruptedException {
-    return configFileExists(configName, fileName);
-  }
-
   /**
    * nocommit: adds nodes if they don't exist, eg /shards/ node. consider race
    * conditions
    */
-  private void buildZkLayoutZkNodes() throws IOException {
+  private void addZkShardsNode(String collection) throws IOException {
+
+    String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE;
     try {
       // shards node
-      if (!exists(shardsZkPath)) {
+      if (!zkClient.exists(shardsZkPath)) {
         if (log.isInfoEnabled()) {
           log.info("creating zk shards node:" + shardsZkPath);
         }
         // makes shards zkNode if it doesn't exist
-        zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, SHARD_WATCHER);
+        zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
+        
+        // ping that there is a new collection
+        zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
       }
     } catch (KeeperException e) {
       // its okay if another beats us creating the node
@@ -237,17 +165,10 @@
       // Restore the interrupted status
       Thread.currentThread().interrupt();
     }
-    
-    // no watch the shards node
-    try {
-      zkClient.exists(shardsZkPath, new Watcher(){
 
-        public void process(WatchedEvent event) {
-          // nocommit
-          // the shards node has been updated
-          // we need to look for new nodes
-          
-        }});
+    // now watch the shards node
+    try {
+      zkClient.exists(shardsZkPath, shardWatcher);
     } catch (KeeperException e) {
       log.error("ZooKeeper Exception", e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -271,42 +192,126 @@
   }
 
   /**
-   * @return information about the current collection from ZooKeeper
+   * @param collection
+   * @param fileName
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean configFileExists(String collection, String fileName)
+      throws KeeperException, InterruptedException {
+    Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + fileName, null);
+    return stat != null;
+  }
+
+  /**
+   * @return information about the cluster from ZooKeeper
+   */
+  public CloudInfo getCloudInfo() {
+    return cloudInfo;
+  }
+
+  public List<String> getCollectionNames() throws KeeperException,
+      InterruptedException {
+    // nocommit : watch for new collections?
+    List<String> collectionNodes = zkClient.getChildren(COLLECTIONS_ZKNODE,
+        null);
+
+    return collectionNodes;
+  }
+
+  /**
+   * Load SolrConfig from ZooKeeper.
+   * 
+   * TODO: consider *many* cores firing up at once and loading the same files
+   * from ZooKeeper
+   * 
+   * @param resourceLoader
+   * @param solrConfigFileName
+   * @return
+   * @throws IOException
+   * @throws ParserConfigurationException
+   * @throws SAXException
+   * @throws InterruptedException
+   * @throws KeeperException
    */
-  public CollectionInfo getCollectionInfo() {
-    return collectionInfo;
+  public SolrConfig getConfig(String zkConfigName, String solrConfigFileName,
+      SolrResourceLoader resourceLoader) throws IOException,
+      ParserConfigurationException, SAXException, KeeperException,
+      InterruptedException {
+    byte[] config = zkClient.getData(CONFIGS_ZKNODE + "/" + zkConfigName + "/"
+        + solrConfigFileName, null, null);
+    InputStream is = new ByteArrayInputStream(config);
+    SolrConfig cfg = solrConfigFileName == null ? new SolrConfig(
+        resourceLoader, SolrConfig.DEFAULT_CONF_FILE, is) : new SolrConfig(
+        resourceLoader, solrConfigFileName, is);
+
+    return cfg;
   }
 
   /**
+   * @param zkConfigName
+   * @param fileName
    * @return
+   * @throws KeeperException
+   * @throws InterruptedException
    */
-  public String getZkServerAddress() {
-    return zkServerAddress;
+  public byte[] getConfigFileData(String zkConfigName, String fileName)
+      throws KeeperException, InterruptedException {
+    return zkClient.getData(CONFIGS_ZKNODE + "/" + zkConfigName, null, null);
   }
 
-  // load and publish a new CollectionInfo
-  private void loadCollectionInfo() throws KeeperException, InterruptedException, IOException {
-    // build immutable CollectionInfo
-    
-      CollectionInfo collectionInfo = new CollectionInfo(zkClient, shardsZkPath);
-      // update volatile
-      this.collectionInfo = collectionInfo;
+  // nocommit: fooling around
+  private String getHostAddress() throws IOException {
+
+    if (localHost == null) {
+      localHost = "http://" + InetAddress.getLocalHost().getHostName();
+    } else {
+      Matcher m = URL_PREFIX.matcher(localHost);
+      if (m.matches()) {
+        String prefix = m.group(1);
+        localHost = prefix + localHost;
+      } else {
+        localHost = "http://" + localHost;
+      }
+    }
+    if (log.isInfoEnabled()) {
+      log.info("Register host with ZooKeeper:" + localHost);
+    }
+
+    return localHost;
   }
 
   /**
-   * @return name of configuration zkNode to use
+   * Load IndexSchema from ZooKeeper.
+   * 
+   * TODO: consider *many* cores firing up at once and loading the same files
+   * from ZooKeeper
+   * 
+   * @param resourceLoader
+   * @param schemaName
+   * @param config
+   * @return
+   * @throws InterruptedException
+   * @throws KeeperException
    */
-  public String getConfigName() {
-    return configName;
+  public IndexSchema getSchema(String zkConfigName, String schemaName,
+      SolrConfig config, SolrResourceLoader resourceLoader)
+      throws KeeperException, InterruptedException {
+    byte[] configBytes = zkClient.getData(CONFIGS_ZKNODE + "/" + zkConfigName
+        + "/" + schemaName, null, null);
+    InputStream is = new ByteArrayInputStream(configBytes);
+    IndexSchema schema = new IndexSchema(config, schemaName, is);
+    return schema;
   }
 
   // nocommit - testing
-  public String getSearchNodes() {
+  public String getSearchNodes(String collection) {
     StringBuilder nodeString = new StringBuilder();
     boolean first = true;
     List<String> nodes;
 
-    nodes = collectionInfo.getSearchShards();
+    nodes = cloudInfo.getCollectionInfo(collection).getSearchShards();
     // nocommit
     System.out.println("there are " + nodes.size() + " node(s)");
     for (String node : nodes) {
@@ -320,192 +325,116 @@
     return nodeString.toString();
   }
 
+  SolrZkClient getZkClient() {
+    return zkClient;
+  }
+
   /**
-   * Register shard. A SolrCore calls this on startup to register with
-   * ZooKeeper.
-   * 
-   * @param core
    * @return
    */
-  public String registerShard(SolrCore core) {
-    String coreName = core.getCoreDescriptor().getName();
-    String shardUrl = zooKeeperHostName + ":" + hostPort + "/" + hostContext
-        + "/" + coreName;
+  public String getZkServerAddress() {
+    return zkServerAddress;
+  }
 
-    // nocommit:
-    if (log.isInfoEnabled()) {
-      log.info("Register shard - core:" + core.getName() + " address:"
-          + shardUrl);
-    }
+  private void init() {
 
-    String nodePath = null;
     try {
-      // create node
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      // nocommit: could do xml
-      Properties props = new Properties();
-      props.put(CollectionInfo.URL_PROP, shardUrl);
-
-      String shardList = core.getCoreDescriptor().getShardList();
-
-      props.put(CollectionInfo.SHARD_LIST_PROP, shardList == null ? "" : shardList);
-      props.store(baos, PROPS_DESC);
-
+      localHostName = getHostAddress();
+      Matcher m = URL_POST.matcher(localHostName);
+      if (m.matches()) {
+        String hostName = m.group(1);
+        // register host
+        zkClient.makePath(hostName);
+      } else {
+        // nocommit
+        throw new IllegalStateException("Unrecognized host:"
+            + localHostName);
+      }
       
-      nodePath = zkClient.create(shardsZkPath + NODE_ZKPREFIX, baos
-          .toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
       // nocommit
-      zkClient.exists(shardsZkPath, SHARD_WATCHER);
+      setUpCollectionsNode();
 
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-    } catch (KeeperException e) {
-      log.error("ZooKeeper Exception", e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "ZooKeeper Exception", e);
     } catch (IOException e) {
       log.error("", e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "", e);
-    } 
-
-    return nodePath;
-  }
-  
-  /**
-   * @param core
-   * @param zkNodePath
-   */
-  public void unRegisterShard(SolrCore core, String zkNodePath) {
-    // nocommit : version?
-    try {
-      zkClient.delete(zkNodePath, -1);
+          "Can't create ZooKeeperController", e);
     } catch (InterruptedException e) {
       // Restore the interrupted status
       Thread.currentThread().interrupt();
-    } catch (KeeperException.NoNodeException e) {
-      // nocommit - this is okay - for some reason the node is already gone
-      log.warn("Unregistering core: " + core.getName()
-          + " but core's ZooKeeper node has already been removed");
     } catch (KeeperException e) {
-      log.error("ZooKeeper Exception", e);
-      // we can't get through to ZooKeeper, so log error
-      // and allow close process to continue -
-      // if ZooKeeper is down, our ephemeral node
-      // should be removed anyway
+      log.error("KeeperException", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
     }
+
   }
 
-  // nocommit: fooling around
-  private String getHostAddress() throws IOException {
+  // load and publish a new CollectionInfo
+  public void loadCloudInfo() throws KeeperException, InterruptedException,
+      IOException {
+    // nocommit : not thread safe anymore ?
 
-    if (hostName == null) {
-      hostName = "http://" + InetAddress.getLocalHost().getHostName();
-    } else {
-      Matcher m = URL_PREFIX.matcher(hostName);
-      if (m.matches()) {
-        String prefix = m.group(1);
-        hostName = prefix + hostName;
-      } else {
-        hostName = "http://" + hostName;
-      }
-    }
-    if (log.isInfoEnabled()) {
-      log.info("Register host with ZooKeeper:" + hostName);
+    log.info("Updating cloud state from ZooKeeper... :" + zkClient.keeper);
+    
+    // build immutable CloudInfo
+    CloudInfo cloudInfo = new CloudInfo();
+    List<String> collections = getCollectionNames();
+    // nocommit : load all collection info
+    for (String collection : collections) {
+      String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection
+          + SHARDS_ZKNODE;
+      CollectionInfo collectionInfo = new CollectionInfo(zkClient, shardsZkPath);
+      cloudInfo.addCollectionInfo(collection, collectionInfo);
     }
 
-    return hostName;
-  }
-
-  /**
-   * Check if path exists in ZooKeeper.
-   * 
-   * @param path ZooKeeper path
-   * @return true if path exists in ZooKeeper
-   * @throws InterruptedException
-   * @throws KeeperException
-   */
-  public boolean exists(String path) throws KeeperException,
-      InterruptedException {
-    Object exists = zkClient.exists(path, null);
-
-    return exists != null;
+    // update volatile
+    this.cloudInfo = cloudInfo;
   }
 
   /**
-   * Load SolrConfig from ZooKeeper.
-   * 
-   * TODO: consider *many* cores firing up at once and loading the same files
-   * from ZooKeeper
-   * 
-   * @param resourceLoader
-   * @param solrConfigFileName
+   * @param path
    * @return
-   * @throws IOException
-   * @throws ParserConfigurationException
-   * @throws SAXException
-   * @throws InterruptedException
    * @throws KeeperException
+   * @throws InterruptedException
    */
-  public SolrConfig getConfig(String zkConfigName, String solrConfigFileName,
-      SolrResourceLoader resourceLoader) throws IOException,
-      ParserConfigurationException, SAXException, KeeperException,
+  public boolean pathExists(String path) throws KeeperException,
       InterruptedException {
-    byte[] config = zkClient.getData(CONFIGS_ZKNODE + zkConfigName
-        + "/" + solrConfigFileName, null, null);
-    InputStream is = new ByteArrayInputStream(config);
-    SolrConfig cfg = solrConfigFileName == null ? new SolrConfig(
-        resourceLoader, SolrConfig.DEFAULT_CONF_FILE, is) : new SolrConfig(
-        resourceLoader, solrConfigFileName, is);
-
-    return cfg;
-  }
-
-  public byte[] getConfigFileData(String zkConfigName, String fileName)
-      throws KeeperException, InterruptedException {
-    return zkClient.getData(CONFIGS_ZKNODE + zkConfigName, null, null);
+    return zkClient.exists(path);
   }
 
   /**
-   * Load IndexSchema from ZooKeeper.
-   * 
-   * TODO: consider *many* cores firing up at once and loading the same files
-   * from ZooKeeper
-   * 
-   * @param resourceLoader
-   * @param schemaName
-   * @param config
+   * @param collection
    * @return
-   * @throws InterruptedException
    * @throws KeeperException
+   * @throws InterruptedException
    */
-  public IndexSchema getSchema(String zkConfigName, String schemaName,
-      SolrConfig config, SolrResourceLoader resourceLoader)
-      throws KeeperException, InterruptedException {
-    byte[] configBytes = zkClient.getData(CONFIGS_ZKNODE + zkConfigName
-        + "/" + schemaName, null, null);
-    InputStream is = new ByteArrayInputStream(configBytes);
-    IndexSchema schema = new IndexSchema(config, schemaName, is);
-    return schema;
-  }
-
   public String readConfigName(String collection) throws KeeperException,
       InterruptedException {
     // nocommit: load all config at once or organize differently (Properties?)
     String configName = null;
 
-    String path = COLLECTIONS_ZKNODE + collection;
+    String path = COLLECTIONS_ZKNODE + "/" + collection;
     if (log.isInfoEnabled()) {
       log.info("Load collection config from:" + path);
     }
     List<String> children;
     try {
       children = zkClient.getChildren(path, null);
-    } catch(KeeperException.NoNodeException e) {
-      log.error("Could not find config name to use for collection:" + collection, e);
+    } catch (KeeperException.NoNodeException e) {
+      // no config is set - check if there is only one config
+      // and if there is, use that
+      children = zkClient.getChildren(CONFIGS_ZKNODE, null);
+      if(children.size() == 1) {
+        String config = children.get(0);
+        log.info("No config set for " + collection + ", using single config found:" + config);
+        return config;
+      }
+
+      log.error(
+          "Multiple configurations were found, but config name to use for collection:"
+              + collection + " could not be located", e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Timeout waiting for ZooKeeper connection", e);
+          "Multiple configurations were found, but config name to use for collection:"
+              + collection + " could not be located", e);
     }
     for (String node : children) {
       // nocommit
@@ -516,12 +445,19 @@
         if (log.isInfoEnabled()) {
           log.info("Using collection config:" + configName);
         }
+        // nocommmit : bail or read more?
       }
     }
 
     if (configName == null) {
+      children = zkClient.getChildren(CONFIGS_ZKNODE, null);
+      if(children.size() == 1) {
+        String config = children.get(0);
+        log.info("No config set for " + collection + ", using single config found:" + config);
+        return config;
+      }
       throw new IllegalStateException("no config specified for collection:"
-          + collection);
+          + collection + " " + children.size() + " configurations exist");
     }
 
     return configName;
@@ -536,20 +472,19 @@
    * @throws KeeperException
    * @throws IOException
    */
-  public Map<String,ShardInfoList> readShardInfo(String path)
+  public Map<String,ShardInfoList> readShardsNode(String path)
       throws KeeperException, InterruptedException, IOException {
-    // for now, just reparse everything
+
     HashMap<String,ShardInfoList> shardNameToShardList = new HashMap<String,ShardInfoList>();
 
-    if (!exists(path)) {
+    if (!zkClient.exists(path)) {
       throw new IllegalStateException("Cannot find zk node that should exist:"
           + path);
     }
     List<String> nodes = zkClient.getChildren(path, null);
 
     for (String zkNodeName : nodes) {
-      byte[] data = zkClient.getData(path + "/" + zkNodeName, null,
-          null);
+      byte[] data = zkClient.getData(path + "/" + zkNodeName, null, null);
 
       Properties props = new Properties();
       props.load(new ByteArrayInputStream(data));
@@ -580,10 +515,163 @@
     return Collections.unmodifiableMap(shardNameToShardList);
   }
 
-  public boolean configFileExists(String configName, String fileName)
-      throws KeeperException, InterruptedException {
-    Stat stat = zkClient.exists(CONFIGS_ZKNODE + configName, null);
-    return stat != null;
+  /**
+   * Register shard. A SolrCore calls this on startup to register with
+   * ZooKeeper.
+   * 
+   * @param core
+   * @return
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public String register(SolrCore core) throws IOException,
+      KeeperException, InterruptedException {
+    String coreName = core.getCoreDescriptor().getName();
+    String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
+        + "/" + coreName;
+
+    // nocommit:
+    if (log.isInfoEnabled()) {
+      log.info("Register shard - core:" + core.getName() + " address:"
+          + shardUrl);
+    }
+
+    CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+
+    String collection = cloudDesc.getCollectionName();
+    String nodePath = null;
+    String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE;
+
+    // build layout if not exists
+    // nocommit : consider how we watch shards on all collections
+    addZkShardsNode(collection);
+
+    // create node
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // nocommit: could do xml
+    Properties props = new Properties();
+    props.put(CollectionInfo.URL_PROP, shardUrl);
+
+    String shardList = cloudDesc.getShardList();
+
+    props.put(CollectionInfo.SHARD_LIST_PROP, shardList == null ? ""
+        : shardList);
+
+    props.put(CollectionInfo.ROLE_PROP, cloudDesc.getRole());
+
+    props.store(baos, PROPS_DESC);
+
+    nodePath = zkClient.create(shardsZkPath + CORE_ZKPREFIX,
+        baos.toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
+
+    return nodePath;
+  }
+
+  /**
+   * @param core
+   * @param zkNodePath
+   */
+  public void unregister(SolrCore core, String zkNodePath) {
+    // nocommit : version?
+    try {
+      zkClient.delete(zkNodePath, -1);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    } catch (KeeperException.NoNodeException e) {
+      // nocommit - this is okay - for some reason the node is already gone
+      log.warn("Unregistering core: " + core.getName()
+          + " but core's ZooKeeper node has already been removed");
+    } catch (KeeperException e) {
+      log.error("ZooKeeper Exception", e);
+      // we can't get through to ZooKeeper, so log error
+      // and allow close process to continue -
+      // if ZooKeeper is down, our ephemeral node
+      // should be removed anyway
+    }
+  }
+
+  /**
+   * @param dir
+   * @param zkPath
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void uploadDirToCloud(File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
+    File[] files = dir.listFiles();
+    for(File file : files) {
+      if (!file.getName().startsWith(".")) {
+        if (!file.isDirectory()) {
+          zkClient.setData(zkPath + "/" + file.getName(), file);
+        } else {
+          uploadDirToCloud(file, zkPath + "/" + file.getName());
+        }
+      }
+    }
+    
+  }
+
+  // convenience for testing
+  void printLayoutToStdOut() throws KeeperException, InterruptedException {
+    zkClient.printLayoutToStdOut();
+  }
+
+  // nocommit
+  public void watchShards() throws KeeperException, InterruptedException {
+    List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, new Watcher() {
+
+      public void process(WatchedEvent event) {
+        System.out.println("Collections node event:" + event);
+        
+      }});
+  }
+
+  private void setUpCollectionsNode() throws KeeperException, InterruptedException {
+    try {
+      if (!zkClient.exists(COLLECTIONS_ZKNODE)) {
+        if (log.isInfoEnabled()) {
+          log.info("creating zk collections node:" + COLLECTIONS_ZKNODE);
+        }
+        // makes collections zkNode if it doesn't exist
+        zkClient.makePath(COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
+      }
+    } catch (KeeperException e) {
+      // its okay if another beats us creating the node
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        log.error("ZooKeeper Exception", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "ZooKeeper Exception", e);
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    }
+    
+    log.info("Start watching collections node for changes");
+    zkClient.exists(COLLECTIONS_ZKNODE, new Watcher(){
+
+      public void process(WatchedEvent event) {
+        // nocommit
+        System.out.println("collections node changed: "+ event);
+        if(event.getType() == EventType.NodeDataChanged) {
+          // no commit - we may have a new collection, watch the shards node for them
+          
+          // re-watch
+          try {
+            zkClient.exists(COLLECTIONS_ZKNODE, this);
+          } catch (KeeperException e) {
+            log.error("ZooKeeper Exception", e);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "ZooKeeper Exception", e);
+          } catch (InterruptedException e) {
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+          }
+        }
+
+      }});
   }
 
 }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Tue Jan 12 14:56:48 2010
@@ -69,8 +69,8 @@
     //nocommit:
     System.out.println("look for:" + file);
     try {
-      if (zkController.exists(file)) {
-        byte[] bytes = zkController.getKeeperConnection().getData(getConfigDir() + "/" + resource, null, null);
+      if (zkController.pathExists(file)) {
+        byte[] bytes = zkController.getZkClient().getData(getConfigDir() + "/" + resource, null, null);
         return new ByteArrayInputStream(bytes);
       }
     } catch (Exception e) {

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java Tue Jan 12 14:56:48 2010
@@ -54,7 +54,7 @@
  */
 public class CoreContainer 
 {
-  private static final String DEFAULT_CORE_NAME = "DEFAULT_CORE";
+  static final String DEFAULT_CORE_NAME = "DEFAULT_CORE";
 
   protected static Logger log = LoggerFactory.getLogger(CoreContainer.class);
   
@@ -79,7 +79,6 @@
   protected String solrConfigFilenameOverride;
   protected String solrDataDirOverride;
   protected String zkPortOverride;
-  protected String collection;
   private String testShardsListOverride;
   private ZkController zooKeeperController;
 
@@ -92,6 +91,8 @@
   
   private void initZooKeeper(String zkHost, int zkClientTimeout) {
     // nocommit: perhaps get from solr.xml
+    
+    // if zkHost sys property is not set, we are not using ZooKeeper
     String zookeeperHost;
     if(zkHost == null) {
       zookeeperHost = System.getProperty("zkHost");
@@ -100,20 +101,39 @@
     }
     
     if (zookeeperHost != null) {
+      // we are ZooKeeper enabled
       try {
-        // nocommit : exceptions
-        zooKeeperController = new ZkController(zookeeperHost, collection, host,  hostPort, hostContext, zkClientTimeout);
+        zooKeeperController = new ZkController(zookeeperHost, zkClientTimeout, host, hostPort, hostContext);
+        
+        String confDir = System.getProperty("bootstrapConfDir");
+        if(confDir != null) {
+          File dir = new File(confDir);
+          if(!dir.isDirectory()) {
+            throw new IllegalArgumentException("bootstrap conf dir must be directory");
+          }
+          String confName = System.getProperty("bootstrapConfName", "conf1");
+          zooKeeperController.uploadDirToCloud(dir, ZkController.CONFIGS_ZKNODE + confName);
+          
+        }
+        
       } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
       } catch (TimeoutException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        log.error("Could not connect to ZooKeeper", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not connect to ZooKeeper", e);
       } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        log.error("", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (KeeperException e) {
+        log.error("", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
       }
     }
+    
   }
 
   public Properties getContainerProperties() {
@@ -283,7 +303,8 @@
       hostContext = cfg.get("solr/cores/@hostContext", "solr");
       host = cfg.get("solr/cores/@host", null);
 
-      collection = cfg.get("solr/cores/@collection", "collection1"); //nocommit: default collection
+      // nocommit read from core
+      //collection = cfg.get("solr/cores/@collection", "collection1"); //nocommit: default collection
 
       if(shareSchema){
         indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
@@ -341,9 +362,17 @@
           }
           opt = DOMUtil.getAttr(node, "shardList", null);
           if(testShardsListOverride != null && name.equals("")) {
-            p.setShardList(testShardsListOverride);
+            p.getCloudDescriptor().setShardList(testShardsListOverride);
           } else if(opt != null) {
-            p.setShardList(opt);
+            p.getCloudDescriptor().setShardList(opt);
+          }
+          opt = DOMUtil.getAttr(node, "role", null);
+          if(opt != null) {
+            p.getCloudDescriptor().setRole(opt);
+          }
+          opt = DOMUtil.getAttr(node, "collection", null);
+          if (opt != null) {
+            p.getCloudDescriptor().setCollectionName(opt);
           }
           opt = DOMUtil.getAttr(node, "properties", null);
           if (opt != null) {
@@ -364,13 +393,33 @@
           SolrException.logOnce(log,null,ex);
         }
       }
-    }
-
-    finally {
+    } finally {
       if (cfgis != null) {
         try { cfgis.close(); } catch (Exception xany) {}
       }
     }
+    
+    
+    if(zooKeeperController != null) {
+      // nocommit : exceptions
+      try {
+        zooKeeperController.loadCloudInfo();
+        
+        // nocommit : set shards node watches
+        zooKeeperController.watchShards();
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      } catch (KeeperException e) {
+        log.error("ZooKeeper Exception", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "ZooKeeper Exception", e);
+      } catch (IOException e) {
+        log.error("", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      }
+    }
   }
 
   private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {
@@ -479,16 +528,18 @@
     String instanceDir = idir.getPath();
     
     // Initialize the solr config
-    SolrResourceLoader solrLoader ;
+    SolrResourceLoader solrLoader = null;
     
     SolrConfig config = null;
+    String zkConfigName = null;
     if(zooKeeperController == null) {
       solrLoader = new SolrResourceLoader(instanceDir, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()));
       config = new SolrConfig(solrLoader, dcore.getConfigName(), null);
     } else {
-      solrLoader = new ZkSolrResourceLoader(instanceDir, collection, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()), zooKeeperController);
       try {
-        config = zooKeeperController.getConfig(zooKeeperController.getConfigName(), dcore.getConfigName(), solrLoader);
+        zkConfigName = zooKeeperController.readConfigName(dcore.getCloudDescriptor().getCollectionName());
+        solrLoader = new ZkSolrResourceLoader(instanceDir, zkConfigName, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()), zooKeeperController);
+        config = zooKeeperController.getConfig(zkConfigName, dcore.getConfigName(), solrLoader);
       } catch (KeeperException e) {
         log.error("ZooKeeper Exception", e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -496,7 +547,7 @@
       } catch (InterruptedException e) {
         // Restore the interrupted status
         Thread.currentThread().interrupt();
-        //nocommit: we may not know the config name - now what
+        //nocommit: not good - we can't continue
       }
     }
     IndexSchema schema = null;
@@ -531,7 +582,8 @@
     if(schema == null){
       if(zooKeeperController != null) {
         try {
-          schema = zooKeeperController.getSchema(zooKeeperController.getConfigName(), dcore.getSchemaName(), config, solrLoader);
+          System.out.println("config:" + zkConfigName);
+          schema = zooKeeperController.getSchema(zkConfigName, dcore.getSchemaName(), config, solrLoader);
         } catch (KeeperException e) {
           log.error("ZooKeeper Exception", e);
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java Tue Jan 12 14:56:48 2010
@@ -20,6 +20,8 @@
 import java.util.Properties;
 import java.io.File;
 
+import org.apache.solr.cloud.CloudDescriptor;
+
 /**
  * A Solr core descriptor
  *
@@ -34,11 +36,16 @@
   protected String schemaName;
   private final CoreContainer coreContainer;
   private Properties coreProperties;
-  private String shardList;
+  
+  // nocommit : only filled when using ZooKeeper
+  private CloudDescriptor cloudDesc = new CloudDescriptor();
 
   public CoreDescriptor(CoreContainer coreContainer, String name, String instanceDir) {
     this.coreContainer = coreContainer;
     this.name = name;
+    
+    // cloud collection defaults to core name
+    this.cloudDesc.setCollectionName(name == "" ? CoreContainer.DEFAULT_CORE_NAME : name);
     if (name == null) {
       throw new RuntimeException("Core needs a name");
     }
@@ -173,13 +180,7 @@
     }
   }
 
-  public void setShardList(String shardList) {
-    System.out.println("set shard list:" + shardList);
-    this.shardList = shardList;
-  }
-  
-  //nocommit: may be null
-  public String getShardList() {
-    return shardList;
+  public CloudDescriptor getCloudDescriptor() {
+    return cloudDesc;
   }
 }

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java Tue Jan 12 14:56:48 2010
@@ -50,6 +50,7 @@
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.apache.zookeeper.KeeperException;
 import org.apache.commons.io.IOUtils;
 import org.xml.sax.SAXException;
 
@@ -531,7 +532,21 @@
     
     zooKeeperComponent = cd.getCoreContainer().getZooKeeperController();
     if(zooKeeperComponent != null) {
-      this.zkNodePath = zooKeeperComponent.registerShard(this);
+      // load ZooKeeper - nocommit: somehow fall back to local configs?
+      try {
+        this.zkNodePath = zooKeeperComponent.register(this);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+      } catch (KeeperException e) {
+        log.error("ZooKeeper Exception", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "ZooKeeper Exception", e);
+      } catch (IOException e) {
+        log.error("", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } 
     }
 
     //Initialize JMX
@@ -711,7 +726,7 @@
     log.info(logid+" CLOSING SolrCore " + this);
     // nocommit : if ZooKeeper, unregister core
     if(zooKeeperComponent != null) {
-      zooKeeperComponent.unRegisterShard(this, zkNodePath);
+      zooKeeperComponent.unregister(this, zkNodePath);
     }
     try {
       infoRegistry.clear();

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java Tue Jan 12 14:56:48 2010
@@ -175,7 +175,7 @@
         // check if using ZooKeeper
         ZkController zooKeeperController = core.getCoreDescriptor().getCoreContainer().getZooKeeperController();
         if(zooKeeperController != null) {
-          exists = zooKeeperController.configFileExists(f);
+          exists = zooKeeperController.configFileExists(core.getCoreDescriptor().getCloudDescriptor().getCollectionName(), f);
         } else {
           File fC = new File( core.getResourceLoader().getConfigDir(), f );
           File fD = new File( core.getDataDir(), f );

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Tue Jan 12 14:56:48 2010
@@ -85,18 +85,18 @@
      
       RandVal.uniqueValues = new HashSet(); //reset random values
       doTest();
-      printeLayout();
+      printLayout();
 
       destroyServers();
     }
   }
 
   public void tearDown() throws Exception {
-    printeLayout();
+    printLayout();
     super.tearDown();
   }
   
-  private void printeLayout() throws Exception {
+  protected void printLayout() throws Exception {
     SolrZkClient zkClient = new SolrZkClient(AbstractZkTestCase.JUST_HOST_NAME, AbstractZkTestCase.TIMEOUT);
     zkClient.printLayoutToStdOut();
     zkClient.close();

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Tue Jan 12 14:56:48 2010
@@ -42,7 +42,8 @@
       + System.getProperty("file.separator") + getClass().getName() + "-"
       + System.currentTimeMillis());
 
-  private ZkTestServer zkServer;
+  protected ZkTestServer zkServer;
+  protected String zkDir;
 
   public AbstractZkTestCase() {
 
@@ -59,7 +60,7 @@
   public void setUp() throws Exception {
     try {
       System.setProperty("zkHost", ZOO_KEEPER_ADDRESS);
-      String zkDir = tmpDir.getAbsolutePath() + File.separator
+      zkDir = tmpDir.getAbsolutePath() + File.separator
       + "zookeeper/server1/data";
       zkServer = new ZkTestServer(zkDir);
       zkServer.run();
@@ -103,7 +104,7 @@
 
     zkClient = new SolrZkClient(ZOO_KEEPER_ADDRESS, AbstractZkTestCase.TIMEOUT);
     
-    zkClient.makePath("/collections/collection1/config=collection1");
+    //zkClient.makePath("/collections/collection1/config=collection1");
 
     putConfig(zkClient, config);
     putConfig(zkClient, schema);
@@ -119,7 +120,7 @@
   }
 
   private static void putConfig(SolrZkClient zkConnection, String name) throws Exception {
-    zkConnection.write("/configs/collection1/" + name, new File("solr"
+    zkConnection.setData("/configs/conf1/" + name, new File("solr"
         + File.separator + "conf" + File.separator + name));
   }
 

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Tue Jan 12 14:56:48 2010
@@ -21,6 +21,7 @@
 
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.core.CoreDescriptor;
 
 /**
  * nocommit: 
@@ -73,6 +74,7 @@
 
   public void testDistribSearch() throws Exception {
     for (int nServers = 3; nServers < 4; nServers++) {
+      printLayout();
       createServers(nServers);
       RandVal.uniqueValues = new HashSet(); //reset random values
       doTest();
@@ -127,6 +129,9 @@
       query("q","*:*", "sort",f+" asc");
     }
 
+    h.getCoreContainer().getCore("DEFAULT_CORE").close();
+    CoreDescriptor dcore= new CoreDescriptor( h.getCoreContainer(), "testcore", "testcore");
+    h.getCoreContainer().create(dcore);
 
     // these queries should be exactly ordered and scores should exactly match
     query("q","*:*", "sort",i1+" desc");
@@ -227,6 +232,8 @@
       query("q","fox duplicate horses", "hl","true", "hl.fl", t1);
       query("q","*:*", "rows",100);
     }
+    
+    super.printLayout();
 
     // Thread.sleep(10000000000L);
   }

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java Tue Jan 12 14:56:48 2010
@@ -22,7 +22,6 @@
 
 
 /**
- * TODO: assert config came from ZooKeeper
  *
  */
 public class BasicZkTest extends AbstractZkTestCase {
@@ -81,7 +80,14 @@
       assertU(a, a);
     }
     assertU(commit());
-
+    
+    zkServer.shutdown();
+    Thread.sleep(300);
+    // try a reconnect
+    
+    zkServer = new ZkTestServer(zkDir);
+    zkServer.run();
+    
     // test maxint
     assertQ(req("q", "id:[100 TO 110]", "rows", "2147483647"),
         "//*[@numFound='4']");
@@ -102,6 +108,6 @@
     assertQ(req("id:[100 TO 110]"), "//*[@numFound='0']");
     
     //nocommit
-    System.out.println("search nodes:" + h.getCoreContainer().getZooKeeperController().getSearchNodes());
+    System.out.println("search nodes:" + h.getCoreContainer().getZooKeeperController().getSearchNodes("DEFAULT_CORE"));
   }
 }

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Tue Jan 12 14:56:48 2010
@@ -57,6 +57,7 @@
         + "zookeeper/server1/data";
     ZkTestServer server = null;
     SolrZkClient zkClient = null;
+    ZkController zkController = null;
     try {
       server = new ZkTestServer(zkDir);
       server.run();
@@ -79,10 +80,10 @@
         zkClient.printLayoutToStdOut();
       }
 
-      ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS,
-          "collection1", "localhost", "8983", "/solr", TIMEOUT);
+      zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
+          "localhost", "8983", "/solr");
       Map<String,ShardInfoList> shardInfoMap = zkController
-          .readShardInfo(shardsPath);
+          .readShardsNode(shardsPath);
       assertTrue(shardInfoMap.size() > 0);
 
       Set<Entry<String,ShardInfoList>> entries = shardInfoMap.entrySet();
@@ -117,6 +118,9 @@
       if (server != null) {
         server.shutdown();
       }
+      if(zkController != null) {
+        zkController.close();
+      }
     }
   }
 
@@ -140,15 +144,43 @@
       zkClient.printLayoutToStdOut();
     }
 
-    ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS,
-        "collection1", "localhost", "8983", "/solr", TIMEOUT);
+    ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
+        "localhost", "8983", "/solr");
     String configName = zkController.readConfigName(COLLECTION_NAME);
     assertEquals(configName, actualConfigName);
 
+
+    // nocommit : close in finally
+    zkController.close();
     zkClient.close();
     server.shutdown();
 
   }
+  
+  public void testUploadToCloud() throws Exception {
+    String zkDir = tmpDir.getAbsolutePath() + File.separator
+        + "zookeeper/server1/data";
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+    server.run();
+
+    AbstractZkTestCase.makeSolrZkNode();
+
+    ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
+        "localhost", "8983", "/solr");
+
+
+    zkController.uploadDirToCloud(new File("solr/conf"), ZkController.CONFIGS_ZKNODE + "/config1");
+    
+    if (DEBUG) {
+      zkController.printLayoutToStdOut();
+    }
+    
+    // nocommit close in finally
+    zkController.close();
+    server.shutdown();
+
+  }
 
   private void addShardToZk(SolrZkClient zkClient, String shardsPath,
       String url, String shardList) throws IOException, KeeperException,
@@ -160,7 +192,7 @@
     props.put(CollectionInfo.SHARD_LIST_PROP, shardList);
     props.store(baos, ZkController.PROPS_DESC);
 
-    zkClient.create(shardsPath + ZkController.NODE_ZKPREFIX,
+    zkClient.create(shardsPath + ZkController.CORE_ZKPREFIX,
         baos.toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
   }
 

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java Tue Jan 12 14:56:48 2010
@@ -18,24 +18,51 @@
  */
 
 import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import junit.framework.TestCase;
 
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 
 public class ZkSolrClientTest extends TestCase {
   protected File tmpDir = new File(System.getProperty("java.io.tmpdir")
       + System.getProperty("file.separator") + getClass().getName() + "-"
       + System.currentTimeMillis());
   
-  public void testBasic() throws Exception {
+  public void testConnect() throws Exception {
+    String zkDir = tmpDir.getAbsolutePath() + File.separator
+    + "zookeeper/server1/data";
+    ZkTestServer server = null;
+
+    server = new ZkTestServer(zkDir);
+    server.run();
+
+    SolrZkClient zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_ADDRESS,
+        AbstractZkTestCase.TIMEOUT);
+    
+    zkClient.close();
+    server.shutdown();
+  }
+  
+  public void testMakeRootNode() throws Exception {
+    String zkDir = tmpDir.getAbsolutePath() + File.separator
+    + "zookeeper/server1/data";
+    ZkTestServer server = null;
+
+    server = new ZkTestServer(zkDir);
+    server.run();
+
+    AbstractZkTestCase.makeSolrZkNode();
+    
+    SolrZkClient zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_SERVER,
+        AbstractZkTestCase.TIMEOUT);
+    
+    assertTrue(zkClient.exists("/solr"));
+    
+    zkClient.close();
+    server.shutdown();
+  }
+  
+  public void testReconnect() throws Exception {
     String zkDir = tmpDir.getAbsolutePath() + File.separator
         + "zookeeper/server1/data";
     ZkTestServer server = null;
@@ -47,40 +74,7 @@
       AbstractZkTestCase.makeSolrZkNode();
 
       zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_ADDRESS,
-          AbstractZkTestCase.TIMEOUT, new ZkClientConnectionStrategy() {
-            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-            @Override
-            public void reconnect(final String serverAddress, final int zkClientTimeout,
-                final Watcher watcher, final ZkUpdate updater) throws IOException {
-              System.out.println("reconnecting");
-              executor.scheduleAtFixedRate(new Runnable() {
-                public void run() {
-                  // nocommit
-                  System.out.println("Attempting the connect...");
-                  try {
-                    updater.update(new ZooKeeper(serverAddress, zkClientTimeout, watcher));
-                    // nocommit
-                    System.out.println("Connect done");
-                  } catch (Exception e) {
-                    // nocommit
-                    e.printStackTrace();
-                    System.out.println("failed reconnect");
-                  }
-                  executor.shutdownNow();
-                  
-                }
-              }, 0, 400, TimeUnit.MILLISECONDS);
-              
-            }
-            
-            @Override
-            public void connect(String zkServerAddress, int zkClientTimeout,
-                Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
-              System.out.println("connecting");
-              updater.update(new ZooKeeper(zkServerAddress, zkClientTimeout, watcher));
-              
-            }
-          });
+          AbstractZkTestCase.TIMEOUT);
       String shardsPath = "/collections/collection1/shards";
       zkClient.makePath(shardsPath);
 
@@ -103,7 +97,9 @@
       server = new ZkTestServer(zkDir);
       server.run();
       
-      Thread.sleep(80);
+      // wait for reconnect
+      Thread.sleep(1000);
+      
       zkClient.makePath("collections/collection1/config=collection3");
       
       zkClient.printLayoutToStdOut();
@@ -114,6 +110,7 @@
     } catch(Exception e) {
       // nocommit
       e.printStackTrace();
+      throw e;
     } finally {
     
       if (zkClient != null) {