Posted to commits@lucene.apache.org by ro...@apache.org on 2014/12/01 18:25:47 UTC

svn commit: r1642718 [8/12] - in /lucene/dev/branches/lucene2878: ./ dev-tools/ dev-tools/eclipse/dot.settings/ dev-tools/idea/.idea/ dev-tools/idea/lucene/benchmark/src/ dev-tools/idea/lucene/highlighter/ dev-tools/maven/ dev-tools/maven/solr/webapp/ ...

Modified: lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml (original)
+++ lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/solr/collection1/conf/schema.xml Mon Dec  1 17:25:39 2014
@@ -361,7 +361,7 @@
           documentation for more information on pattern and replacement
           string syntax.
 
-          http://docs.oracle.com/javase/7/docs/api/java/util/regex/package-summary.html
+          http://docs.oracle.com/javase/8/docs/api/java/util/regex/package-summary.html
         -->
         <filter class="solr.PatternReplaceFilterFactory" pattern="([^a-z])"
           replacement="" replace="all" />

Modified: lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml (original)
+++ lucene/dev/branches/lucene2878/solr/contrib/uima/src/test-files/uima/uima-tokenizers-schema.xml Mon Dec  1 17:25:39 2014
@@ -357,7 +357,7 @@
           documentation for more information on pattern and replacement
           string syntax.
 
-          http://docs.oracle.com/javase/7/docs/api/java/util/regex/package-summary.html
+          http://docs.oracle.com/javase/8/docs/api/java/util/regex/package-summary.html
         -->
         <filter class="solr.PatternReplaceFilterFactory" pattern="([^a-z])"
           replacement="" replace="all" />

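Both schema.xml changes above only bump the javadoc link from Java 7 to Java 8; the filter configuration itself is unchanged. For reference, the configured solr.PatternReplaceFilterFactory (pattern="([^a-z])", replacement="", replace="all") behaves like the following plain java.util.regex sketch. This is only an illustration of the regex semantics, not the factory's actual implementation:

  import java.util.regex.Pattern;

  public class PatternReplaceSketch {
    public static void main(String[] args) {
      // replace="all" corresponds to replaceAll(): every character that is
      // not a lowercase ASCII letter is stripped from the token text.
      Pattern p = Pattern.compile("([^a-z])");
      System.out.println(p.matcher("Text-Analysis 2014!").replaceAll(""));
      // prints "extnalysis"
    }
  }
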
Modified: lucene/dev/branches/lucene2878/solr/core/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/build.xml?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/build.xml (original)
+++ lucene/dev/branches/lucene2878/solr/core/build.xml Mon Dec  1 17:25:39 2014
@@ -62,8 +62,8 @@
 
   <target name="resolve" depends="ivy-availability-check,ivy-fail,ivy-configure">
     <sequential>
-      <ivy:retrieve conf="compile,compile.hadoop" type="jar,bundle" sync="${ivy.sync}" log="download-only"/>
-      <ivy:retrieve conf="test,test.DfsMiniCluster" type="jar,bundle,test" sync="${ivy.sync}" log="download-only"
+      <ivy:retrieve conf="compile,compile.hadoop" type="jar,bundle" sync="${ivy.sync}" log="download-only" symlink="${ivy.symlink}"/>
+      <ivy:retrieve conf="test,test.DfsMiniCluster" type="jar,bundle,test" sync="${ivy.sync}" log="download-only" symlink="${ivy.symlink}"
                     pattern="${test.lib.dir}/[artifact]-[revision](-[classifier]).[ext]"/>
     </sequential>
   </target>
@@ -92,7 +92,7 @@
   <target name="resolve-javacc" xmlns:ivy="antlib:org.apache.ivy.ant">
     <!-- setup a "fake" JavaCC distribution folder in ${build.dir} to make JavaCC ANT task happy: -->
     <ivy:retrieve organisation="net.java.dev.javacc" module="javacc" revision="5.0"
-      inline="true" transitive="false" type="jar" sync="true"
+      inline="true" transitive="false" type="jar" sync="true" symlink="${ivy.symlink}"
       pattern="${build.dir}/javacc/bin/lib/[artifact].[ext]"/>
   </target>
 

Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Mon Dec  1 17:25:39 2014
@@ -90,6 +90,7 @@ public class DistributedQueue {
     TreeMap<Long,String> orderedChildren = new TreeMap<>();
 
     List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+    stats.setQueueLength(childNames.size());
     for (String childName : childNames) {
       try {
         // Check format
@@ -117,6 +118,7 @@ public class DistributedQueue {
       throws KeeperException, InterruptedException {
 
     List<String> childNames = zookeeper.getChildren(dir, null, true);
+    stats.setQueueLength(childNames.size());
     for (String childName : childNames) {
       if (childName != null) {
         try {

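The two added stats.setQueueLength(childNames.size()) calls record the current queue length each time the queue's ZooKeeper children are listed; the new Overseer log lines later in this commit read that value back through getStats().getQueueLength(). A minimal sketch of that bookkeeping, assuming a simple holder class (the actual Stats class used by DistributedQueue is not shown in this diff), might look like:

  import java.util.concurrent.atomic.AtomicInteger;

  class QueueStatsSketch {
    private final AtomicInteger queueLength = new AtomicInteger();

    // called right after zookeeper.getChildren(...) returns
    void setQueueLength(int length) {
      queueLength.set(length);
    }

    // read when logging "processMessage: queueSize: {} ..."
    int getQueueLength() {
      return queueLength.get();
    }
  }
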
Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Dec  1 17:25:39 2014
@@ -1,10 +1,7 @@
 package org.apache.solr.cloud;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Replica;
@@ -28,6 +25,7 @@ import org.apache.zookeeper.KeeperExcept
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -136,13 +134,12 @@ class ShardLeaderElectionContextBase ext
     try {
       RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
           new RetryCmd() {
-            
             @Override
             public void execute() throws Throwable {
-              zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
-                  CreateMode.EPHEMERAL, true);
+              zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps), CreateMode.EPHEMERAL, true);
             }
-          });
+          }
+      );
     } catch (Throwable t) {
       if (t instanceof OutOfMemoryError) {
         throw (OutOfMemoryError) t;
@@ -152,7 +149,7 @@ class ShardLeaderElectionContextBase ext
     
     assert shardId != null;
     ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
-        Overseer.OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId,
+        OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId,
         ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
         leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
         ZkStateReader.CORE_NAME_PROP,
@@ -205,7 +202,7 @@ final class ShardLeaderElectionContext e
     String coreName = leaderProps.getStr(ZkStateReader.CORE_NAME_PROP);
     
     // clear the leader in clusterstate
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower(),
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
         ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
         collection);
     Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
@@ -407,13 +404,19 @@ final class ShardLeaderElectionContext e
     
     Slice slices = zkController.getClusterState().getSlice(collection, shardId);
     int cnt = 0;
-    while (true && !isClosed && !cc.isShutDown()) {
+    while (!isClosed && !cc.isShutDown()) {
       // wait for everyone to be up
       if (slices != null) {
         int found = 0;
         try {
           found = zkClient.getChildren(shardsElectZkPath, null, true).size();
         } catch (KeeperException e) {
+          if (e instanceof KeeperException.SessionExpiredException) {
+            // if the session has expired, then another election will be launched, so
+            // quit here
+            throw new SolrException(ErrorCode.SERVER_ERROR,
+                                    "ZK session expired - cancelling election for " + collection + " " + shardId);
+          }
           SolrException.log(log,
               "Error checking for the number of election participants", e);
         }

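The reformatted makePath call above still runs inside RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000, ...), i.e. leader registration is retried for up to 15 seconds at 1 second intervals as long as the failure is a NodeExistsException. A rough sketch of that idiom, with names and signatures assumed for illustration only (the real RetryUtil/RetryCmd classes are not part of this diff), is:

  class RetryUtilSketch {
    interface RetryCmd {
      void execute() throws Throwable;
    }

    static void retryOnThrowable(Class<? extends Throwable> clazz, long timeoutMs,
                                 long intervalMs, RetryCmd cmd) throws Throwable {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (true) {
        try {
          cmd.execute();
          return;                                  // success, stop retrying
        } catch (Throwable t) {
          if (!clazz.isInstance(t) || System.currentTimeMillis() >= deadline) {
            throw t;                               // not retryable, or out of time
          }
          Thread.sleep(intervalMs);                // wait before the next attempt
        }
      }
    }
  }
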
Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Dec  1 17:25:39 2014
@@ -17,46 +17,41 @@ package org.apache.solr.cloud;
  * the License.
  */
 
-import static java.util.Collections.singletonMap;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
-import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.CollectionMutator;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.cloud.overseer.ReplicaMutator;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.cloud.overseer.ZkStateWriter;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.RoutingRule;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
@@ -73,7 +68,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Cluster leader. Responsible node assignments, cluster state file?
+ * Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
+ * collections, shards, replicas and setting various properties.
  */
 public class Overseer implements Closeable {
   public static final String QUEUE_OPERATION = "operation";
@@ -90,53 +86,12 @@ public class Overseer implements Closeab
   @Deprecated
   public static final String REMOVESHARD = "removeshard";
 
-  /**
-   * Enum of actions supported by the overseer only.
-   *
-   * There are other actions supported which are public and defined
-   * in {@link org.apache.solr.common.params.CollectionParams.CollectionAction}
-   */
-  public static enum OverseerAction {
-    LEADER,
-    DELETECORE,
-    ADDROUTINGRULE,
-    REMOVEROUTINGRULE,
-    UPDATESHARDSTATE,
-    STATE,
-    QUIT;
-
-    public static OverseerAction get(String p) {
-      if (p != null) {
-        try {
-          return OverseerAction.valueOf(p.toUpperCase(Locale.ROOT));
-        } catch (Exception ex) {
-        }
-      }
-      return null;
-    }
-
-    public boolean isEqual(String s) {
-      return s != null && toString().equals(s.toUpperCase(Locale.ROOT));
-    }
-
-    public String toLower() {
-      return toString().toLowerCase(Locale.ROOT);
-    }
-  }
-
-
   public static final int STATE_UPDATE_DELAY = 1500;  // delay between cloud state updates
 
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
 
   static enum LeaderStatus {DONT_KNOW, NO, YES}
 
-  public static final String preferredLeaderProp = COLL_PROP_PREFIX + "preferredleader";
-
-  public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of(preferredLeaderProp);
-
-  private long lastUpdatedTime = 0;
-
   private class ClusterStateUpdater implements Runnable, Closeable {
     
     private final ZkStateReader reader;
@@ -159,10 +114,6 @@ public class Overseer implements Closeab
     private Map clusterProps;
     private boolean isClosed = false;
 
-    private final Map<String, Object> updateNodes = new LinkedHashMap<>();
-    private boolean isClusterStateModified = false;
-
-
     public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
       this.zkClient = reader.getZkClient();
       this.zkStats = zkStats;
@@ -205,7 +156,9 @@ public class Overseer implements Closeab
               reader.updateClusterState(true);
               ClusterState clusterState = reader.getClusterState();
               log.info("Replaying operations from work queue.");
-              
+
+              ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+
               while (head != null) {
                 isLeader = amILeader();
                 if (LeaderStatus.NO == isLeader) {
@@ -213,33 +166,18 @@ public class Overseer implements Closeab
                 }
                 else if (LeaderStatus.YES == isLeader) {
                   final ZkNodeProps message = ZkNodeProps.load(head);
-                  final String operation = message.getStr(QUEUE_OPERATION);
-                  final TimerContext timerContext = stats.time(operation);
-                  try {
-                    clusterState = processMessage(clusterState, message, operation);
-                    stats.success(operation);
-                  } catch (Exception e) {
-                    // generally there is nothing we can do - in most cases, we have
-                    // an issue that will fail again on retry or we cannot communicate with     a
-                    // ZooKeeper in which case another Overseer should take over
-                    // TODO: if ordering for the message is not important, we could
-                    // track retries and put it back on the end of the queue
-                    log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
-                    stats.error(operation);
-                  } finally {
-                    timerContext.stop();
-                  }
-                  updateZkStates(clusterState);
-                  
+                  log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
+                  clusterState = processQueueItem(message, clusterState, zkStateWriter);
                   workQueue.poll(); // poll-ing removes the element we got by peek-ing
                 }
                 else {
                   log.info("am_i_leader unclear {}", isLeader);                  
                   // re-peek below in case our 'head' value is out-of-date by now
                 }
-                
                 head = workQueue.peek();
               }
+              // force flush at the end of the loop
+              clusterState = zkStateWriter.writePendingUpdates();
             }
           } catch (KeeperException e) {
             if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
@@ -259,8 +197,10 @@ public class Overseer implements Closeab
       }
       
       log.info("Starting to work on the main queue");
-      int lastStateFormat = -1; // sentinel
       try {
+        ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+        ClusterState clusterState = null;
+        boolean refreshClusterState = true; // let's refresh in the first iteration
         while (!this.isClosed) {
           isLeader = amILeader();
           if (LeaderStatus.NO == isLeader) {
@@ -289,56 +229,46 @@ public class Overseer implements Closeab
           }
           synchronized (reader.getUpdateLock()) {
             try {
-              reader.updateClusterState(true);
-              ClusterState clusterState = reader.getClusterState();
+              if (refreshClusterState) {
+                reader.updateClusterState(true);
+                clusterState = reader.getClusterState();
+                refreshClusterState = false;
+
+                // if there were any errors while processing
+                // the state queue, items would have been left in the
+                // work queue so let's process those first
+                byte[] data = workQueue.peek();
+                boolean hadWorkItems = data != null;
+                while (data != null)  {
+                  final ZkNodeProps message = ZkNodeProps.load(data);
+                  log.info("processMessage: queueSize: {}, message = {}", workQueue.getStats().getQueueLength(), message);
+                  clusterState = processQueueItem(message, clusterState, zkStateWriter);
+                  workQueue.poll(); // poll-ing removes the element we got by peek-ing
+                  data = workQueue.peek();
+                }
+                // force flush at the end of the loop
+                if (hadWorkItems) {
+                  clusterState = zkStateWriter.writePendingUpdates();
+                }
+              }
 
               while (head != null) {
                 final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
-                final String operation = message.getStr(QUEUE_OPERATION);
-
-                // we batch updates for the main cluster state together (stateFormat=1)
-                // but if we encounter a message for a collection with a stateFormat different than the last
-                // then we stop batching at that point
-                String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-                if (collection == null) collection = message.getStr("name");
-                if (collection != null) {
-                  DocCollection docCollection = clusterState.getCollectionOrNull(collection);
-                  if (lastStateFormat != -1 && docCollection != null && docCollection.getStateFormat() != lastStateFormat)  {
-                    lastStateFormat = docCollection.getStateFormat();
-                    break;
-                  }
-                  if (docCollection != null)  {
-                    lastStateFormat = docCollection.getStateFormat();
-                  }
-                }
-
-                final TimerContext timerContext = stats.time(operation);
-                try {
-                  clusterState = processMessage(clusterState, message, operation);
-                  stats.success(operation);
-                } catch (Exception e) {
-                  // generally there is nothing we can do - in most cases, we have
-                  // an issue that will fail again on retry or we cannot communicate with
-                  // ZooKeeper in which case another Overseer should take over
-                  // TODO: if ordering for the message is not important, we could
-                  // track retries and put it back on the end of the queue
-                  log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
-                  stats.error(operation);
-                } finally {
-                  timerContext.stop();
-                }
+                log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
+                clusterState = processQueueItem(message, clusterState, zkStateWriter);
                 workQueue.offer(head.getBytes());
 
                 stateUpdateQueue.poll();
 
-                if (isClosed || System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS)) break;
-                if(!updateNodes.isEmpty()) break;
+                if (isClosed) break;
                 // if an event comes in the next 100ms batch it together
                 head = stateUpdateQueue.peek(100);
               }
-              updateZkStates(clusterState);
+              // we should force write all pending updates because the next iteration might sleep until there
+              // are more items in the main queue
+              clusterState = zkStateWriter.writePendingUpdates();
               // clean work queue
-              while (workQueue.poll() != null) ;
+              while (workQueue.poll() != null);
 
             } catch (KeeperException e) {
               if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
@@ -346,12 +276,13 @@ public class Overseer implements Closeab
                 return;
               }
               log.error("Exception in Overseer main queue loop", e);
+              refreshClusterState = true; // it might have been a bad version error
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return;
-
             } catch (Exception e) {
               log.error("Exception in Overseer main queue loop", e);
+              refreshClusterState = true; // it might have been a bad version error
             }
           }
 
@@ -368,57 +299,28 @@ public class Overseer implements Closeab
       }
     }
 
-    private void updateZkStates(ClusterState clusterState) throws KeeperException, InterruptedException {
-      TimerContext timerContext = stats.time("update_state");
-      boolean success = false;
+    private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter) throws KeeperException, InterruptedException {
+      final String operation = message.getStr(QUEUE_OPERATION);
+      ZkWriteCommand zkWriteCommand = null;
+      final TimerContext timerContext = stats.time(operation);
       try {
-        if (!updateNodes.isEmpty()) {
-          for (Entry<String,Object> e : updateNodes.entrySet()) {
-            if (e.getValue() == null) {
-              if (zkClient.exists(e.getKey(), true)) zkClient.delete(e.getKey(), 0, true);
-            } else {
-              byte[] data = ZkStateReader.toJSON(e.getValue());
-              if (zkClient.exists(e.getKey(), true)) {
-                log.info("going to update_collection {}", e.getKey());
-                zkClient.setData(e.getKey(), data, true);
-              } else {
-                log.info("going to create_collection {}", e.getKey());
-                String parentPath = e.getKey().substring(0, e.getKey().lastIndexOf('/'));
-                if (!zkClient.exists(parentPath, true)) {
-                  // if the /collections/collection_name path doesn't exist then it means that
-                  // 1) the user invoked a DELETE collection API and the OverseerCollectionProcessor has deleted
-                  // this zk path.
-                  // 2) these are most likely old "state" messages which are only being processed now because
-                  // if they were new "state" messages then in legacy mode, a new collection would have been
-                  // created with stateFormat = 1 (which is the default state format)
-                  // 3) these can't be new "state" messages created for a new collection because
-                  // otherwise the OverseerCollectionProcessor would have already created this path
-                  // as part of the create collection API call -- which is the only way in which a collection
-                  // with stateFormat > 1 can possibly be created
-                  continue;
-                }
-                zkClient.create(e.getKey(), data, CreateMode.PERSISTENT, true);
-              }
-            }
-          }
-          updateNodes.clear();
-        }
-        
-        if (isClusterStateModified) {
-          lastUpdatedTime = System.nanoTime();
-          zkClient.setData(ZkStateReader.CLUSTER_STATE,
-              ZkStateReader.toJSON(clusterState), true);
-          isClusterStateModified = false;
-        }
-        success = true;
+        zkWriteCommand = processMessage(clusterState, message, operation);
+        stats.success(operation);
+      } catch (Exception e) {
+        // generally there is nothing we can do - in most cases, we have
+        // an issue that will fail again on retry or we cannot communicate with     a
+        // ZooKeeper in which case another Overseer should take over
+        // TODO: if ordering for the message is not important, we could
+        // track retries and put it back on the end of the queue
+        log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+        stats.error(operation);
       } finally {
         timerContext.stop();
-        if (success)  {
-          stats.success("update_state");
-        } else  {
-          stats.error("update_state");
-        }
       }
+      if (zkWriteCommand != null) {
+        clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand);
+      }
+      return clusterState;
     }
 
     private void checkIfIamStillLeader() {
@@ -460,41 +362,32 @@ public class Overseer implements Closeab
       }
     }
 
-    private ClusterState processMessage(ClusterState clusterState,
+    private ZkWriteCommand processMessage(ClusterState clusterState,
         final ZkNodeProps message, final String operation) {
-
       CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
       if (collectionAction != null) {
         switch (collectionAction) {
           case CREATE:
-            clusterState = buildCollection(clusterState, message);
-            break;
+            return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
           case DELETE:
-            clusterState = removeCollection(clusterState, message);
-            break;
+            return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message);
           case CREATESHARD:
-            clusterState = createShard(clusterState, message);
-            break;
+            return new CollectionMutator(getZkStateReader()).createShard(clusterState, message);
           case DELETESHARD:
-            clusterState = removeShard(clusterState, message);
-            break;
+            return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message);
           case ADDREPLICA:
-            clusterState = createReplica(clusterState, message);
-            break;
+            return new SliceMutator(getZkStateReader()).addReplica(clusterState, message);
           case CLUSTERPROP:
             handleProp(message);
-            break;
           case ADDREPLICAPROP:
-            clusterState = addReplicaProp(clusterState, message);
-            break;
+            return new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message);
           case DELETEREPLICAPROP:
-            clusterState = deleteReplicaProp(clusterState, message);
-            break;
+            return new ReplicaMutator(getZkStateReader()).removeReplicaProperty(clusterState, message);
           case BALANCESHARDUNIQUE:
-            ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(this, clusterState, message);
+            ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
             if (dProp.balanceProperty()) {
               String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
-              clusterState = newState(clusterState, singletonMap(collName, dProp.getDocCollection()));
+              return new ZkWriteCommand(collName, dProp.getDocCollection());
             }
             break;
           default:
@@ -506,27 +399,17 @@ public class Overseer implements Closeab
         if (overseerAction != null) {
           switch (overseerAction) {
             case STATE:
-              if (isLegacy(clusterProps)) {
-                clusterState = updateState(clusterState, message);
-              } else {
-                clusterState = updateStateNew(clusterState, message);
-              }
-              break;
+              return new ReplicaMutator(getZkStateReader()).setState(clusterState, message);
             case LEADER:
-              clusterState = setShardLeader(clusterState, message);
-              break;
+              return new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message);
             case DELETECORE:
-              clusterState = removeCore(clusterState, message);
-              break;
+              return new SliceMutator(getZkStateReader()).removeReplica(clusterState, message);
             case ADDROUTINGRULE:
-              clusterState = addRoutingRule(clusterState, message);
-              break;
+              return new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message);
             case REMOVEROUTINGRULE:
-              clusterState = removeRoutingRule(clusterState, message);
-              break;
+              return new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message);
             case UPDATESHARDSTATE:
-              clusterState = updateShardState(clusterState, message);
-              break;
+              return new SliceMutator(getZkStateReader()).updateShardState(clusterState, message);
             case QUIT:
               if (myId.equals(message.get("id"))) {
                 log.info("Quit command received {}", LeaderElector.getNodeName(myId));
@@ -545,14 +428,11 @@ public class Overseer implements Closeab
           // specified in CollectionAction. See SOLR-6115. Remove this in 5.0
           switch (operation) {
             case OverseerCollectionProcessor.CREATECOLLECTION:
-              clusterState = buildCollection(clusterState, message);
-              break;
+              return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
             case REMOVECOLLECTION:
-              clusterState = removeCollection(clusterState, message);
-              break;
+              return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message);
             case REMOVESHARD:
-              clusterState = removeShard(clusterState, message);
-              break;
+              return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message);
             default:
               throw new RuntimeException("unknown operation:" + operation
                   + " contents:" + message.getProperties());
@@ -560,125 +440,7 @@ public class Overseer implements Closeab
         }
       }
 
-      return clusterState;
-    }
-
-    private ClusterState addReplicaProp(ClusterState clusterState, ZkNodeProps message) {
-
-      if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
-          checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
-          checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
-          checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false ||
-          checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "Overseer SETREPLICAPROPERTY requires " +
-                ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
-                ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " and " +
-                ZkStateReader.PROPERTY_VALUE_PROP + " no action taken.");
-      }
-
-      String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-      String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
-      String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
-      if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
-        property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
-      }
-      property = property.toLowerCase(Locale.ROOT);
-      String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
-      String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE);
-
-      boolean isUnique = false;
-
-      if (sliceUniqueBooleanProperties.contains(property)) {
-        if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer SETREPLICAPROPERTY for " +
-              property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" +
-              "'true'. No action taken");
-        }
-        isUnique = true;
-      } else {
-        isUnique = Boolean.parseBoolean(shardUnique);
-      }
-
-      Replica replica = clusterState.getReplica(collectionName, replicaName);
-
-      if (replica == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
-            collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
-      }
-      log.info("Setting property " + property + " with value: " + propVal +
-          " for collection: " + collectionName + ". Full message: " + message);
-      if (StringUtils.equalsIgnoreCase(replica.getStr(property), propVal)) return clusterState; // already the value we're going to set
-
-      // OK, there's no way we won't change the cluster state now
-      Map<String,Replica> replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy();
-      if (isUnique == false) {
-        replicas.get(replicaName).getProperties().put(property, propVal);
-      } else { // Set prop for this replica, but remove it for all others.
-        for (Replica rep : replicas.values()) {
-          if (rep.getName().equalsIgnoreCase(replicaName)) {
-            rep.getProperties().put(property, propVal);
-          } else {
-            rep.getProperties().remove(property);
-          }
-        }
-      }
-      Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy());
-      return updateSlice(clusterState, collectionName, newSlice);
-    }
-
-    private ClusterState deleteReplicaProp(ClusterState clusterState, ZkNodeProps message) {
-
-      if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
-          checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
-          checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
-          checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "Overseer DELETEREPLICAPROPERTY requires " +
-                ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
-                ZkStateReader.REPLICA_PROP + " and " + ZkStateReader.PROPERTY_PROP + " no action taken.");
-      }
-      String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-      String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
-      String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
-      if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
-        property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
-      }
-
-      Replica replica = clusterState.getReplica(collectionName, replicaName);
-
-      if (replica == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection/slice/replica " +
-            collectionName + "/" + sliceName + "/" + replicaName + " no action taken.");
-      }
-
-      log.info("Deleting property " + property + " for collection: " + collectionName +
-          " slice " + sliceName + " replica " + replicaName + ". Full message: " + message);
-      String curProp = replica.getStr(property);
-      if (curProp == null) return clusterState; // not there anyway, nothing to do.
-
-      Map<String, Replica> replicas = clusterState.getSlice(collectionName, sliceName).getReplicasCopy();
-      replica = replicas.get(replicaName);
-      replica.getProperties().remove(property);
-      Slice newSlice = new Slice(sliceName, replicas, clusterState.getSlice(collectionName, sliceName).shallowCopy());
-      return updateSlice(clusterState, collectionName, newSlice);
-    }
-
-    private ClusterState setShardLeader(ClusterState clusterState, ZkNodeProps message) {
-      StringBuilder sb = new StringBuilder();
-      String baseUrl = message.getStr(ZkStateReader.BASE_URL_PROP);
-      String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP);
-      sb.append(baseUrl);
-      if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/");
-      sb.append(coreName == null ? "" : coreName);
-      if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/");
-      clusterState = setShardLeader(clusterState,
-          message.getStr(ZkStateReader.COLLECTION_PROP),
-          message.getStr(ZkStateReader.SHARD_ID_PROP),
-          sb.length() > 0 ? sb.toString() : null);
-      return clusterState;
+      return ZkStateWriter.NO_OP;
     }
 
     private void handleProp(ZkNodeProps message)  {
@@ -700,179 +462,6 @@ public class Overseer implements Closeab
       }
     }
 
-    private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
-      log.info("createReplica() {} ", message);
-      String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
-      if (!checkCollectionKeyExistence(message)) return clusterState;
-      String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      DocCollection collection = clusterState.getCollection(coll);
-      Slice sl = collection.getSlice(slice);
-      if(sl == null){
-        log.error("Invalid Collection/Slice {}/{} ",coll,slice);
-        return clusterState;
-      }
-
-      String coreNodeName = Assign.assignNode(coll, clusterState);
-      Replica replica = new Replica(coreNodeName,
-          makeMap(
-          ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
-          ZkStateReader.BASE_URL_PROP,message.getStr(ZkStateReader.BASE_URL_PROP),
-          ZkStateReader.STATE_PROP,message.getStr(ZkStateReader.STATE_PROP)));
-      sl.getReplicasMap().put(coreNodeName, replica);
-      return newState(clusterState, singletonMap(coll, collection));
-    }
-
-    private ClusterState buildCollection(ClusterState clusterState, ZkNodeProps message) {
-      String collection = message.getStr("name");
-      log.info("building a new collection: " + collection);
-      if(clusterState.hasCollection(collection) ){
-        log.warn("Collection {} already exists. exit" ,collection);
-        return clusterState;
-      }
-
-      ArrayList<String> shardNames = new ArrayList<>();
-
-      if(ImplicitDocRouter.NAME.equals( message.getStr("router.name",DocRouter.DEFAULT_NAME))){
-        getShardNames(shardNames,message.getStr("shards",DocRouter.DEFAULT_NAME));
-      } else {
-        int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, -1);
-        if(numShards<1) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"numShards is a required parameter for 'compositeId' router");
-        getShardNames(numShards, shardNames);
-      }
-
-      return createCollection(clusterState, collection, shardNames, message);
-    }
-
-    private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
-      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-      if (!checkCollectionKeyExistence(message)) return clusterState;
-      log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
-      for (String key : message.keySet()) {
-        if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
-        if (QUEUE_OPERATION.equals(key)) continue;
-
-        Slice slice = clusterState.getSlice(collection, key);
-        if (slice == null)  {
-          throw new RuntimeException("Overseer.updateShardState unknown collection: " + collection + " slice: " + key);
-        }
-        log.info("Update shard state " + key + " to " + message.getStr(key));
-        Map<String, Object> props = slice.shallowCopy();
-        if (Slice.RECOVERY.equals(props.get(Slice.STATE)) && Slice.ACTIVE.equals(message.getStr(key))) {
-          props.remove(Slice.PARENT);
-        }
-        props.put(Slice.STATE, message.getStr(key));
-        Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
-        clusterState = updateSlice(clusterState, collection, newSlice);
-      }
-
-      return clusterState;
-    }
-
-    private ClusterState addRoutingRule(ClusterState clusterState, ZkNodeProps message) {
-      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-      if (!checkCollectionKeyExistence(message)) return clusterState;
-      String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      String routeKey = message.getStr("routeKey");
-      String range = message.getStr("range");
-      String targetCollection = message.getStr("targetCollection");
-      String targetShard = message.getStr("targetShard");
-      String expireAt = message.getStr("expireAt");
-
-      Slice slice = clusterState.getSlice(collection, shard);
-      if (slice == null)  {
-        throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collection + " slice:" + shard);
-      }
-
-      Map<String, RoutingRule> routingRules = slice.getRoutingRules();
-      if (routingRules == null)
-        routingRules = new HashMap<>();
-      RoutingRule r = routingRules.get(routeKey);
-      if (r == null) {
-        Map<String, Object> map = new HashMap<>();
-        map.put("routeRanges", range);
-        map.put("targetCollection", targetCollection);
-        map.put("expireAt", expireAt);
-        RoutingRule rule = new RoutingRule(routeKey, map);
-        routingRules.put(routeKey, rule);
-      } else  {
-        // add this range
-        Map<String, Object> map = r.shallowCopy();
-        map.put("routeRanges", map.get("routeRanges") + "," + range);
-        map.put("expireAt", expireAt);
-        routingRules.put(routeKey, new RoutingRule(routeKey, map));
-      }
-
-      Map<String, Object> props = slice.shallowCopy();
-      props.put("routingRules", routingRules);
-
-      Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
-      clusterState = updateSlice(clusterState, collection, newSlice);
-      return clusterState;
-    }
-
-    private boolean checkCollectionKeyExistence(ZkNodeProps message) {
-      return checkKeyExistence(message, ZkStateReader.COLLECTION_PROP);
-    }
-    
-    private boolean checkKeyExistence(ZkNodeProps message, String key) {
-      String value = message.getStr(key);
-      if (value == null || value.trim().length() == 0) {
-        log.error("Skipping invalid Overseer message because it has no " + key + " specified: " + message);
-        return false;
-      }
-      return true;
-    }
-
-    private ClusterState removeRoutingRule(ClusterState clusterState, ZkNodeProps message) {
-      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-      if (!checkCollectionKeyExistence(message)) return clusterState;
-      String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      String routeKeyStr = message.getStr("routeKey");
-
-      log.info("Overseer.removeRoutingRule invoked for collection: " + collection
-          + " shard: " + shard + " routeKey: " + routeKeyStr);
-
-      Slice slice = clusterState.getSlice(collection, shard);
-      if (slice == null)  {
-        log.warn("Unknown collection: " + collection + " shard: " + shard);
-        return clusterState;
-      }
-      Map<String, RoutingRule> routingRules = slice.getRoutingRules();
-      if (routingRules != null) {
-        routingRules.remove(routeKeyStr); // no rules left
-        Map<String, Object> props = slice.shallowCopy();
-        props.put("routingRules", routingRules);
-        Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
-        clusterState = updateSlice(clusterState, collection, newSlice);
-      }
-
-      return clusterState;
-    }
-
-    private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
-      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-      if (!checkCollectionKeyExistence(message)) return clusterState;
-      String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      Slice slice = clusterState.getSlice(collection, shardId);
-      if (slice == null)  {
-        Map<String, Replica> replicas = Collections.EMPTY_MAP;
-        Map<String, Object> sliceProps = new HashMap<>();
-        String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP);
-        String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP);
-        String shardParent = message.getStr(ZkStateReader.SHARD_PARENT_PROP);
-        sliceProps.put(Slice.RANGE, shardRange);
-        sliceProps.put(Slice.STATE, shardState);
-        if (shardParent != null)  {
-          sliceProps.put(Slice.PARENT, shardParent);
-        }
-        slice = new Slice(shardId, replicas, sliceProps);
-        clusterState = updateSlice(clusterState, collection, slice);
-      } else  {
-        log.error("Unable to create Shard: " + shardId + " because it already exists in collection: " + collection);
-      }
-      return clusterState;
-    }
-
     private LeaderStatus amILeader() {
       TimerContext timerContext = stats.time("am_i_leader");
       boolean success = true;
@@ -907,525 +496,7 @@ public class Overseer implements Closeab
       return LeaderStatus.NO;
     }
 
-    private ClusterState updateStateNew(ClusterState clusterState, ZkNodeProps message) {
-      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-      if (!checkCollectionKeyExistence(message)) return clusterState;
-      String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-
-      if(collection==null || sliceName == null){
-        log.error("Invalid collection and slice {}", message);
-        return clusterState;
-      }
-      Slice slice = clusterState.getSlice(collection, sliceName);
-      if(slice == null){
-        log.error("No such slice exists {}", message);
-        return clusterState;
-      }
-
-      return updateState(clusterState, message);
-    }
-    
-      /**
-       * Try to assign core to the cluster. 
-       */
-      private ClusterState updateState(ClusterState clusterState, final ZkNodeProps message) {
-        final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-        if (!checkCollectionKeyExistence(message)) return clusterState;
-        Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
-        log.info("Update state numShards={} message={}", numShards, message);
-
-        List<String> shardNames  = new ArrayList<>();
-
-        //collection does not yet exist, create placeholders if num shards is specified
-        boolean collectionExists = clusterState.hasCollection(collection);
-        if (!collectionExists && numShards!=null) {
-          getShardNames(numShards, shardNames);
-          clusterState = createCollection(clusterState, collection, shardNames, message);
-        }
-        String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-
-        String coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
-        if (coreNodeName == null) {
-          coreNodeName = getAssignedCoreNodeName(clusterState, message);
-          if (coreNodeName != null) {
-            log.info("node=" + coreNodeName + " is already registered");
-          } else {
-            // if coreNodeName is null, auto assign one
-            coreNodeName = Assign.assignNode(collection, clusterState);
-          }
-          message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP,
-              coreNodeName);
-        }
-
-        // use the provided non null shardId
-        if (sliceName == null) {
-          //get shardId from ClusterState
-          sliceName = getAssignedId(clusterState, coreNodeName, message);
-          if (sliceName != null) {
-            log.info("shard=" + sliceName + " is already registered");
-          }
-        }
-        if(sliceName == null) {
-          //request new shardId 
-          if (collectionExists) {
-            // use existing numShards
-            numShards = clusterState.getCollection(collection).getSlices().size();
-            log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
-          }
-          sliceName = Assign.assignShard(collection, clusterState, numShards);
-          log.info("Assigning new node to shard shard=" + sliceName);
-        }
-
-        Slice slice = clusterState.getSlice(collection, sliceName);
-
-        Map<String,Object> replicaProps = new LinkedHashMap<>();
-
-        replicaProps.putAll(message.getProperties());
-        // System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
-        if (slice != null) {
-          Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
-          if (oldReplica != null) {
-            if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
-              replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
-            }
-            // Move custom props over.
-            for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
-              if (ent.getKey().startsWith(COLL_PROP_PREFIX)) {
-                replicaProps.put(ent.getKey(), ent.getValue());
-              }
-            }
-          }
-        }
-
-        // we don't put these in the clusterstate
-          replicaProps.remove(ZkStateReader.NUM_SHARDS_PROP);
-          replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
-          replicaProps.remove(ZkStateReader.SHARD_ID_PROP);
-          replicaProps.remove(ZkStateReader.COLLECTION_PROP);
-          replicaProps.remove(QUEUE_OPERATION);
-
-          // remove any props with null values
-          Set<Entry<String,Object>> entrySet = replicaProps.entrySet();
-          List<String> removeKeys = new ArrayList<>();
-          for (Entry<String,Object> entry : entrySet) {
-            if (entry.getValue() == null) {
-              removeKeys.add(entry.getKey());
-            }
-          }
-          for (String removeKey : removeKeys) {
-            replicaProps.remove(removeKey);
-          }
-          replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
-          // remove shard specific properties
-          String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
-          String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
-          String shardParent = (String) replicaProps.remove(ZkStateReader.SHARD_PARENT_PROP);
-
-
-          Replica replica = new Replica(coreNodeName, replicaProps);
-
-         // TODO: where do we get slice properties in this message?  or should there be a separate create-slice message if we want that?
-
-          Map<String,Object> sliceProps = null;
-          Map<String,Replica> replicas;
-
-          if (slice != null) {
-            clusterState = checkAndCompleteShardSplit(clusterState, collection, coreNodeName, sliceName, replicaProps);
-            // get the current slice again because it may have been updated due to checkAndCompleteShardSplit method
-            slice = clusterState.getSlice(collection, sliceName);
-            sliceProps = slice.getProperties();
-            replicas = slice.getReplicasCopy();
-          } else {
-            replicas = new HashMap<>(1);
-            sliceProps = new HashMap<>();
-            sliceProps.put(Slice.RANGE, shardRange);
-            sliceProps.put(Slice.STATE, shardState);
-            sliceProps.put(Slice.PARENT, shardParent);
-          }
-
-          replicas.put(replica.getName(), replica);
-          slice = new Slice(sliceName, replicas, sliceProps);
-
-          ClusterState newClusterState = updateSlice(clusterState, collection, slice);
-          return newClusterState;
-      }
-
-
-
-    private ClusterState checkAndCompleteShardSplit(ClusterState state, String collection, String coreNodeName, String sliceName, Map<String,Object> replicaProps) {
-      Slice slice = state.getSlice(collection, sliceName);
-      Map<String, Object> sliceProps = slice.getProperties();
-      String sliceState = slice.getState();
-      if (Slice.RECOVERY.equals(sliceState))  {
-        log.info("Shard: {} is in recovery state", sliceName);
-        // is this replica active?
-        if (ZkStateReader.ACTIVE.equals(replicaProps.get(ZkStateReader.STATE_PROP))) {
-          log.info("Shard: {} is in recovery state and coreNodeName: {} is active", sliceName, coreNodeName);
-          // are all other replicas also active?
-          boolean allActive = true;
-          for (Entry<String, Replica> entry : slice.getReplicasMap().entrySet()) {
-            if (coreNodeName.equals(entry.getKey()))  continue;
-            if (!Slice.ACTIVE.equals(entry.getValue().getStr(Slice.STATE))) {
-              allActive = false;
-              break;
-            }
-          }
-          if (allActive)  {
-            log.info("Shard: {} - all replicas are active. Finding status of fellow sub-shards", sliceName);
-            // find out about other sub shards
-            Map<String, Slice> allSlicesCopy = new HashMap<>(state.getSlicesMap(collection));
-            List<Slice> subShardSlices = new ArrayList<>();
-            outer:
-            for (Entry<String, Slice> entry : allSlicesCopy.entrySet()) {
-              if (sliceName.equals(entry.getKey()))
-                continue;
-              Slice otherSlice = entry.getValue();
-              if (Slice.RECOVERY.equals(otherSlice.getState())) {
-                if (slice.getParent() != null && slice.getParent().equals(otherSlice.getParent()))  {
-                  log.info("Shard: {} - Fellow sub-shard: {} found", sliceName, otherSlice.getName());
-                  // this is a fellow sub shard so check if all replicas are active
-                  for (Entry<String, Replica> sliceEntry : otherSlice.getReplicasMap().entrySet()) {
-                    if (!ZkStateReader.ACTIVE.equals(sliceEntry.getValue().getStr(ZkStateReader.STATE_PROP)))  {
-                      allActive = false;
-                      break outer;
-                    }
-                  }
-                  log.info("Shard: {} - Fellow sub-shard: {} has all replicas active", sliceName, otherSlice.getName());
-                  subShardSlices.add(otherSlice);
-                }
-              }
-            }
-            if (allActive)  {
-              // hurray, all sub shard replicas are active
-              log.info("Shard: {} - All replicas across all fellow sub-shards are now ACTIVE. Preparing to switch shard states.", sliceName);
-              String parentSliceName = (String) sliceProps.remove(Slice.PARENT);
-
-              Map<String, Object> propMap = new HashMap<>();
-              propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
-              propMap.put(parentSliceName, Slice.INACTIVE);
-              propMap.put(sliceName, Slice.ACTIVE);
-              for (Slice subShardSlice : subShardSlices) {
-                propMap.put(subShardSlice.getName(), Slice.ACTIVE);
-              }
-              propMap.put(ZkStateReader.COLLECTION_PROP, collection);
-              ZkNodeProps m = new ZkNodeProps(propMap);
-              state = updateShardState(state, m);
-            }
-          }
-        }
-      }
-      return state;
-    }
-
-    private ClusterState createCollection(ClusterState state, String collectionName, List<String> shards , ZkNodeProps message) {
-        log.info("Create collection {} with shards {}", collectionName, shards);
-
-        Map<String, Object> routerSpec = DocRouter.getRouterSpec(message);
-        String routerName = routerSpec.get("name") == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get("name");
-        DocRouter router = DocRouter.getDocRouter(routerName);
-
-        List<DocRouter.Range> ranges = router.partitionRange(shards.size(), router.fullRange());
-
-
-
-        Map<String, Slice> newSlices = new LinkedHashMap<>();
-
-        for (int i = 0; i < shards.size(); i++) {
-          String sliceName = shards.get(i);
-
-          Map<String, Object> sliceProps = new LinkedHashMap<>(1);
-          sliceProps.put(Slice.RANGE, ranges == null? null: ranges.get(i));
-
-          newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
-        }
-
-        // TODO: fill in with collection properties read from the /collections/<collectionName> node
-        Map<String,Object> collectionProps = new HashMap<>();
-
-        for (Entry<String, Object> e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) {
-          Object val = message.get(e.getKey());
-          if(val == null){
-            val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey());
-          }
-          if(val != null) collectionProps.put(e.getKey(),val);
-        }
-        collectionProps.put(DocCollection.DOC_ROUTER, routerSpec);
-
-      if (message.getStr("fromApi") == null) {
-        collectionProps.put("autoCreated", "true");
-      }
-      
-      String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
-          : ZkStateReader.getCollectionPath(collectionName);
-      
-      DocCollection newCollection = new DocCollection(collectionName,
-          newSlices, collectionProps, router, -1, znode);
-      
-      isClusterStateModified = true;
-      
-      log.info("state version {} {}", collectionName, newCollection.getStateFormat());
-      
-      return newState(state, singletonMap(newCollection.getName(), newCollection));
-    }
-
-      /*
-       * Return an already assigned id or null if not assigned
-       */
-      private String getAssignedId(final ClusterState state, final String nodeName,
-          final ZkNodeProps coreState) {
-        Collection<Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
-        if (slices != null) {
-          for (Slice slice : slices) {
-            if (slice.getReplicasMap().get(nodeName) != null) {
-              return slice.getName();
-            }
-          }
-        }
-        return null;
-      }
-      
-      private String getAssignedCoreNodeName(ClusterState state, ZkNodeProps message) {
-        Collection<Slice> slices = state.getSlices(message.getStr(ZkStateReader.COLLECTION_PROP));
-        if (slices != null) {
-          for (Slice slice : slices) {
-            for (Replica replica : slice.getReplicas()) {
-              String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
-              String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-              
-              String msgNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
-              String msgCore = message.getStr(ZkStateReader.CORE_NAME_PROP);
-              
-              if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
-                return replica.getName();
-              }
-            }
-          }
-        }
-        return null;
-      }
-
-    private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
-        // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
-        // System.out.println("Updating slice:" + slice);
-        DocCollection newCollection = null;
-        DocCollection coll = state.getCollectionOrNull(collectionName) ;
-        Map<String,Slice> slices;
-        
-        if (coll == null) {
-          //  when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
-          // without explicitly creating a collection.  In this current case, we assume custom sharding with an "implicit" router.
-          slices = new LinkedHashMap<>(1);
-          slices.put(slice.getName(), slice);
-          Map<String,Object> props = new HashMap<>(1);
-          props.put(DocCollection.DOC_ROUTER, ZkNodeProps.makeMap("name",ImplicitDocRouter.NAME));
-          newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
-        } else {
-          slices = new LinkedHashMap<>(coll.getSlicesMap()); // make a shallow copy
-          slices.put(slice.getName(), slice);
-          newCollection = coll.copyWithSlices(slices);
-        }
-
-        // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
-
-        return newState(state, singletonMap(collectionName, newCollection));
-      }
-      
-      private ClusterState setShardLeader(ClusterState state, String collectionName, String sliceName, String leaderUrl) {
-        DocCollection coll = state.getCollectionOrNull(collectionName);
-
-        if(coll == null) {
-          log.error("Could not mark shard leader for non existing collection:" + collectionName);
-          return state;
-        }
-
-        Map<String, Slice> slices = coll.getSlicesMap();
-        // make a shallow copy and add it to the new collection
-        slices = new LinkedHashMap<>(slices);
-
-        Slice slice = slices.get(sliceName);
-        if (slice == null) {
-          slice = coll.getSlice(sliceName);
-        }
-
-        if (slice == null) {
-          log.error("Could not mark leader for non existing/active slice:" + sliceName);
-          return state;
-        } else {
-          // TODO: consider just putting the leader property on the shard, not on individual replicas
-
-          Replica oldLeader = slice.getLeader();
-
-          final Map<String,Replica> newReplicas = new LinkedHashMap<>();
-
-          for (Replica replica : slice.getReplicas()) {
-
-            // TODO: this should only be calculated once and cached somewhere?
-            String coreURL = ZkCoreNodeProps.getCoreUrl(replica.getStr(ZkStateReader.BASE_URL_PROP), replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
-            if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
-              Map<String,Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
-              replicaProps.remove(Slice.LEADER);
-              replica = new Replica(replica.getName(), replicaProps);
-            } else if (coreURL.equals(leaderUrl)) {
-              Map<String,Object> replicaProps = new LinkedHashMap<>(replica.getProperties());
-              replicaProps.put(Slice.LEADER, "true");  // TODO: allow booleans instead of strings
-              replica = new Replica(replica.getName(), replicaProps);
-            }
-
-            newReplicas.put(replica.getName(), replica);
-          }
-
-          Map<String,Object> newSliceProps = slice.shallowCopy();
-          newSliceProps.put(Slice.REPLICAS, newReplicas);
-          Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
-          slices.put(newSlice.getName(), newSlice);
-        }
-
-
-        DocCollection newCollection = coll.copyWithSlices(slices);
-        return newState(state, singletonMap(collectionName, newCollection));
-      }
-
-    private ClusterState newState(ClusterState state, Map<String, DocCollection> colls) {
-      for (Entry<String, DocCollection> e : colls.entrySet()) {
-        DocCollection c = e.getValue();
-        if (c == null) {
-          isClusterStateModified = true;
-          state = state.copyWith(singletonMap(e.getKey(), (DocCollection) null));
-          updateNodes.put(ZkStateReader.getCollectionPath(e.getKey()) ,null);
-          continue;
-        }
-
-        if (c.getStateFormat() > 1) {
-          updateNodes.put(ZkStateReader.getCollectionPath(c.getName()),
-              new ClusterState(-1, Collections.<String>emptySet(), singletonMap(c.getName(), c)));
-        } else {
-          isClusterStateModified = true;
-        }
-        state = state.copyWith(singletonMap(e.getKey(), c));
-
-      }
-      return state;
-    }
-
-    /*
-     * Remove collection from cloudstate
-     */
-    private ClusterState removeCollection(final ClusterState clusterState, ZkNodeProps message) {
-      final String collection = message.getStr("name");
-      if (!checkKeyExistence(message, "name")) return clusterState;
-      DocCollection coll = clusterState.getCollectionOrNull(collection);
-      if(coll == null) return  clusterState;
-
-      isClusterStateModified = true;
-      if (coll.getStateFormat() > 1) {
-        try {
-          log.info("Deleting state for collection : {}", collection);
-          zkClient.delete(ZkStateReader.getCollectionPath(collection), -1, true);
-        } catch (Exception e) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to remove collection state :" + collection);
-        }
-      }
-      return newState(clusterState, singletonMap(coll.getName(),(DocCollection) null));
-    }
-    /*
-     * Remove collection slice from cloudstate
-     */
-    private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
-      final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
-      final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-      if (!checkCollectionKeyExistence(message)) return clusterState;
-
-      log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
-
-      DocCollection coll = clusterState.getCollection(collection);
-
-      Map<String, Slice> newSlices = new LinkedHashMap<>(coll.getSlicesMap());
-      newSlices.remove(sliceId);
-
-      DocCollection newCollection = coll.copyWithSlices(newSlices);
-      return newState(clusterState, singletonMap(collection,newCollection));
-    }
-
-    /*
-       * Remove core from cloudstate
-       */
-      private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {
-        final String cnn = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
-        final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-        if (!checkCollectionKeyExistence(message)) return clusterState;
-
-        DocCollection coll = clusterState.getCollectionOrNull(collection) ;
-        if (coll == null) {
-          // TODO: log/error that we didn't find it?
-          // just in case, remove the zk collection node
-          try {
-            zkClient.clean("/collections/" + collection);
-          } catch (InterruptedException e) {
-            SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
-            Thread.currentThread().interrupt();
-          } catch (KeeperException e) {
-            SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
-          }
-          return clusterState;
-        }
-
-        Map<String, Slice> newSlices = new LinkedHashMap<>();
-        boolean lastSlice = false;
-        for (Slice slice : coll.getSlices()) {
-          Replica replica = slice.getReplica(cnn);
-          if (replica != null) {
-            Map<String, Replica> newReplicas = slice.getReplicasCopy();
-            newReplicas.remove(cnn);
-            // TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it
-            // if (newReplicas.size() == 0 && slice.getRange() == null) {
-            // if there are no replicas left for the slice remove it
-            if (newReplicas.size() == 0) {
-              slice = null;
-              lastSlice = true;
-            } else {
-              slice = new Slice(slice.getName(), newReplicas, slice.getProperties());
-            }
-          }
-
-          if (slice != null) {
-            newSlices.put(slice.getName(), slice);
-          }
-        }
-
-        if (lastSlice) {
-          // remove all empty pre allocated slices
-          for (Slice slice : coll.getSlices()) {
-            if (slice.getReplicas().size() == 0) {
-              newSlices.remove(slice.getName());
-            }
-          }
-        }
-
-        // if there are no slices left in the collection, remove it?
-        if (newSlices.size() == 0) {
-
-          // TODO: it might be better logically to have this in ZkController
-          // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
-          // ZkController out of the Overseer.
-          try {
-            zkClient.clean("/collections/" + collection);
-          } catch (InterruptedException e) {
-            SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
-            Thread.currentThread().interrupt();
-          } catch (KeeperException e) {
-            SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
-          }
-          return newState(clusterState,singletonMap(collection, (DocCollection) null));
-
-        } else {
-          DocCollection newCollection = coll.copyWithSlices(newSlices);
-          return newState(clusterState,singletonMap(collection,newCollection));
-        }
-
-     }
-
-      @Override
+    @Override
       public void close() {
         this.isClosed = true;
       }
@@ -1433,7 +504,6 @@ public class Overseer implements Closeab
   }
   // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
   private class ExclusiveSliceProperty {
-    private ClusterStateUpdater updater;
     private ClusterState clusterState;
     private final boolean onlyActiveNodes;
     private final String property;
@@ -1455,8 +525,7 @@ public class Overseer implements Closeab
 
     private int assigned = 0;
 
-    ExclusiveSliceProperty(ClusterStateUpdater updater, ClusterState clusterState, ZkNodeProps message) {
-      this.updater = updater;
+    ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
       this.clusterState = clusterState;
       String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
       if (StringUtils.startsWith(tmp, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
@@ -1473,7 +542,7 @@ public class Overseer implements Closeab
 
       Boolean shardUnique = Boolean.parseBoolean(message.getStr(SHARD_UNIQUE));
       if (shardUnique == false &&
-          Overseer.sliceUniqueBooleanProperties.contains(this.property) == false) {
+          SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(this.property) == false) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
             + " the property be a pre-defined property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true' " +
             " Property: " + this.property + " shardUnique: " + Boolean.toString(shardUnique));
@@ -1717,7 +786,8 @@ public class Overseer implements Closeab
 
       balanceUnassignedReplicas();
       for (Slice newSlice : changedSlices.values()) {
-        clusterState = updater.updateSlice(clusterState, collectionName, newSlice);
+        DocCollection docCollection = CollectionMutator.updateSlice(collectionName, clusterState.getCollection(collectionName), newSlice);
+        clusterState = ClusterStateMutator.newState(clusterState, collectionName, docCollection);
       }
       return true;
     }
@@ -1733,28 +803,6 @@ public class Overseer implements Closeab
     }
   }
 
-  static void getShardNames(Integer numShards, List<String> shardNames) {
-    if(numShards == null)
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param");
-    for (int i = 0; i < numShards; i++) {
-      final String sliceName = "shard" + (i + 1);
-      shardNames.add(sliceName);
-    }
-
-  }
-
-  static void getShardNames(List<String> shardNames, String shards) {
-    if(shards ==null)
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
-    for (String s : shards.split(",")) {
-      if(s ==null || s.trim().isEmpty()) continue;
-      shardNames.add(s.trim());
-    }
-    if(shardNames.isEmpty())
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "shards" + " is a required param");
-
-  }
-
   class OverseerThread extends Thread implements Closeable {
 
     protected volatile boolean isClosed;
@@ -1846,7 +894,10 @@ public class Overseer implements Closeab
     ccThread.start();
     arfoThread.start();
   }
-  
+
+  public Stats getStats() {
+    return stats;
+  }
   
   /**
    * For tests.
@@ -1964,6 +1015,7 @@ public class Overseer implements Closeab
     static final int MAX_STORED_FAILURES = 10;
 
     final Map<String, Stat> stats = new ConcurrentHashMap<>();
+    private volatile int queueLength;
 
     public Map<String, Stat> getStats() {
       return stats;
@@ -2034,6 +1086,14 @@ public class Overseer implements Closeab
         return ret;
       }
     }
+
+    public int getQueueLength() {
+      return queueLength;
+    }
+
+    public void setQueueLength(int queueLength) {
+      this.queueLength = queueLength;
+    }
   }
 
   public static class Stat  {

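[Editor's note: a minimal, self-contained sketch of the shard-name helpers removed from Overseer in the hunk above; per the OverseerCollectionProcessor hunk below, callers now use the equivalent ClusterStateMutator.getShardNames overloads. The class name ShardNamesSketch is illustrative only, and IllegalArgumentException stands in for the original SolrException(ErrorCode.BAD_REQUEST) so the sketch compiles on its own.]

import java.util.ArrayList;
import java.util.List;

// Illustration of the two removed static helpers; logic mirrors the deleted lines above.
public class ShardNamesSketch {

  // Numeric form: numShards=3 yields shard1, shard2, shard3.
  static void getShardNames(Integer numShards, List<String> shardNames) {
    if (numShards == null)
      throw new IllegalArgumentException("numShards is a required param"); // original: SolrException(BAD_REQUEST)
    for (int i = 0; i < numShards; i++) {
      shardNames.add("shard" + (i + 1));
    }
  }

  // Explicit form (implicit router): comma-separated names, trimmed, empty entries skipped.
  static void getShardNames(List<String> shardNames, String shards) {
    if (shards == null)
      throw new IllegalArgumentException("shards is a required param"); // original: SolrException(BAD_REQUEST)
    for (String s : shards.split(",")) {
      if (s == null || s.trim().isEmpty()) continue;
      shardNames.add(s.trim());
    }
    if (shardNames.isEmpty())
      throw new IllegalArgumentException("shards is a required param"); // original: SolrException(BAD_REQUEST)
  }

  public static void main(String[] args) {
    List<String> byCount = new ArrayList<>();
    getShardNames(3, byCount);
    System.out.println(byCount);   // [shard1, shard2, shard3]

    List<String> explicit = new ArrayList<>();
    getShardNames(explicit, "a, b, ,c");
    System.out.println(explicit);  // [a, b, c]
  }
}
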
Modified: lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1642718&r1=1642717&r2=1642718&view=diff
==============================================================================
--- lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene2878/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Dec  1 17:25:39 2014
@@ -68,6 +68,8 @@ import org.apache.solr.client.solrj.resp
 import org.apache.solr.cloud.Assign.Node;
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
@@ -467,7 +469,7 @@ public class OverseerCollectionProcessor
     }
     //now ask the current leader to QUIT , so that the designate can takeover
     Overseer.getInQueue(zkStateReader.getZkClient()).offer(
-        ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.QUIT.toLower(),
+        ZkStateReader.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
             "id",getLeaderId(zkStateReader.getZkClient()))));
 
   }
@@ -698,7 +700,7 @@ public class OverseerCollectionProcessor
     SolrZkClient zkClient = zkStateReader.getZkClient();
     DistributedQueue inQueue = Overseer.getInQueue(zkClient);
     Map<String, Object> propMap = new HashMap<>();
-    propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower());
+    propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower());
     propMap.put(COLLECTION_PROP, collectionName);
     propMap.put(SHARD_ID_PROP, shardId);
     propMap.put(BASE_URL_PROP, baseURL);
@@ -1148,7 +1150,7 @@ public class OverseerCollectionProcessor
 
   private void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
     ZkNodeProps m = new ZkNodeProps(
-        Overseer.QUEUE_OPERATION, Overseer.OverseerAction.DELETECORE.toLower(),
+        Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
         ZkStateReader.CORE_NAME_PROP, core,
         ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
         ZkStateReader.COLLECTION_PROP, collectionName,
@@ -1760,7 +1762,7 @@ public class OverseerCollectionProcessor
         log.info("Replication factor is 1 so switching shard states");
         DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
         Map<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower());
+        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
         propMap.put(slice, Slice.INACTIVE);
         for (String subSlice : subSlices) {
           propMap.put(subSlice, Slice.ACTIVE);
@@ -1772,7 +1774,7 @@ public class OverseerCollectionProcessor
         log.info("Requesting shard state be set to 'recovery'");
         DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
         Map<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.UPDATESHARDSTATE.toLower());
+        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
         for (String subSlice : subSlices) {
           propMap.put(subSlice, Slice.RECOVERY);
         }
@@ -2062,7 +2064,7 @@ public class OverseerCollectionProcessor
     completeAsyncRequest(asyncId, requestMap, results);
 
     ZkNodeProps m = new ZkNodeProps(
-        Overseer.QUEUE_OPERATION, Overseer.OverseerAction.ADDROUTINGRULE.toLower(),
+        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
         COLLECTION_PROP, sourceCollection.getName(),
         SHARD_ID_PROP, sourceSlice.getName(),
         "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
@@ -2315,10 +2317,10 @@ public class OverseerCollectionProcessor
       String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
       List<String> shardNames = new ArrayList<>();
       if(ImplicitDocRouter.NAME.equals(router)){
-        Overseer.getShardNames(shardNames, message.getStr("shards",null));
+        ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
         numSlices = shardNames.size();
       } else {
-        Overseer.getShardNames(numSlices,shardNames);
+        ClusterStateMutator.getShardNames(numSlices, shardNames);
       }
 
       if (numSlices == null ) {