You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/23 15:26:37 UTC

svn commit: r1222688 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: markrmiller
Date: Fri Dec 23 14:26:37 2011
New Revision: 1222688

URL: http://svn.apache.org/viewvc?rev=1222688&view=rev
Log:
apply sami's latest patch

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java   (with props)
Removed:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java
Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Dec 23 14:26:37 2011
@@ -1,5 +1,11 @@
 package org.apache.solr.cloud;
 
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,32 +26,54 @@ package org.apache.solr.cloud;
 public abstract class ElectionContext {
   
   final String electionPath;
-  final byte[] leaderProps;
+  final ZkNodeProps leaderProps;
   final String id;
+  final String leaderPath;
   
   public ElectionContext(final String shardZkNodeName,
-      final String electionPath, final byte[] leaderProps) {
+      final String electionPath, final String leaderPath, final ZkNodeProps leaderProps) {
     this.id = shardZkNodeName;
     this.electionPath = electionPath;
+    this.leaderPath = leaderPath;
     this.leaderProps = leaderProps;
   }
   
+  abstract void runLeaderProcess() throws KeeperException, InterruptedException;
 }
 
 final class ShardLeaderElectionContext extends ElectionContext {
   
-  public ShardLeaderElectionContext(final String shardid,
-      final String collection, final String shardZkNodeName, final byte[] props) {
-    super(shardZkNodeName, "/collections/" + collection + "/leader_elect/"
-        + shardid, props);
+  private final SolrZkClient zkClient;
+  public ShardLeaderElectionContext(final String shardId,
+      final String collection, final String shardZkNodeName, ZkNodeProps props, SolrZkClient zkClient) {
+    super(shardZkNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
+        + shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
+        props);
+    this.zkClient = zkClient;
+  }
+
+  @Override
+  void runLeaderProcess() throws KeeperException, InterruptedException {
+    String currentLeaderZkPath = leaderPath;
+    zkClient.makePath(currentLeaderZkPath, leaderProps == null ? null
+        : ZkStateReader.toJSON(leaderProps), CreateMode.EPHEMERAL);
   }
-  
 }
 
 final class OverseerElectionContext extends ElectionContext {
   
-  public OverseerElectionContext(final String zkNodeName) {
-    super(zkNodeName, "/overseer_elect", null);
+  private final SolrZkClient zkClient;
+  private final ZkStateReader stateReader;
+
+  public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
+    super(zkNodeName, "/overseer_elect", null, null);
+    this.zkClient = zkClient;
+    this.stateReader = stateReader;
+  }
+
+  @Override
+  void runLeaderProcess() throws KeeperException, InterruptedException {
+    new Overseer(zkClient, stateReader);
   }
   
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java Fri Dec 23 14:26:37 2011
@@ -40,8 +40,7 @@ public class NodeStateWatcher implements
   private static Logger log = LoggerFactory.getLogger(NodeStateWatcher.class);
 
   public static interface NodeStateChangeListener {
-    void coreCreated(String shardZkNodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
-    void coreChanged(String nodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
+    void coreChanged(String nodeName, Set<CoreState> changedCores, Set<CoreState> allCores) throws KeeperException, InterruptedException;
   }
 
   private final SolrZkClient zkClient;
@@ -57,11 +56,12 @@ public class NodeStateWatcher implements
   }
 
   public NodeStateWatcher(SolrZkClient zkClient, String nodeName, String path,
-      NodeStateChangeListener listener) {
+      NodeStateChangeListener listener) throws KeeperException, InterruptedException {
     this.nodeName = nodeName;
     this.zkClient = zkClient;
     this.path = path;
     this.listener = listener;
+    processStateChange();
   }
 
   public void close() {
@@ -73,8 +73,7 @@ public class NodeStateWatcher implements
     if (stop)
       return;
     try {
-      byte[] data = zkClient.getData(path, this, null);
-      processStateChange(data);
+      processStateChange();
     } catch (KeeperException e) {
       // nocommit: stop working on any keeper error
       log.warn("Could not talk to ZK", e);
@@ -86,13 +85,15 @@ public class NodeStateWatcher implements
     }
   }
 
-  void processStateChange(byte[] data) {
+  private void processStateChange() throws KeeperException, InterruptedException {
+    byte[] data = zkClient.getData(path, this, null);
+
     if (data != null) {
         CoreState[] states = CoreState.load(data);
         List<CoreState> stateList = Arrays.asList(states);
-        HashSet<CoreState> newCores = new HashSet<CoreState>();
-        newCores.addAll(stateList);
-        newCores.removeAll(currentState);
+        HashSet<CoreState> modifiedCores = new HashSet<CoreState>();
+        modifiedCores.addAll(stateList);
+        modifiedCores.removeAll(currentState);
 
         HashSet<CoreState> newState = new HashSet<CoreState>();
         newState.addAll(stateList);
@@ -102,22 +103,20 @@ public class NodeStateWatcher implements
           lookup.put(state.getCoreName(), state);
         }
 
-        HashSet<CoreState> changedCores = new HashSet<CoreState>();
-
         //check for status change
         for(CoreState state: currentState) {
           if(lookup.containsKey(state.getCoreName())) {
             if(!state.getProperties().equals(lookup.get(state.getCoreName()).getProperties())) {
-              changedCores.add(lookup.get(state.getCoreName()));
+              modifiedCores.add(lookup.get(state.getCoreName()));
             }
           }
         }
         
         currentState = Collections.unmodifiableSet(newState);
 
-        if (newCores.size() > 0) {
+        if (modifiedCores.size() > 0) {
           try {
-            listener.coreCreated(nodeName, Collections.unmodifiableSet(newCores));
+            listener.coreChanged(nodeName, Collections.unmodifiableSet(modifiedCores), currentState);
           } catch (KeeperException e) {
             log.warn("Could not talk to ZK", e);
           } catch (InterruptedException e) {
@@ -126,16 +125,6 @@ public class NodeStateWatcher implements
           }
         }
 
-        if (changedCores.size() > 0) {
-          try {
-          listener.coreChanged(nodeName, Collections.unmodifiableSet(changedCores));
-          } catch (KeeperException e) {
-            log.warn("Could not talk to ZK", e);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            log.warn("Could not talk to ZK", e);
-          }
-        }
     } else {
       // ignore null state
     }

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java?rev=1222688&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java Fri Dec 23 14:26:37 2011
@@ -0,0 +1,123 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watches the leader node of one shard (the path returned by
+ * {@link ZkStateReader#getShardLeadersPath(String, String)}) and passes the
+ * leader's properties to a {@link ShardLeaderListener} each time the node is read.
+ */
+public class ShardLeaderWatcher implements Watcher {
+  private static final Logger logger = LoggerFactory.getLogger(ShardLeaderWatcher.class);
+
+  /** Receives the properties read from a shard's leader node. */
+  static interface ShardLeaderListener {
+    void announceLeader(String collection, String shardId, ZkNodeProps props);
+  }
+
+  private final String shard;
+  private final String collection;
+  private final String path;
+  private final SolrZkClient zkClient;
+  private volatile boolean closed = false;
+  private final ShardLeaderListener listener;
+
+  /**
+   * Creates the watcher and immediately reads the leader node (announcing it
+   * if present) while registering a watch on it.
+   *
+   * @throws KeeperException on ZooKeeper errors other than connection loss,
+   *         session expiry or a missing leader node
+   * @throws InterruptedException if interrupted while talking to ZooKeeper
+   */
+  public ShardLeaderWatcher(String shard, String collection,
+      SolrZkClient zkClient, ShardLeaderListener listener) throws KeeperException, InterruptedException {
+    this.shard = shard;
+    this.collection = collection;
+    this.path = ZkStateReader.getShardLeadersPath(collection, shard);
+    this.zkClient = zkClient;
+    this.listener = listener;
+    processLeaderChange();
+  }
+
+  /**
+   * Reads the leader node, re-registering this watcher, and announces the
+   * leader properties if the node has data. Does nothing once closed.
+   */
+  private void processLeaderChange() throws KeeperException, InterruptedException {
+    while (!closed) {
+      try {
+        byte[] data = zkClient.getData(path, this, null);
+        if (data != null) {
+          final ZkNodeProps leaderProps = ZkNodeProps.load(data);
+          listener.announceLeader(collection, shard, leaderProps);
+        }
+        return;
+      } catch (KeeperException ke) {
+        if (ke.code() == Code.NONODE) {
+          // getData() leaves no watch on a missing node, so without this the
+          // watcher would never hear about the next leader. exists() does
+          // leave a watch that fires when the node is created.
+          if (zkClient.exists(path, this) == null) {
+            return;
+          }
+          // the node was created between the two calls: read it again
+          continue;
+        }
+        // connection loss and session expiry are tolerated; anything else
+        // is propagated to the caller
+        if (ke.code() != Code.CONNECTIONLOSS && ke.code() != Code.SESSIONEXPIRED) {
+          throw ke;
+        }
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    try {
+      processLeaderChange();
+    } catch (KeeperException e) {
+      logger.warn("Shard leader watch triggered but Solr cannot talk to zk.", e);
+    } catch (InterruptedException e) {
+      // restore the interrupt flag (Thread.interrupted() would clear it)
+      Thread.currentThread().interrupt();
+      logger.warn("Shard leader watch triggered but Solr cannot talk to zk.", e);
+    }
+  }
+
+  /**
+   * Stops further reads and announcements; watches that are already
+   * registered become no-ops when they fire.
+   */
+  public void close() {
+    closed = true;
+  }
+
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Fri Dec 23 14:26:37 2011
@@ -140,8 +140,8 @@ public class CloudStateUpdateTest extend
     ZkNodeProps zkProps2 = new ZkNodeProps(props2);
     
     SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
-    zkClient.makePath("/collections/testcore", ZkStateReader.toJSON(zkProps2), CreateMode.PERSISTENT);
-    zkClient.makePath("/collections/testcore/shards", CreateMode.PERSISTENT);
+    zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore", ZkStateReader.toJSON(zkProps2), CreateMode.PERSISTENT);
+    zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore/shards", CreateMode.PERSISTENT);
     zkClient.close();
     
     CoreDescriptor dcore = new CoreDescriptor(container1, "testcore",

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java Fri Dec 23 14:26:37 2011
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -32,9 +31,11 @@ import javax.xml.parsers.ParserConfigura
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreContainer.Initializer;
 import org.apache.solr.core.SolrConfig;
+import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -62,6 +63,8 @@ public class LeaderElectionIntegrationTe
   private Map<String,Set<Integer>> shardPorts = new HashMap<String,Set<Integer>>();
   
   private SolrZkClient zkClient;
+
+  private ZkStateReader reader;
   
   @BeforeClass
   public static void beforeClass() throws Exception {}
@@ -94,6 +97,9 @@ public class LeaderElectionIntegrationTe
     
     zkClient = new SolrZkClient(zkServer.getZkAddress(),
         AbstractZkTestCase.TIMEOUT);
+    
+    reader = new ZkStateReader(zkClient); 
+
     log.info("####SETUP_END " + getName());
     
   }
@@ -122,7 +128,6 @@ public class LeaderElectionIntegrationTe
   public void testSimpleSliceLeaderElection() throws Exception {
 
     //printLayout(zkServer.getZkAddress());
-    
     for (int i = 0; i < 4; i++) {
       // who is the leader?
       String leader = getLeader();
@@ -140,8 +145,14 @@ public class LeaderElectionIntegrationTe
       
       //printLayout(zkServer.getZkAddress());
       
-      // wait a sec for new leader to register
-      Thread.sleep(2000);
+      // poll until leader change is visible
+      for (int j = 0; j < 30; j++) {
+        String currentLeader = getLeader();
+        if(!leader.equals(currentLeader)) {
+          break;
+        }
+        Thread.sleep(100);
+      }
       
       leader = getLeader();
       int newLeaderPort = getLeaderPort(leader);
@@ -165,10 +176,12 @@ public class LeaderElectionIntegrationTe
     int leaderPort = getLeaderPort(leader);
     containerMap.get(leaderPort).getZkController().getZkClient().getSolrZooKeeper().pauseCnxn(2000);
     
-    Thread.sleep(4000);
-    
-    // first leader should not be leader anymore
-    assertNotSame(leaderPort, getLeaderPort(getLeader()));
+    for (int i = 0; i < 30; i++) { // wait till leader is changed
+      if (leaderPort != getLeaderPort(getLeader())) {
+        break;
+      }
+      Thread.sleep(100);
+    }
     
     if (VERBOSE) System.out.println("kill everyone");
     // kill everyone but the first leader that should have reconnected by now
@@ -177,33 +190,36 @@ public class LeaderElectionIntegrationTe
         entry.getValue().shutdown();
       }
     }
-    
-    Thread.sleep(1000);
-    
+
+    for (int i = 0; i < 30; i++) { // wait till leader is changed
+      if (leaderPort == getLeaderPort(getLeader())) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+
     // the original leader should be leader again now - everyone else is down
     assertEquals(leaderPort, getLeaderPort(getLeader()));
     //printLayout(zkServer.getZkAddress());
     //Thread.sleep(100000);
   }
   
-  private String getLeader() throws Exception {
+  private String getLeader() throws InterruptedException {
     String leader = null;
     int tries = 30;
-    while (true) {
-      List<String> leaderChildren = zkClient.getChildren(
-          "/collections/collection1/leader_elect/shard1/leader", null);
-      if (leaderChildren.size() > 0) {
-        assertEquals("There should only be one leader", 1,
-            leaderChildren.size());
-        leader = leaderChildren.get(0);
-        break;
-      } else {
-        if (tries-- == 0) {
-          printLayout(zkServer.getZkAddress());
-          fail("No registered leader was found");
+    while (tries-- > 0) {
+      ZkNodeProps props;
+      try {
+        reader.updateCloudState(true);
+        props = reader.getLeaderProps("collection1", "shard1");
+        leader = props.get(ZkStateReader.NODE_NAME_PROP);
+        if (leader != null) {
+          break;
         }
-        Thread.sleep(1000);
+      } catch (KeeperException e) {
+        // ignore
       }
+      Thread.sleep(100);
     }
     return leader;
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Fri Dec 23 14:26:37 2011
@@ -29,8 +29,11 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.SolrConfig;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -73,11 +76,14 @@ public class LeaderElectionTest extends 
     private int nodeNumber;
     private int seq = -1;
     private volatile boolean stop;
+    private volatile boolean electionDone = false;
+    private final ZkNodeProps props;
     
     public ClientThread(int nodeNumber) throws Exception {
       super("Thread-" + nodeNumber);
       zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
       this.nodeNumber = nodeNumber;
+      props = new ZkNodeProps(ZkStateReader.URL_PROP, Integer.toString(nodeNumber));
     }
     
     @Override
@@ -86,14 +92,14 @@ public class LeaderElectionTest extends 
         LeaderElector elector = new LeaderElector(zkClient);
         
         ElectionContext context = new ShardLeaderElectionContext("shard1",
-            "collection1", Integer.toString(nodeNumber), null);
+            "collection1", Integer.toString(nodeNumber), props, zkClient);
         
         try {
           elector.setup(context);
-          
           seq = elector.joinElection(context);
+          electionDone = true;
           seqToThread.put(seq, this);
-        } catch (Exception e) {
+        } catch (Throwable e) {
           e.printStackTrace();
         }
         
@@ -114,31 +120,32 @@ public class LeaderElectionTest extends 
       this.stop = true;
     }
   }
-  
+
   @Test
-  public void testElection() throws Exception {
-    // add a dummy slice, just for variance - call it shard2
-    
-    SolrZkClient zkClient1 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-    
-    LeaderElector elector = new LeaderElector(zkClient1);
-    
-    ElectionContext context = new ShardLeaderElectionContext("shard2", "collection1", "dummynode1", null);
-    
+  public void testBasic() throws Exception {
+    LeaderElector elector = new LeaderElector(zkClient);
+    ZkNodeProps props = new ZkNodeProps(ZkStateReader.URL_PROP,"http://127.0.0.1/solr");
+    ElectionContext context = new ShardLeaderElectionContext("shard2", "collection1", "dummynode1", props, zkClient);
     elector.setup(context);
     elector.joinElection(context);
-    zkClient1.close();
-    
-    SolrZkClient zkClient2 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-    
-    LeaderElector elector2 = new LeaderElector(zkClient2);
-
-    ElectionContext context2 = new ShardLeaderElectionContext("shard2", "collection1", "dummynode2", null);
+    assertEquals("http://127.0.0.1/solr", getLeaderUrl("collection1", "shard2"));
+  }
+  
+  private String getLeaderUrl(String collection, String slice) throws KeeperException, InterruptedException {
+    int iterCount=30;
+    while (iterCount-- > 0)
+      try {
+      byte[] data = zkClient.getData(ZkStateReader.getShardLeadersPath(collection, slice), null, null);
+      ZkNodeProps leaderProps = ZkNodeProps.load(data);
+      return leaderProps.get(ZkStateReader.URL_PROP);
+    } catch (NoNodeException e) {
+      Thread.sleep(100);
+    }
+    throw new RuntimeException("Could not get leader props");
+  }
 
-    elector2.setup(context2);
-    elector2.joinElection(context2);
-    
-    zkClient2.close();
+  @Test
+  public void testElection() throws Exception {
     
     List<ClientThread> threads = new ArrayList<ClientThread>();
     
@@ -152,14 +159,21 @@ public class LeaderElectionTest extends 
       thread.start();
     }
     
-    // make sure the leader node is there from the start
-    try {
-      zkClient.makePath("/collections/collection1/leader_elect/shard1/leader");
-    } catch (KeeperException.NodeExistsException e) {
-      // thats fine
+    
+    while(true) { //wait for election to complete
+      int doneCount = 0;
+      for (ClientThread thread : threads) {
+        if(thread.electionDone) {
+          doneCount++;
+        }
+      }
+      if(doneCount==15) {
+        break;
+      }
+      Thread.sleep(100);
     }
     
-    int leaderThread = Integer.parseInt(getLeader());
+    int leaderThread = Integer.parseInt(getLeaderUrl("collection1", "shard1"));
     
     // whoever the leader is, should be the n_0 seq
     assertEquals(0, threads.get(leaderThread).seq);
@@ -170,9 +184,7 @@ public class LeaderElectionTest extends 
     ((ClientThread) seqToThread.get(1)).close();
     ((ClientThread) seqToThread.get(3)).close();
     
-    Thread.sleep(50);
-    
-    leaderThread = Integer.parseInt(getLeader());
+    leaderThread = Integer.parseInt(getLeaderUrl("collection1", "shard1"));
     
     // whoever the leader is, should be the n_2 seq
     assertEquals(2, threads.get(leaderThread).seq);
@@ -184,9 +196,7 @@ public class LeaderElectionTest extends 
     ((ClientThread) seqToThread.get(7)).close();
     ((ClientThread) seqToThread.get(8)).close();
     
-    Thread.sleep(50);
-    
-    leaderThread = Integer.parseInt(getLeader());
+    leaderThread = Integer.parseInt(getLeaderUrl("collection1", "shard1"));
     
     // whoever the leader is, should be the n_9 seq
     assertEquals(9, threads.get(leaderThread).seq);
@@ -205,6 +215,7 @@ public class LeaderElectionTest extends 
   
   @Test
   public void testStressElection() throws Exception {
+    //TODO add assertions
     final ScheduledExecutorService scheduler = Executors
         .newScheduledThreadPool(100);
     final List<ClientThread> threads = Collections
@@ -278,37 +289,6 @@ public class LeaderElectionTest extends 
     
   }
   
-  private String getLeader() throws Exception {
-    
-    String leader = null;
-    int tries = 30;
-    while (true) {
-      if (!zkClient.exists("/collections/collection1/leader_elect/shard1/leader")) {
-        if (tries-- == 0) {
-          printLayout(server.getZkAddress());
-          fail("No registered leader was found");
-        }
-        Thread.sleep(1000);
-        continue;
-      }
-      List<String> leaderChildren = zkClient.getChildren(
-          "/collections/collection1/leader_elect/shard1/leader", null);
-      if (leaderChildren.size() > 0) {
-        assertEquals("There should only be one leader", 1,
-            leaderChildren.size());
-        leader = leaderChildren.get(0);
-        break;
-      } else {
-        if (tries-- == 0) {
-          printLayout(server.getZkAddress());
-          fail("No registered leader was found");
-        }
-        Thread.sleep(1000);
-      }
-    }
-    return leader;
-  }
-  
   @Override
   public void tearDown() throws Exception {
     zkClient.close();

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Fri Dec 23 14:26:37 2011
@@ -69,6 +69,7 @@ public class OverseerTest extends SolrTe
       AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
 
       zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      ZkStateReader reader = new ZkStateReader(zkClient);
 
       System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
 
@@ -84,46 +85,37 @@ public class OverseerTest extends SolrTe
 
       System.setProperty("bootstrap_confdir", getFile("solr/conf")
           .getAbsolutePath());
-      
-      CloudDescriptor collection1Desc = new CloudDescriptor();
-      collection1Desc.setCollectionName("collection1");
-
-      CoreDescriptor desc = new CoreDescriptor(null, "core1", "");
-      desc.setCloudDescriptor(collection1Desc);
-      String shard1 = zkController.register("core1", desc);
-      collection1Desc.setShardId(null);
-      desc = new CoreDescriptor(null, "core2", "");
-      desc.setCloudDescriptor(collection1Desc);
-      String shard2 = zkController.register("core2", desc);
-      collection1Desc.setShardId(null);
-      desc = new CoreDescriptor(null, "core3", "");
-      desc.setCloudDescriptor(collection1Desc);
-      String shard3 = zkController.register("core3", desc);
-      collection1Desc.setShardId(null);
-      desc = new CoreDescriptor(null, "core4", "");
-      desc.setCloudDescriptor(collection1Desc);
-      String shard4 = zkController.register("core4", desc);
-      collection1Desc.setShardId(null);
-      desc = new CoreDescriptor(null, "core5", "");
-      desc.setCloudDescriptor(collection1Desc);
-      String shard5 = zkController.register("core5", desc);
-      collection1Desc.setShardId(null);
-      desc = new CoreDescriptor(null, "core6", "");
-      desc.setCloudDescriptor(collection1Desc);
-      String shard6 = zkController.register("core6", desc);
-      collection1Desc.setShardId(null);
-
-      assertEquals("shard1", shard1);
-      assertEquals("shard2", shard2);
-      assertEquals("shard3", shard3);
-      assertEquals("shard1", shard4);
-      assertEquals("shard2", shard5);
-      assertEquals("shard3", shard6);
 
+      final int numShards=6;
+      final String[] ids = new String[numShards];
+      
+      for (int i = 0; i < numShards; i++) {
+        CloudDescriptor collection1Desc = new CloudDescriptor();
+        collection1Desc.setCollectionName("collection1");
+        CoreDescriptor desc1 = new CoreDescriptor(null, "core"
+            + (i + 1), "");
+        desc1.setCloudDescriptor(collection1Desc);
+        ids[i] = zkController.register("core" + (i + 1), desc1);
+      }
+      
+      assertEquals("shard1", ids[0]);
+      assertEquals("shard2", ids[1]);
+      assertEquals("shard3", ids[2]);
+      assertEquals("shard1", ids[3]);
+      assertEquals("shard2", ids[4]);
+      assertEquals("shard3", ids[5]);
+
+      waitForSliceCount(reader, "collection1", 3);
+
+      //make sure leaders are in cloud state
+      assertNotNull(reader.getLeaderUrl("collection1", "shard1"));
+      assertNotNull(reader.getLeaderUrl("collection1", "shard2"));
+      assertNotNull(reader.getLeaderUrl("collection1", "shard3"));
+      
     } finally {
       if (DEBUG) {
         if (zkController != null) {
-          zkController.printLayoutToStdOut();
+          zkClient.printLayoutToStdOut();
         }
       }
       if (zkClient != null) {
@@ -268,7 +260,7 @@ public class OverseerTest extends SolrTe
     } finally {
       if (DEBUG) {
         if (controllers[0] != null) {
-          controllers[0].printLayoutToStdOut();
+          zkClient.printLayoutToStdOut();
         }
       }
       if (zkClient != null) {
@@ -350,9 +342,7 @@ public class OverseerTest extends SolrTe
 
       Overseer.createClientNodes(zkClient, "node1");
       
-      ElectionContext ec = new OverseerElectionContext("node1");
-      
-      overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+      overseerClient = electNewOverseer(server.getZkAddress());
 
       HashMap<String, String> coreProps = new HashMap<String,String>();
       coreProps.put(ZkStateReader.URL_PROP, "http://127.0.0.1/solr");
@@ -442,11 +432,7 @@ public class OverseerTest extends SolrTe
       reader.createClusterStateWatchersAndUpdate();
 
       Overseer.createClientNodes(controllerClient, "node1");
-
-      
-      ElectionContext ec = new OverseerElectionContext("node1");
-      
-      overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+      overseerClient = electNewOverseer(server.getZkAddress());
       
       // live node
       final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
@@ -481,7 +467,7 @@ public class OverseerTest extends SolrTe
       controllerClient.setData(statePath,
           ZkStateReader.toJSON(new CoreState[] {state}));
 
-      overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+      overseerClient = electNewOverseer(server.getZkAddress());
       
       verifyStatus(reader, ZkStateReader.RECOVERING);
       
@@ -504,15 +490,14 @@ public class OverseerTest extends SolrTe
     }
   }
 
-  private SolrZkClient electNewOverseer(String address,
-      ZkStateReader reader, ElectionContext ec) throws InterruptedException,
+  private SolrZkClient electNewOverseer(String address) throws InterruptedException,
       TimeoutException, IOException, KeeperException {
-    SolrZkClient overseerClient;
-    OverseerElector overseerElector;
-    overseerClient = new SolrZkClient(address, TIMEOUT);
-    overseerElector = new OverseerElector(overseerClient, reader);
+    SolrZkClient zkClient  = new SolrZkClient(address, TIMEOUT);
+    ZkStateReader reader = new ZkStateReader(zkClient);
+    LeaderElector overseerElector = new LeaderElector(zkClient);
+    ElectionContext ec = new OverseerElectionContext(address, zkClient, reader);
     overseerElector.setup(ec);
     overseerElector.joinElection(ec);
-    return overseerClient;
+    return zkClient;
   }
 }
\ No newline at end of file

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Fri Dec 23 14:26:37 2011
@@ -31,12 +31,8 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.HashPartitioner.Range;
 import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-// quasi immutable :(
 public class CloudState implements JSONWriter.Writable {
-	protected static Logger log = LoggerFactory.getLogger(CloudState.class);
 	private final Map<String, Map<String,Slice>> collectionStates;  // Map<collectionName, Map<sliceName,Slice>>
 	private final Set<String> liveNodes;
   
@@ -65,25 +61,6 @@ public class CloudState implements JSONW
 		return null;
 	}
 
-	// TODO: this method must die - this object should be immutable!!
-	public void addSlice(String collection, Slice slice) {
-		if (!collectionStates.containsKey(collection)) {
-			log.info("New collection");
-			collectionStates.put(collection, new HashMap<String,Slice>());
-		}
-		if (!collectionStates.get(collection).containsKey(slice.getName())) {
-			collectionStates.get(collection).put(slice.getName(), slice);
-		} else {
-			Map<String,ZkNodeProps> shards = new HashMap<String,ZkNodeProps>();
-			
-			Slice existingSlice = collectionStates.get(collection).get(slice.getName());
-			shards.putAll(existingSlice.getShards());
-			shards.putAll(slice.getShards());
-			Slice updatedSlice = new Slice(slice.getName(), shards);
-			collectionStates.get(collection).put(slice.getName(), updatedSlice);
-		}
-	}
-
 	public Map<String, Slice> getSlices(String collection) {
 		if(!collectionStates.containsKey(collection))
 			return null;
@@ -177,10 +154,10 @@ public class CloudState implements JSONW
 
     for(String collectionName: stateMap.keySet()){
       Map<String, Object> collection = (Map<String, Object>)stateMap.get(collectionName);
-      HashMap<String, Slice> slices = new HashMap<String,Slice>();
+      Map<String, Slice> slices = new LinkedHashMap<String,Slice>();
       for(String sliceName: collection.keySet()) {
         Map<String, Map<String, String>> sliceMap = (Map<String, Map<String, String>>)collection.get(sliceName);
-        HashMap<String, ZkNodeProps> shards = new HashMap<String,ZkNodeProps>();
+        Map<String, ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
         for(String shardName: sliceMap.keySet()) {
           shards.put(shardName, new ZkNodeProps(sliceMap.get(shardName)));
         }

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java Fri Dec 23 14:26:37 2011
@@ -74,12 +74,16 @@ public class CoreState implements JSONWr
   
   @Override
   public int hashCode() {
-    return getCoreName().hashCode();
+    return properties.hashCode();
   }
   
   @Override
-  public boolean equals(Object obj) {
-    return hashCode() == obj.hashCode();
+  public boolean equals(Object other) {
+    if(other instanceof CoreState) {
+      CoreState otherState = (CoreState) other;
+      return this.getProperties().equals(otherState.getProperties());
+    }
+    return false;
   }
   
   @Override

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Dec 23 14:26:37 2011
@@ -46,14 +46,16 @@ import org.slf4j.LoggerFactory;
 public class ZkStateReader {
   private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
   
-  public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String URL_PROP = "url";
   public static final String NODE_NAME_PROP = "node_name";
   public static final String ROLES_PROP = "roles";
   public static final String STATE_PROP = "state";
+  public static final String CORE_PROP = "core";
   public static final String SHARD_ID_PROP = "shard_id";
   public static final String NUM_SHARDS_PROP = "numShards";
+  public static final String LEADER_PROP = "leader";
   
+  public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String LIVE_NODES_ZKNODE = "/live_nodes";
   public static final String CLUSTER_STATE = "/clusterstate.json";
   
@@ -65,6 +67,8 @@ public class ZkStateReader {
   private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
 
   public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
+
+  public static final String SHARD_LEADERS_ZKNODE = "leaders";
   
   //
   // convenience methods... should these go somewhere else?
@@ -377,32 +381,27 @@ public class ZkStateReader {
   
   public ZkNodeProps getLeaderProps(String collection, String shard) throws InterruptedException, KeeperException {
     int tries = 30;
-    ZkNodeProps props;
-    while (true) {
-      if (!zkClient
-          .exists("/collections/" + collection + "/leader_elect/" + shard + "/leader")) {
-        if (tries-- == 0) {
-          throw new RuntimeException("No registered leader was found");
-        }
-        Thread.sleep(1000);
-        continue;
-      }
-      String leaderPath = "/collections/" + collection + "/leader_elect/" + shard + "/leader";
-      List<String> leaderChildren = zkClient.getChildren(
-          leaderPath, null);
-      if (leaderChildren.size() > 0) {
-        String leader = leaderChildren.get(0);
-        byte[] data = zkClient.getData(leaderPath + "/" + leader, null, null);
-        props = ZkNodeProps.load(data);
-        break;
-      } else {
-        if (tries-- == 0) {
-          throw new RuntimeException("No registered leader was found");
+    while (tries-- > 0) {
+      if (cloudState != null) {
+        Slice slice = cloudState.getSlice(collection, shard);
+        if (slice != null) {
+          for (ZkNodeProps nodeProps : slice.getShards().values()) {
+            if (nodeProps.containsKey(ZkStateReader.LEADER_PROP)) {
+              return nodeProps;
+            }
+          }
         }
-        Thread.sleep(1000);
       }
+      Thread.sleep(200);
+      updateCloudState(true);
     }
-    return props;
+    throw new RuntimeException("No registered leader was found, collection:" + collection + " slice:" + shard);
+  }
+
+  public static String getShardLeadersPath(String collection, String shardId) {
+    return COLLECTIONS_ZKNODE + "/" + collection + "/"
+        + SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId)
+        : "");
   }
   
 }