You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/03/05 02:35:34 UTC

svn commit: r1574281 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/test/org/apache/solr/cloud/

Author: markrmiller
Date: Wed Mar  5 01:35:33 2014
New Revision: 1574281

URL: http://svn.apache.org/r1574281
Log:
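SOLR-5811: The Overseer retried a failing work item until it succeeded, which is a serious problem if you hit a bad work item that can never succeed. Processing failures are now caught and logged so the Overseer moves on to the next item.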

Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1574281&r1=1574280&r2=1574281&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Wed Mar  5 01:35:33 2014
@@ -70,6 +70,9 @@ Bug Fixes
 * SOLR-5761: HttpSolrServer has a few fields that can be set via setters but
   are not volatile. (Mark Miller, Gregory Chanan)
 
+* SOLR-5811: The Overseer will retry work items until success, which is a serious
+  problem if you hit a bad work item. (Mark Miller)
+
 Optimizations
 ----------------------
 * SOLR-1880: Distributed Search skips GET_FIELDS stage if EXECUTE_QUERY

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1574281&r1=1574280&r2=1574281&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Wed Mar  5 01:35:33 2014
@@ -123,7 +123,16 @@ public class Overseer {
                 else if (LeaderStatus.YES == isLeader) {
                   final ZkNodeProps message = ZkNodeProps.load(head);
                   final String operation = message.getStr(QUEUE_OPERATION);
-                  clusterState = processMessage(clusterState, message, operation);
+                  try {
+                    clusterState = processMessage(clusterState, message, operation);
+                  } catch (Exception e) {
+                    // generally there is nothing we can do - in most cases, we have
+                    // an issue that will fail again on retry or we cannot communicate with
+                    // ZooKeeper in which case another Overseer should take over
+                    // TODO: if ordering for the message is not important, we could
+                    // track retries and put it back on the end of the queue
+                    log.error("Could not process Overseer message", e);
+                  }
                   zkClient.setData(ZkStateReader.CLUSTER_STATE,
                       ZkStateReader.toJSON(clusterState), true);
                   
@@ -189,8 +198,16 @@ public class Overseer {
             while (head != null) {
               final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
               final String operation = message.getStr(QUEUE_OPERATION);
-
-              clusterState = processMessage(clusterState, message, operation);
+              try {
+                clusterState = processMessage(clusterState, message, operation);
+              } catch (Exception e) {
+                // generally there is nothing we can do - in most cases, we have
+                // an issue that will fail again on retry or we cannot communicate with
+                // ZooKeeper in which case another Overseer should take over
+                // TODO: if ordering for the message is not important, we could
+                // track retries and put it back on the end of the queue
+                log.error("Could not process Overseer message", e);
+              }
               workQueue.offer(head.getBytes());
 
               stateUpdateQueue.poll();
@@ -294,6 +311,7 @@ public class Overseer {
     private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
       log.info("createReplica() {} ", message);
       String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, coll);
       String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
       Slice sl = clusterState.getSlice(coll, slice);
       if(sl == null){
@@ -334,6 +352,7 @@ public class Overseer {
 
     private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
       for (String key : message.keySet()) {
         if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
@@ -358,6 +377,7 @@ public class Overseer {
 
     private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
       String routeKey = message.getStr("routeKey");
       String range = message.getStr("range");
@@ -397,8 +417,15 @@ public class Overseer {
       return clusterState;
     }
 
+    private void checkCollection(ZkNodeProps message, String collection) {
+      if (collection == null || collection.trim().length() == 0) {
+        log.error("Skipping invalid Overseer message because it has no collection specified: " + message);
+      }
+    }
+
     private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
       String routeKeyStr = message.getStr("routeKey");
 
@@ -424,6 +451,7 @@ public class Overseer {
 
     private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
       Slice slice = clusterState.getSlice(collection, shardId);
       if (slice == null)  {
@@ -470,6 +498,7 @@ public class Overseer {
 
     private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) {
       String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
 
       if(collection==null || sliceName == null){
@@ -490,9 +519,7 @@ public class Overseer {
        */
       private ClusterState updateState(ClusterState state, final ZkNodeProps message) {
         final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-        assert collection.length() > 0 : message;
-        
-
+        checkCollection(message, collection);
         Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
         log.info("Update state numShards={} message={}", numShards, message);
 
@@ -851,9 +878,7 @@ public class Overseer {
       private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
 
         final String collection = message.getStr("name");
-
-//        final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
-//        newCollections.remove(collection);
+        checkCollection(message, collection);
 
 //        ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
         return clusterState.copyWith(singletonMap(collection, (DocCollection)null));
@@ -864,6 +889,7 @@ public class Overseer {
      */
     private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
       final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      checkCollection(message, collection);
       final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
 
       log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
@@ -889,6 +915,7 @@ public class Overseer {
         String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
 
         final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+        checkCollection(message, collection);
 
 //        final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
 //        DocCollection coll = newCollections.get(collection);

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1574281&r1=1574280&r2=1574281&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Mar  5 01:35:33 2014
@@ -1064,6 +1064,12 @@ public final class ZkController {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     final String collection = cd.getCloudDescriptor().getCollectionName();
     assert collection != null;
+    
+    if (collection == null || collection.trim().length() == 0) {
+      log.error("No collection was specified.");
+      return;
+    }
+    
     ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
     
     if (context != null) {

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1574281&r1=1574280&r2=1574281&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Wed Mar  5 01:35:33 2014
@@ -64,18 +64,18 @@ public class OverseerTest extends SolrTe
   private List<Overseer> overseers = new ArrayList<Overseer>();
   private List<ZkStateReader> readers = new ArrayList<ZkStateReader>();
   
+  private String collection = "collection1";
+  
   public static class MockZKController{
     
     private final SolrZkClient zkClient;
     private final ZkStateReader zkStateReader;
     private final String nodeName;
-    private final String collection;
     private final LeaderElector elector;
     private final Map<String, ElectionContext> electionContext = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
     
-    public MockZKController(String zkAddress, String nodeName, String collection) throws InterruptedException, TimeoutException, IOException, KeeperException {
+    public MockZKController(String zkAddress, String nodeName) throws InterruptedException, TimeoutException, IOException, KeeperException {
       this.nodeName = nodeName;
-      this.collection = collection;
       zkClient = new SolrZkClient(zkAddress, TIMEOUT);
       zkStateReader = new ZkStateReader(zkClient);
       zkStateReader.createClusterStateWatchersAndUpdate();
@@ -105,7 +105,7 @@ public class OverseerTest extends SolrTe
       zkClient.close();
     }
     
-    public String publishState(String coreName, String coreNodeName, String stateName, int numShards)
+    public String publishState(String collection, String coreName, String coreNodeName, String stateName, int numShards)
         throws KeeperException, InterruptedException, IOException {
       if (stateName == null) {
         ElectionContext ec = electionContext.remove(coreName);
@@ -134,41 +134,40 @@ public class OverseerTest extends SolrTe
         q.offer(ZkStateReader.toJSON(m));
       }
       
-      for (int i = 0; i < 120; i++) {
-        String shardId = getShardId("http://" + nodeName + "/solr/", coreName);
-        if (shardId != null) {
-          try {
-            zkClient.makePath("/collections/" + collection + "/leader_elect/"
-                + shardId + "/election", true);
-          } catch (NodeExistsException nee) {}
-          ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
-              "http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
-              nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
-              ZkStateReader.SHARD_ID_PROP, shardId,
-              ZkStateReader.COLLECTION_PROP, collection,
-              ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
-          ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
-              elector, shardId, collection, nodeName + "_" + coreName, props,
-              zkStateReader);
-          elector.setup(ctx);
-          elector.joinElection(ctx, false);
-          return shardId;
+      if (collection.length() > 0) {
+        for (int i = 0; i < 120; i++) {
+          String shardId = getShardId(collection, coreNodeName);
+          if (shardId != null) {
+            try {
+              zkClient.makePath("/collections/" + collection + "/leader_elect/"
+                  + shardId + "/election", true);
+            } catch (NodeExistsException nee) {}
+            ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+                "http://" + nodeName + "/solr/", ZkStateReader.NODE_NAME_PROP,
+                nodeName, ZkStateReader.CORE_NAME_PROP, coreName,
+                ZkStateReader.SHARD_ID_PROP, shardId,
+                ZkStateReader.COLLECTION_PROP, collection,
+                ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+            ShardLeaderElectionContextBase ctx = new ShardLeaderElectionContextBase(
+                elector, shardId, collection, nodeName + "_" + coreName, props,
+                zkStateReader);
+            elector.setup(ctx);
+            elector.joinElection(ctx, false);
+            return shardId;
+          }
+          Thread.sleep(500);
         }
-        Thread.sleep(500);
       }
       return null;
     }
     
-    private String getShardId(final String baseUrl, final String coreName) {
-      Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(
-          collection);
+    private String getShardId(String collection, String coreNodeName) {
+      Map<String,Slice> slices = zkStateReader.getClusterState().getSlicesMap(collection);
       if (slices != null) {
         for (Slice slice : slices.values()) {
           for (Replica replica : slice.getReplicas()) {
-            // TODO: for really large clusters, we could 'index' on this
-            String rbaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-            String rcore = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-            if (baseUrl.equals(rbaseUrl) && coreName.equals(rcore)) {
+            String cnn = replica.getName();
+            if (coreNodeName.equals(cnn)) {
               return slice.getName();
             }
           }
@@ -226,17 +225,17 @@ public class OverseerTest extends SolrTe
       ZkStateReader reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
       
-      zkController = new MockZKController(server.getZkAddress(), "127.0.0.1", "collection1");
+      zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");
 
       final int numShards=6;
       
       for (int i = 0; i < numShards; i++) {
-        assertNotNull("shard got no id?", zkController.publishState("core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
+        assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
       }
-
-      assertEquals(2, reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap().size());
-      assertEquals(2, reader.getClusterState().getSlice("collection1", "shard2").getReplicasMap().size());
-      assertEquals(2, reader.getClusterState().getSlice("collection1", "shard3").getReplicasMap().size());
+      Map<String,Replica> rmap = reader.getClusterState().getSlice("collection1", "shard1").getReplicasMap();
+      assertEquals(rmap.toString(), 2, rmap.size());
+      assertEquals(rmap.toString(), 2, reader.getClusterState().getSlice("collection1", "shard2").getReplicasMap().size());
+      assertEquals(rmap.toString(), 2, reader.getClusterState().getSlice("collection1", "shard3").getReplicasMap().size());
       
       //make sure leaders are in cloud state
       assertNotNull(reader.getLeaderUrl("collection1", "shard1", 15000));
@@ -259,6 +258,81 @@ public class OverseerTest extends SolrTe
   }
 
   @Test
+  public void testBadQueueItem() throws Exception {
+    String zkDir = dataDir.getAbsolutePath() + File.separator
+        + "zookeeper/server1/data";
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    MockZKController zkController = null;
+    SolrZkClient zkClient = null;
+    SolrZkClient overseerClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+      
+      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
+
+      overseerClient = electNewOverseer(server.getZkAddress());
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+      
+      zkController = new MockZKController(server.getZkAddress(), "127.0.0.1");
+
+      final int numShards=3;
+      
+      for (int i = 0; i < numShards; i++) {
+        assertNotNull("shard got no id?", zkController.publishState(collection, "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
+      }
+
+      assertEquals(1, reader.getClusterState().getSlice(collection, "shard1").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice(collection, "shard2").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice(collection, "shard3").getReplicasMap().size());
+      
+      //make sure leaders are in cloud state
+      assertNotNull(reader.getLeaderUrl(collection, "shard1", 15000));
+      assertNotNull(reader.getLeaderUrl(collection, "shard2", 15000));
+      assertNotNull(reader.getLeaderUrl(collection, "shard3", 15000));
+      
+      // publish a bad queue item
+      String emptyCollectionName = "";
+      zkController.publishState(emptyCollectionName, "core0", "node0", ZkStateReader.ACTIVE, 1);
+      zkController.publishState(emptyCollectionName, "core0", "node0", null, 1);
+      
+      // make sure the Overseer is still processing items
+      for (int i = 0; i < numShards; i++) {
+        assertNotNull("shard got no id?", zkController.publishState("collection2", "core" + (i+1), "node" + (i+1), ZkStateReader.ACTIVE, 3));
+      }
+
+      assertEquals(1, reader.getClusterState().getSlice("collection2", "shard1").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice("collection2", "shard2").getReplicasMap().size());
+      assertEquals(1, reader.getClusterState().getSlice("collection2", "shard3").getReplicasMap().size());
+      
+      //make sure leaders are in cloud state
+      assertNotNull(reader.getLeaderUrl("collection2", "shard1", 15000));
+      assertNotNull(reader.getLeaderUrl("collection2", "shard2", 15000));
+      assertNotNull(reader.getLeaderUrl("collection2", "shard3", 15000));
+      
+    } finally {
+      if (DEBUG) {
+        if (zkController != null) {
+          zkClient.printLayoutToStdOut();
+        }
+      }
+      close(zkClient);
+      if (zkController != null) {
+        zkController.close();
+      }
+      close(overseerClient);
+      server.shutdown();
+    }
+  }
+  
+  @Test
   public void testShardAssignmentBigger() throws Exception {
     String zkDir = dataDir.getAbsolutePath() + File.separator
         + "zookeeper/server1/data";
@@ -289,7 +363,7 @@ public class OverseerTest extends SolrTe
       reader.createClusterStateWatchersAndUpdate();
 
       for (int i = 0; i < nodeCount; i++) {
-        controllers[i] = new MockZKController(server.getZkAddress(), "node" + i, "collection1");
+        controllers[i] = new MockZKController(server.getZkAddress(), "node" + i);
       }      
       for (int i = 0; i < nodeCount; i++) {
         nodeExecutors[i] = Executors.newFixedThreadPool(1, new DefaultSolrThreadFactory("testShardAssignment"));
@@ -306,7 +380,7 @@ public class OverseerTest extends SolrTe
             final String coreName = "core" + slot;
             
             try {
-              ids[slot]=controllers[slot % nodeCount].publishState(coreName, "node" + slot, ZkStateReader.ACTIVE, sliceCount);
+              ids[slot]=controllers[slot % nodeCount].publishState(collection, coreName, "node" + slot, ZkStateReader.ACTIVE, sliceCount);
             } catch (Throwable e) {
               e.printStackTrace();
               fail("register threw exception:" + e.getClass());
@@ -551,21 +625,20 @@ public class OverseerTest extends SolrTe
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
       
-      mockController = new MockZKController(server.getZkAddress(), "node1",
-          "collection1");
+      mockController = new MockZKController(server.getZkAddress(), "node1");
       
       overseerClient = electNewOverseer(server.getZkAddress());
       
       Thread.sleep(1000);
-      mockController.publishState("core1", "core_node1",
+      mockController.publishState(collection, "core1", "core_node1",
           ZkStateReader.RECOVERING, 1);
       
-      waitForCollections(reader, "collection1");
+      waitForCollections(reader, collection);
       verifyStatus(reader, ZkStateReader.RECOVERING);
       
       int version = getClusterStateVersion(zkClient);
       
-      mockController.publishState("core1", "core_node1", ZkStateReader.ACTIVE,
+      mockController.publishState(collection, "core1", "core_node1", ZkStateReader.ACTIVE,
           1);
       
       while (version == getClusterStateVersion(zkClient));
@@ -575,7 +648,7 @@ public class OverseerTest extends SolrTe
       overseerClient.close();
       Thread.sleep(1000); // wait for overseer to get killed
       
-      mockController.publishState("core1", "core_node1",
+      mockController.publishState(collection, "core1", "core_node1",
           ZkStateReader.RECOVERING, 1);
       version = getClusterStateVersion(zkClient);
       
@@ -588,13 +661,13 @@ public class OverseerTest extends SolrTe
       assertEquals("Live nodes count does not match", 1, reader
           .getClusterState().getLiveNodes().size());
       assertEquals("Shard count does not match", 1, reader.getClusterState()
-          .getSlice("collection1", "shard1").getReplicasMap().size());
+          .getSlice(collection, "shard1").getReplicasMap().size());
       version = getClusterStateVersion(zkClient);
-      mockController.publishState("core1", "core_node1", null, 1);
+      mockController.publishState(collection, "core1", "core_node1", null, 1);
       while (version == getClusterStateVersion(zkClient));
       Thread.sleep(500);
       assertFalse("collection1 should be gone after publishing the null state",
-          reader.getClusterState().getCollections().contains("collection1"));
+          reader.getClusterState().getCollections().contains(collection));
     } finally {
       close(mockController);
       close(overseerClient);
@@ -676,17 +749,17 @@ public class OverseerTest extends SolrTe
 
       for (int i = 0; i < atLeast(4); i++) {
         killCounter.incrementAndGet(); //for each round allow 1 kill
-        mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
-        mockController.publishState("core1", "node1", "state1",1);
+        mockController = new MockZKController(server.getZkAddress(), "node1");
+        mockController.publishState(collection, "core1", "node1", "state1",1);
         if(mockController2!=null) {
           mockController2.close();
           mockController2 = null;
         }
-        mockController.publishState("core1", "node1","state2",1);
-        mockController2 = new MockZKController(server.getZkAddress(), "node2", "collection1");
-        mockController.publishState("core1", "node1", "state1",1);
+        mockController.publishState(collection, "core1", "node1","state2",1);
+        mockController2 = new MockZKController(server.getZkAddress(), "node2");
+        mockController.publishState(collection, "core1", "node1", "state1",1);
         verifyShardLeader(reader, "collection1", "shard1", "core1");
-        mockController2.publishState("core4", "node2", "state2" ,1);
+        mockController2.publishState(collection, "core4", "node2", "state2" ,1);
         mockController.close();
         mockController = null;
         verifyShardLeader(reader, "collection1", "shard1", "core4");
@@ -729,11 +802,11 @@ public class OverseerTest extends SolrTe
       reader = new ZkStateReader(controllerClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
+      mockController = new MockZKController(server.getZkAddress(), "node1");
       
       overseerClient = electNewOverseer(server.getZkAddress());
 
-      mockController.publishState("core1", "core_node1", ZkStateReader.RECOVERING, 1);
+      mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1);
 
       waitForCollections(reader, "collection1");
       
@@ -743,8 +816,8 @@ public class OverseerTest extends SolrTe
 
       int version = getClusterStateVersion(controllerClient);
       
-      mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
-      mockController.publishState("core1", "core_node1", ZkStateReader.RECOVERING, 1);
+      mockController = new MockZKController(server.getZkAddress(), "node1");
+      mockController.publishState(collection, "core1", "core_node1", ZkStateReader.RECOVERING, 1);
 
       while (version == getClusterStateVersion(controllerClient));
       
@@ -794,11 +867,11 @@ public class OverseerTest extends SolrTe
       reader = new ZkStateReader(controllerClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      mockController = new MockZKController(server.getZkAddress(), "node1", "collection1");
+      mockController = new MockZKController(server.getZkAddress(), "node1");
       
       overseerClient = electNewOverseer(server.getZkAddress());
 
-      mockController.publishState("core1", "node1", ZkStateReader.RECOVERING, 12);
+      mockController.publishState(collection, "core1", "node1", ZkStateReader.RECOVERING, 12);
 
       waitForCollections(reader, "collection1");