You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/16 18:07:45 UTC

svn commit: r1215227 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: markrmiller
Date: Fri Dec 16 17:07:44 2011
New Revision: 1215227

URL: http://svn.apache.org/viewvc?rev=1215227&view=rev
Log:
commit sami's latest overseer patch

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Fri Dec 16 17:07:44 2011
@@ -130,7 +130,7 @@ public class LeaderElector {
   }
 
   protected void runIamLeaderProcess(final ElectionContext context) throws KeeperException,
-      InterruptedException, IOException {
+      InterruptedException {
     String currentLeaderZkPath = context.electionPath
         + LEADER_NODE;
     // TODO: leader election tests do not currently set the props

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java Fri Dec 16 17:07:44 2011
@@ -40,8 +40,8 @@ public class NodeStateWatcher implements
   private static Logger log = LoggerFactory.getLogger(NodeStateWatcher.class);
 
   public static interface NodeStateChangeListener {
-    void coreCreated(String shardZkNodeName, Set<CoreState> cores) throws KeeperException;
-    void coreChanged(String nodeName, Set<CoreState> cores) throws KeeperException;
+    void coreCreated(String shardZkNodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
+    void coreChanged(String nodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
   }
 
   private final SolrZkClient zkClient;
@@ -120,6 +120,9 @@ public class NodeStateWatcher implements
             listener.coreCreated(nodeName, Collections.unmodifiableSet(newCores));
           } catch (KeeperException e) {
             log.warn("Could not talk to ZK", e);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Could not talk to ZK", e);
           }
         }
 
@@ -128,6 +131,9 @@ public class NodeStateWatcher implements
           listener.coreChanged(nodeName, Collections.unmodifiableSet(changedCores));
           } catch (KeeperException e) {
             log.warn("Could not talk to ZK", e);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Could not talk to ZK", e);
           }
         }
     } else {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Dec 16 17:07:44 2011
@@ -32,11 +32,9 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.*;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.KeeperException.Code;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +43,8 @@ import org.slf4j.LoggerFactory;
  */
 public class Overseer implements NodeStateChangeListener {
   
+  public static final String ASSIGNMENTS_NODE = "/node_assignments";
+  public static final String STATES_NODE = "/node_states";
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
   
   private final SolrZkClient zkClient;
@@ -57,23 +57,9 @@ public class Overseer implements NodeSta
     log.info("Constructing new Overseer");
     this.zkClient = zkClient;
     this.reader = reader;
-    createZkNodes(zkClient);
     createWatches();
   }
   
-  public static void createZkNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    //create assignments node if it does not exist
-    if (!zkClient.exists("/node_assignments")) {
-      try {
-        zkClient.makePath("/node_assignments", CreateMode.PERSISTENT);
-      } catch (KeeperException e) {
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          throw e;
-        }
-      }
-    }
-  }
-
   public synchronized void createWatches()
       throws KeeperException, InterruptedException {
     // We need to fetch the current cluster state and the set of live nodes
@@ -148,7 +134,7 @@ public class Overseer implements NodeSta
   private void addNodeStateWatches(Set<String> nodeNames) throws InterruptedException, KeeperException {
     
     for (String nodeName : nodeNames) {
-      String path = "/node_states/" + nodeName;
+      final String path = STATES_NODE + "/" + nodeName;
       synchronized (nodeStateWatches) {
         if (!nodeStateWatches.containsKey(nodeName)) {
           try {
@@ -175,12 +161,14 @@ public class Overseer implements NodeSta
   /**
    * Try to assign core to the cluster
    * @throws KeeperException 
+   * @throws InterruptedException 
    */
-  private void updateState(String nodeName, CoreState coreState) throws KeeperException {
+  private void updateState(String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
     String collection = coreState.getCollectionName();
     String coreName = coreState.getCoreName();
     
     synchronized (reader.getUpdateLock()) {
+      reader.updateCloudState(true); //get fresh copy of the state
       String shardId;
       CloudState state = reader.getCloudState();
       if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
@@ -221,7 +209,7 @@ public class Overseer implements NodeSta
   }
   
   @Override
-  public void coreCreated(String nodeName, Set<CoreState> states) throws KeeperException {
+  public void coreCreated(String nodeName, Set<CoreState> states) throws KeeperException, InterruptedException {
     log.debug("Cores created: " + nodeName + " states:" +states);
     for (CoreState state : states) {
       updateState(nodeName, state);
@@ -241,7 +229,6 @@ public class Overseer implements NodeSta
     ArrayList<CoreAssignment> assignments = new ArrayList<CoreAssignment>();
     for(CoreState coreState: states) {
       final String coreName = coreState.getCoreName();
-      final String collection = coreState.getCollectionName();
       HashMap<String, String> coreProperties = new HashMap<String, String>();
       Map<String, Slice> slices = cloudState.getSlices(coreState.getCollectionName());
       for(Entry<String, Slice> entry: slices.entrySet()) {
@@ -250,22 +237,13 @@ public class Overseer implements NodeSta
           coreProperties.put(ZkStateReader.SHARD_ID_PROP, entry.getKey());
         }
       }
-      CoreAssignment assignment = new CoreAssignment(coreName, collection, coreProperties);
+      CoreAssignment assignment = new CoreAssignment(coreName, coreProperties);
       assignments.add(assignment);
     }
     
     //serialize
     byte[] content = ZkStateReader.toJSON(assignments);
-    final String nodeName = "/node_assignments/" + node;
-    if (!zkClient.exists(nodeName)) {
-      try {
-        zkClient.makePath(nodeName);
-      } catch (KeeperException ke) {
-        if (ke.code() != Code.NODEEXISTS) {
-          throw ke;
-        }
-      }
-    }
+    final String nodeName = ASSIGNMENTS_NODE + "/" + node;
     zkClient.setData(nodeName, content);
   }
   
@@ -278,10 +256,31 @@ public class Overseer implements NodeSta
   }
 
   @Override
-  public void coreChanged(String nodeName, Set<CoreState> states) throws KeeperException  {
+  public void coreChanged(String nodeName, Set<CoreState> states) throws KeeperException, InterruptedException  {
     log.debug("Cores changed: " + nodeName + " states:" + states);
     for (CoreState state : states) {
       updateState(nodeName, state);
     }
   }
+  
+  public static void createClientNodes(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
+    createZkNode(zkClient, STATES_NODE + "/" + nodeName);
+    createZkNode(zkClient, ASSIGNMENTS_NODE + "/" + nodeName);
+  }
+  
+  private static void createZkNode(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
+    
+    if (log.isInfoEnabled()) {
+      log.info("creating node:" + nodeName);
+    }
+    
+    try {
+      if (!zkClient.exists(nodeName)) {
+        zkClient.makePath(nodeName, CreateMode.PERSISTENT, null);
+      }
+      
+    } catch (NodeExistsException e) {
+      // it's ok
+    }
+  }
 }
\ No newline at end of file

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java Fri Dec 16 17:07:44 2011
@@ -17,13 +17,9 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Overseer Elector.
@@ -31,7 +27,6 @@ import org.slf4j.LoggerFactory;
 public class OverseerElector extends LeaderElector {
   private final SolrZkClient client;
   private final ZkStateReader reader;
-  private static Logger log = LoggerFactory.getLogger(OverseerElector.class);
   
   public OverseerElector(SolrZkClient client, ZkStateReader stateReader) {
     super(client);
@@ -40,21 +35,8 @@ public class OverseerElector extends Lea
   }
   
   @Override
-  protected void runIamLeaderProcess(ElectionContext context) {
-    try {
-      new Overseer(client, reader);
-    } catch (KeeperException e) {
-      if (e.code() == KeeperException.Code.SESSIONEXPIRED
-          || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-        log.warn("Cannot run overseer leader process, Solr cannot talk to ZK");
-        return;
-      }
-      throw new ZooKeeperException(
-          SolrException.ErrorCode.SERVER_ERROR, "", e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      log.warn("Could not run leader process", e);
-    }
+  protected void runIamLeaderProcess(ElectionContext context) throws KeeperException, InterruptedException{
+    new Overseer(client, reader);
   }
   
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Dec 16 17:07:44 2011
@@ -302,9 +302,9 @@ public final class ZkController {
         }
       }
       
+      Overseer.createClientNodes(zkClient, getNodeName());
       createEphemeralLiveNode();
       setUpCollectionsNode();
-      createAssignmentsNode();
       
       byte[] assignments = zkClient.getData(getAssignmentsNode(), new Watcher(){
 
@@ -359,9 +359,12 @@ public final class ZkController {
 
   }
 
-
   private String getAssignmentsNode() {
-    return "/node_assignments/" + getNodeName();
+    return Overseer.ASSIGNMENTS_NODE + "/" + getNodeName();
+  }
+
+  private String getStatesNode() {
+    return Overseer.STATES_NODE + "/" + getNodeName();
   }
 
   private void createEphemeralLiveNode() throws KeeperException,
@@ -476,17 +479,17 @@ public final class ZkController {
     props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
     props.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
     if(shardId!=null) {
-      props.put("shard_id", shardId);
+      props.put(ZkStateReader.SHARD_ID_PROP, shardId);
     }
 
     if (shardId == null && getShardId(desc, state, shardZkNodeName)) {
       publishState(cloudDesc, shardZkNodeName, props); //need to publish state to get overseer assigned id
       shardId = doGetShardIdProcess(coreName, cloudDesc);
       cloudDesc.setShardId(shardId);
-      props.put("shard_id", shardId);
+      props.put(ZkStateReader.SHARD_ID_PROP, shardId);
     } else {
       // shard id was picked up in getShardId
-      props.put("shard_id", cloudDesc.getShardId());
+      props.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
       shardId = cloudDesc.getShardId();
       publishState(cloudDesc, shardZkNodeName, props);
     }
@@ -494,7 +497,7 @@ public final class ZkController {
     if (log.isInfoEnabled()) {
         log.info("Register shard - core:" + coreName + " address:"
             + shardUrl + "shardId:" + shardId);
-      }
+    }
 
     ZkNodeProps zkProps = new ZkNodeProps(props);
 
@@ -668,33 +671,6 @@ public final class ZkController {
     }
     
   }
-  
-  private void createAssignmentsNode() throws KeeperException, InterruptedException {
-    String nodeName = getAssignmentsNode();
-    
-    try {
-      
-      if (log.isInfoEnabled()) {
-        log.info("creating node:" + nodeName);
-      }
-
-      zkClient.makePath(nodeName, CreateMode.PERSISTENT, null);
-
-    } catch (KeeperException e) {
-      // its okay if another beats us creating the node
-      if (e.code() != KeeperException.Code.NODEEXISTS) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      }
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-      log.error("", e);
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-          "", e);
-    }
-  }
 
   public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
     String collection = cd.getCollectionName();
@@ -809,12 +785,6 @@ public final class ZkController {
     final String nodePath = "/node_states/" + getNodeName();
 
     try {
-
-      if (!zkClient.exists(nodePath)) {
-        // nocommit: race condition - someone else might make the node first
-        zkClient.makePath(nodePath);
-      }
-      
       log.info("publishing node state:" + coreStates.values());
       zkClient.setData(
           nodePath,
@@ -833,9 +803,9 @@ public final class ZkController {
     }
   }
 
-  private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor) {
+  private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor) throws InterruptedException {
     final String shardZkNodeName = getNodeName() + "_" + coreName;
-    int retryCount = 20;
+    int retryCount = 40;
     while (retryCount-->0) {
       synchronized (assignments) {
         CoreAssignment assignment = assignments.get(shardZkNodeName);

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Fri Dec 16 17:07:44 2011
@@ -18,17 +18,23 @@ package org.apache.solr.cloud;
  */
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.CoreState;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.CreateMode;
@@ -131,26 +137,177 @@ public class OverseerTest extends SolrTe
     
     System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
   }
-  
+
+  @Test
+  public void testShardAssignmentBigger() throws Exception {
+    String zkDir = dataDir.getAbsolutePath() + File.separator
+        + "zookeeper/server1/data";
+
+    final int nodeCount = 10; //how many simulated nodes
+    final int coreCount = 66; //how many cores to register
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+    ZkStateReader reader = null;
+    final ZkController[] controllers = new ZkController[nodeCount];
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      reader = new ZkStateReader(zkClient);
+      
+      System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
+
+      for (int i = 0; i < nodeCount; i++) {
+      
+      controllers[i] = new ZkController(server.getZkAddress(), TIMEOUT, 10000,
+          "localhost", "898" + i, "solr", new CurrentCoreDescriptorProvider() {
+
+            @Override
+            public List<CoreDescriptor> getCurrentDescriptors() {
+              // do nothing
+              return null;
+            }
+          });
+      }
+
+      System.setProperty("bootstrap_confdir", getFile("solr/conf")
+          .getAbsolutePath());
+
+      
+      final ExecutorService[] nodeExecutors = new ExecutorService[nodeCount];
+      for (int i = 0; i < nodeCount; i++) {
+        nodeExecutors[i] = Executors.newFixedThreadPool(1);
+      }
+      
+      final String[] ids = new String[coreCount];
+      //register total of coreCount cores
+      for (int i = 0; i < coreCount; i++) {
+        final int slot = i;
+        Runnable coreStarter = new Runnable() {
+          @Override
+          public void run() {
+            // TODO Auto-generated method stub
+            CloudDescriptor collection1Desc = new CloudDescriptor();
+            collection1Desc.setCollectionName("collection1");
+
+            final String coreName = "core" + slot;
+            
+            CoreDescriptor desc = new CoreDescriptor(null, coreName, "");
+            desc.setCloudDescriptor(collection1Desc);
+            try {
+              ids[slot] = controllers[slot % nodeCount].register(coreName, desc);
+            } catch (Exception e) {
+              fail("register threw exception:" + e);
+            }
+          }
+        };
+        
+        nodeExecutors[i % nodeCount].submit(coreStarter);
+      }
+      
+      for (int i = 0; i < nodeCount; i++) {
+        nodeExecutors[i].shutdown();
+      }
+
+      for (int i = 0; i < nodeCount; i++) {
+        while (!nodeExecutors[i].awaitTermination(100, TimeUnit.MILLISECONDS));
+      }
+      
+      // make sure all cores have been assigned a id in cloudstate
+      for (int i = 0; i < 40; i++) {
+        reader.updateCloudState(true);
+        CloudState state = reader.getCloudState();
+        Map<String,Slice> slices = state.getSlices("collection1");
+        int count = 0;
+        for (String name : slices.keySet()) {
+          count += slices.get(name).getShards().size();
+        }
+        if (coreCount == count) break;
+        Thread.sleep(200);
+      }
+
+      // make sure all cores have been returned a id
+      for (int i = 0; i < 40; i++) {
+        int assignedCount = 0;
+        for (int j = 0; j < coreCount; j++) {
+          if (ids[j] != null) {
+            assignedCount++;
+          }
+        }
+        if (coreCount == assignedCount) {
+          break;
+        }
+        Thread.sleep(200);
+      }
+      
+      final HashMap<String, AtomicInteger> counters = new HashMap<String,AtomicInteger>();
+      for (int i = 1; i < 4; i++) {
+        counters.put("shard" + i, new AtomicInteger());
+      }
+      
+      for (int i = 0; i < coreCount; i++) {
+        final AtomicInteger ai = counters.get(ids[i]);
+        assertNotNull("could not find counter for shard:" + ids[i], ai);
+        ai.incrementAndGet();
+      }
+
+      for (String counter: counters.keySet()) {
+        int count = counters.get(counter).intValue();
+        int expectedCount = coreCount / 3;
+        if (count != expectedCount) {
+          fail("unevenly assigned shard ids, " + counter + " had " + count
+              + ", expected " + expectedCount + " (+-1)");
+        }
+      }
+      
+    } finally {
+      if (DEBUG) {
+        if (controllers[0] != null) {
+          controllers[0].printLayoutToStdOut();
+        }
+      }
+      if (zkClient != null) {
+        zkClient.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+      for (int i = 0; i < controllers.length; i++)
+        if (controllers[i] != null) {
+          controllers[i].close();
+        }
+      server.shutdown();
+    }
+    
+    System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
+  }
+
   //wait until i slices for collection have appeared 
-  private void waitForSliceCount(ZkStateReader stateReader, String collection, int i) throws InterruptedException {
+  private void waitForSliceCount(ZkStateReader stateReader, String collection, int i) throws InterruptedException, KeeperException {
     waitForCollections(stateReader, collection);
-    int maxIterations = 400;
+    int maxIterations = 200;
     while (0 < maxIterations--) {
       CloudState state = stateReader.getCloudState();
       Map<String,Slice> sliceMap = state.getSlices(collection);
       if (sliceMap != null && sliceMap.keySet().size() == i) {
         return;
       }
-      Thread.sleep(50);
+      Thread.sleep(100);
     }
   }
 
   //wait until collections are available
-  private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException {
+  private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException {
     int maxIterations = 100;
     while (0 < maxIterations--) {
-      Set<String> availableCollections = stateReader.getCloudState().getCollections();
+      stateReader.updateCloudState(true);
+      final CloudState state = stateReader.getCloudState();
+      Set<String> availableCollections = state.getCollections();
       int availableCount = 0;
       for(String requiredCollection: collections) {
         if(availableCollections.contains(requiredCollection)) {
@@ -160,6 +317,7 @@ public class OverseerTest extends SolrTe
         Thread.sleep(50);
       }
     }
+    log.warn("Timeout waiting for collections: " + Arrays.asList(collections) + " state:" + stateReader.getCloudState());
   }
   
   @Test
@@ -171,6 +329,7 @@ public class OverseerTest extends SolrTe
     
     SolrZkClient zkClient = null;
     ZkStateReader reader = null;
+    SolrZkClient overseerClient = null;
     
     try {
       server.run();
@@ -189,15 +348,12 @@ public class OverseerTest extends SolrTe
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      OverseerElector elector1 = new OverseerElector(zkClient, reader);
+      Overseer.createClientNodes(zkClient, "node1");
       
       ElectionContext ec = new OverseerElectionContext("node1");
-      elector1.setup(ec);
-      elector1.joinElection(ec);
-      
-      Thread.sleep(1000);
-      
       
+      overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+
       HashMap<String, String> coreProps = new HashMap<String,String>();
       coreProps.put(ZkStateReader.URL_PROP, "http://127.0.0.1/solr");
       coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1");
@@ -230,10 +386,8 @@ public class OverseerTest extends SolrTe
       state = new CoreState("core1", "collection1", coreProps);
 
       zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}));
-      
-      Thread.sleep(2000); // wait for data to update
-      
-      assertEquals("Illegal state", ZkStateReader.ACTIVE, reader.getCloudState().getSlice("collection1", "shard1").getShards().get("core1").get(ZkStateReader.STATE_PROP));
+
+      verifyStatus(reader, ZkStateReader.ACTIVE);
 
     } finally {
       System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
@@ -241,13 +395,28 @@ public class OverseerTest extends SolrTe
       if (zkClient != null) {
         zkClient.close();
       }
+      if (overseerClient != null) {
+        overseerClient.close();
+      }
+
       if (reader != null) {
         reader.close();
       }
       server.shutdown();
     }
-    
+  }
 
+  private void verifyStatus(ZkStateReader reader, String expectedState) throws InterruptedException {
+    int maxIterations = 100;
+    String coreState = null;
+    while(maxIterations-->0) {
+      coreState = reader.getCloudState().getSlice("collection1", "shard1").getShards().get("core1").get(ZkStateReader.STATE_PROP);
+      if(coreState.equals(expectedState)) {
+        return;
+      }
+      Thread.sleep(50);
+    }
+    fail("Illegal state, was:" + coreState + " expected:" + expectedState + "cloudState:" + reader.getCloudState());
   }
   
   @Test
@@ -257,100 +426,76 @@ public class OverseerTest extends SolrTe
     
     ZkTestServer server = new ZkTestServer(zkDir);
     
-    SolrZkClient zkClient = null;
-    SolrZkClient zkClient2 = null;
+    SolrZkClient controllerClient = null;
+    SolrZkClient overseerClient = null;
     ZkStateReader reader = null;
     
     try {
       server.run();
-      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-      zkClient2 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
       
       AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
       AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-      zkClient.makePath("/live_nodes");
+      controllerClient.makePath("/live_nodes");
       
-      // create collections
-      Map<String,String> props = new HashMap<String,String>();
-      props.put(ZkStateReader.NUM_SHARDS_PROP, "2");
-      ZkNodeProps zkProps = new ZkNodeProps(props);
-      zkClient.makePath("/collections/collection1",
-          ZkStateReader.toJSON(zkProps));
-
-      reader = new ZkStateReader(zkClient2);
+      reader = new ZkStateReader(controllerClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      OverseerElector elector1 = new OverseerElector(zkClient, reader);
+      Overseer.createClientNodes(controllerClient, "node1");
+
       
       ElectionContext ec = new OverseerElectionContext("node1");
-      elector1.setup(ec);
-      elector1.joinElection(ec);
-      
-      Thread.sleep(50);
-
-
-      OverseerElector elector2 = new OverseerElector(zkClient2, reader);
       
-      elector2.setup(ec);
-      elector2.joinElection(ec);
+      overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
       
       // live node
-      String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
-      zkClient2.makePath(nodePath, CreateMode.EPHEMERAL);
-      
+      final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
+      controllerClient.makePath(nodePath, CreateMode.EPHEMERAL);
       
       HashMap<String,String> coreProps = new HashMap<String,String>();
       coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
       CoreState state = new CoreState("core1", "collection1", coreProps);
       
-      nodePath = "/node_states/node1";
-      
-      try {
-        zkClient2.makePath(nodePath, CreateMode.EPHEMERAL);
-      } catch (KeeperException ke) {
-        if (ke.code() != Code.NODEEXISTS) {
-          throw ke;
-        }
-      }
+      final String statePath = Overseer.STATES_NODE + "/node1";
       // publish node state (recovering)
-      zkClient2.setData(nodePath, ZkStateReader.toJSON(new CoreState[] {state}));
+      controllerClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}));
       
       // wait overseer assignment
       waitForSliceCount(reader, "collection1", 1);
-      
-      assertEquals("Illegal state", ZkStateReader.RECOVERING,
-          reader.getCloudState().getSlice("collection1", "shard1").getShards()
-              .get("core1").get(ZkStateReader.STATE_PROP));
-      
-      //zkClient2.printLayoutToStdOut();
-      // close overseer client (kills current overseer)
-      zkClient.close();
-      zkClient = null;
-      
+
+      verifyStatus(reader, ZkStateReader.RECOVERING);
+
       // publish node state (active)
       coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
       coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
       state = new CoreState("core1", "collection1", coreProps);
+      controllerClient.setData(statePath,
+          ZkStateReader.toJSON(new CoreState[] {state}));
+
+      verifyStatus(reader, ZkStateReader.ACTIVE);
+      overseerClient.close();
       
-      zkClient2
-          .setData(nodePath, ZkStateReader.toJSON(new CoreState[] {state}));
-      
-      // nocommit - we should do short waits and poll
-      Thread.sleep(1000); // wait for data to update
-      
-      // zkClient2.printLayoutToStdOut();
-      
-      assertEquals("Illegal state", ZkStateReader.ACTIVE,
-          reader.getCloudState().getSlice("collection1", "shard1").getShards()
-              .get("core1").get(ZkStateReader.STATE_PROP));
-      
+      coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+      state = new CoreState("core1", "collection1", coreProps);
+             
+      controllerClient.setData(statePath,
+          ZkStateReader.toJSON(new CoreState[] {state}));
+
+      overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+      
+      verifyStatus(reader, ZkStateReader.RECOVERING);
+      
+      assertEquals("Live nodes count does not match", 1, reader.getCloudState()
+          .getLiveNodes().size());
+      assertEquals("Shard count does not match", 1, reader.getCloudState()
+          .getSlice("collection1", "shard1").getShards().size());      
     } finally {
       
-      if (zkClient != null) {
-       zkClient.close();
+      if (overseerClient != null) {
+       overseerClient.close();
       }
-      if (zkClient2 != null) {
-        zkClient2.close();
+      if (controllerClient != null) {
+        controllerClient.close();
       }
       if (reader != null) {
         reader.close();
@@ -358,4 +503,16 @@ public class OverseerTest extends SolrTe
       server.shutdown();
     }
   }
+
+  private SolrZkClient electNewOverseer(String address,
+      ZkStateReader reader, ElectionContext ec) throws InterruptedException,
+      TimeoutException, IOException, KeeperException {
+    SolrZkClient overseerClient;
+    OverseerElector overseerElector;
+    overseerClient = new SolrZkClient(address, TIMEOUT);
+    overseerElector = new OverseerElector(overseerClient, reader);
+    overseerElector.setup(ec);
+    overseerElector.joinElection(ec);
+    return overseerClient;
+  }
 }
\ No newline at end of file

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java Fri Dec 16 17:07:44 2011
@@ -36,7 +36,7 @@ public class CoreAssignment implements J
     this.properties = Collections.unmodifiableMap(props);
   }
   
-  public CoreAssignment(String coreName, String collectionName, Map<String,String> properties) {
+  public CoreAssignment(String coreName, Map<String,String> properties) {
     HashMap<String,String> props = new HashMap<String,String>();
     props.putAll(properties);
     props.put(CORE, coreName);
@@ -66,12 +66,16 @@ public class CoreAssignment implements J
   
   @Override
   public int hashCode() {
-    return getCoreName().hashCode();
+    return properties.hashCode();
   }
   
   @Override
-  public boolean equals(Object obj) {
-    return hashCode() == obj.hashCode();
+  public boolean equals(Object other) {
+    if(other instanceof CoreAssignment) {
+      CoreAssignment otherAssignment = (CoreAssignment) other;
+      return this.getProperties().equals(otherAssignment.getProperties());
+    }
+    return false;
   }
   
   @Override