You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gc...@apache.org on 2015/08/06 08:07:33 UTC

svn commit: r1694406 [4/4] - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/cloud/overseer/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/test/org/apache/solr/...

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java?rev=1694406&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java Thu Aug  6 06:07:32 2015
@@ -0,0 +1,80 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.common.cloud.ZkNodeProps;
+
+/**
+ * Interface for processing messages received by an {@link OverseerProcessor}.
+ *
+ * Besides processing a message, a handler supplies the processor with a task
+ * key and an exclusivity marking for each message, and is told when a task is
+ * marked as running and when it is unmarked again.
+ */
+public interface OverseerMessageHandler {
+
+  /**
+   * Processes a single message. {@link OverseerProcessor} invokes this from a
+   * worker thread and inspects the returned response for "failure" or
+   * "exception" entries to decide whether the task succeeded.
+   *
+   * @param message the message to process
+   * @param operation the operation to process
+   *
+   * @return response
+   */
+  SolrResponse processMessage(ZkNodeProps message, String operation);
+
+  /**
+   * Returns the handler name; {@link OverseerProcessor} uses it as a prefix in
+   * its log messages.
+   *
+   * @return the name of the OverseerMessageHandler
+   */
+  String getName();
+
+  /**
+   * Returns the name under which the processing time of an operation is recorded.
+   *
+   * @param operation the operation to be timed
+   *
+   * @return the name of the timer to use for the operation
+   */
+  String getTimerName(String operation);
+
+  /**
+   * Returns the key used to decide whether tasks may run in parallel.
+   * {@link OverseerProcessor} treats a {@code null} key as "no exclusivity
+   * check required".
+   *
+   * @param message the message being processed
+   *
+   * @return the taskKey for the message for handling task exclusivity
+   */
+  String getTaskKey(ZkNodeProps message);
+
+  /**
+   * Called by the processor when it marks a task as running, before the task
+   * is handed to a worker thread.
+   *
+   * @param taskKey the key associated with the task, cached from getTaskKey
+   * @param message the message being processed
+   */
+  void markExclusiveTask(String taskKey, ZkNodeProps message);
+  
+  /**
+   * Called by the processor when a task has completed, or when it is reset
+   * after a failure.
+   *
+   * @param taskKey the key associated with the task
+   * @param operation the operation being processed
+   */
+  void unmarkExclusiveTask(String taskKey, String operation);
+
+  /**
+   * Consulted by the processor before a task is picked up for execution.
+   *
+   * @param taskKey the key associated with the task
+   * @param message the message being processed
+   *
+   * @return the exclusive marking
+   */
+  ExclusiveMarking checkExclusiveMarking(String taskKey, ZkNodeProps message);
+
+  /**
+   * Result of {@link #checkExclusiveMarking(String, ZkNodeProps)}. The notes on
+   * the values describe how {@code OverseerProcessor.checkExclusivity} reacts
+   * to each of them.
+   */
+  enum ExclusiveMarking {
+    NOTDETERMINED,    // not enough context, fall back to the processor (i.e. look at running tasks)
+    EXCLUSIVE,        // the processor's exclusivity check passes and the task is picked up
+    NONEXCLUSIVE      // the processor's exclusivity check fails and the task is skipped for now
+  }
+}

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java?rev=1694406&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java Thu Aug  6 06:07:32 2015
@@ -0,0 +1,112 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for prioritization of Overseer nodes, for example with the
+ * ADDROLE collection command.
+ */
+public class OverseerNodePrioritizer {
+
+  private static Logger log = LoggerFactory.getLogger(OverseerNodePrioritizer.class);
+
+  private final ZkStateReader zkStateReader;
+  private final String adminPath;
+  private final ShardHandlerFactory shardHandlerFactory;
+
+  public OverseerNodePrioritizer(ZkStateReader zkStateReader, String adminPath, ShardHandlerFactory shardHandlerFactory) {
+    this.zkStateReader = zkStateReader;
+    this.adminPath = adminPath;
+    this.shardHandlerFactory = shardHandlerFactory;
+  }
+
+  public synchronized void prioritizeOverseerNodes(String overseerId) throws KeeperException, InterruptedException {
+    SolrZkClient zk = zkStateReader.getZkClient();
+    if(!zk.exists(ZkStateReader.ROLES,true))return;
+    Map m = (Map) Utils.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat(), true));
+
+    List overseerDesignates = (List) m.get("overseer");
+    if(overseerDesignates==null || overseerDesignates.isEmpty()) return;
+    String ldr = OverseerProcessor.getLeaderNode(zk);
+    if(overseerDesignates.contains(ldr)) return;
+    log.info("prioritizing overseer nodes at {} overseer designates are {}", overseerId, overseerDesignates);
+    List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
+    if(electionNodes.size()<2) return;
+    log.info("sorted nodes {}", electionNodes);
+
+    String designateNodeId = null;
+    for (String electionNode : electionNodes) {
+      if(overseerDesignates.contains( LeaderElector.getNodeName(electionNode))){
+        designateNodeId = electionNode;
+        break;
+      }
+    }
+
+    if(designateNodeId == null){
+      log.warn("No live overseer designate ");
+      return;
+    }
+    if(!designateNodeId.equals( electionNodes.get(1))) { //checking if it is already at no:1
+      log.info("asking node {} to come join election at head", designateNodeId);
+      invokeOverseerOp(designateNodeId, "rejoinAtHead"); //ask designate to come first
+      log.info("asking the old first in line {} to rejoin election  ",electionNodes.get(1) );
+      invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
+    }
+    //now ask the current leader to QUIT , so that the designate can takeover
+    Overseer.getInQueue(zkStateReader.getZkClient()).offer(
+        Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
+            "id", OverseerProcessor.getLeaderId(zkStateReader.getZkClient()))));
+
+  }
+
+  private void invokeOverseerOp(String electionNode, String op) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
+    params.set("op", op);
+    params.set("qt", adminPath);
+    params.set("electionNode", electionNode);
+    ShardRequest sreq = new ShardRequest();
+    sreq.purpose = 1;
+    String replica = zkStateReader.getBaseUrlForNodeName(LeaderElector.getNodeName(electionNode));
+    sreq.shards = new String[]{replica};
+    sreq.actualShards = sreq.shards;
+    sreq.params = params;
+    shardHandler.submit(sreq, replica, sreq.params);
+    shardHandler.takeCompletedOrError();
+  }
+}

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java?rev=1694406&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java Thu Aug  6 06:07:32 2015
@@ -0,0 +1,571 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.Overseer.LeaderStatus;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.stats.TimerContext;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * A generic processor run in the Overseer, used for handling items added
+ * to a distributed work queue.  Has support for handling exclusive tasks
+ * (i.e. tasks that should not run in parallel with each other).
+ *
+ * An {@link OverseerMessageHandlerSelector} determines which
+ * {@link OverseerMessageHandler} handles specific messages in the
+ * queue.
+ */
+public class OverseerProcessor implements Runnable, Closeable {
+
+  public int maxParallelThreads = 10;
+
+  public ExecutorService tpe ;
+
+  private static Logger log = LoggerFactory
+      .getLogger(OverseerProcessor.class);
+
+  private DistributedQueue workQueue;
+  private DistributedMap runningMap;
+  private DistributedMap completedMap;
+  private DistributedMap failureMap;
+
+  // Set that maintains a list of all the tasks that are running. This is keyed on zk id of the task.
+  final private Set runningTasks;
+
+  // List of completed tasks. This is used to clean up workQueue in zk.
+  final private HashMap<String, QueueEvent> completedTasks;
+
+  private String myId;
+
+  private final ShardHandlerFactory shardHandlerFactory;
+
+  private String adminPath;
+
+  private ZkStateReader zkStateReader;
+
+  private boolean isClosed;
+
+  private Overseer.Stats stats;
+
+  // Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
+  // It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
+  // deleted from the work-queue as that is a batched operation.
+  final private Set<String> runningZKTasks;
+  private final Object waitLock = new Object();
+
+  private OverseerMessageHandlerSelector selector;
+
+  private OverseerNodePrioritizer prioritizer;
+
+  /**
+   * Creates a processor that is not yet running; the worker pool is only
+   * created once {@link #run()} is invoked.
+   *
+   * @param zkStateReader used to reach ZooKeeper
+   * @param myId id of this overseer, compared against the elected leader id
+   * @param shardHandlerFactory shard handler factory kept by this processor
+   * @param adminPath admin path kept by this processor
+   * @param stats sink for timing and success/error statistics
+   * @param selector chooses the message handler for each queued message
+   * @param prioritizer invoked once when the processor starts running
+   * @param workQueue queue holding the messages to process
+   * @param runningMap map of async ids of running tasks
+   * @param completedMap map of async ids of completed tasks
+   * @param failureMap map of async ids of failed tasks
+   */
+  public OverseerProcessor(
+      ZkStateReader zkStateReader,
+      String myId,
+      final ShardHandlerFactory shardHandlerFactory,
+      String adminPath,
+      Overseer.Stats stats,
+      OverseerMessageHandlerSelector selector,
+      OverseerNodePrioritizer prioritizer,
+      DistributedQueue workQueue,
+      DistributedMap runningMap,
+      DistributedMap completedMap,
+      DistributedMap failureMap) {
+    // In-memory tracking structures, always starting out empty.
+    this.runningTasks = new HashSet();
+    this.runningZKTasks = new HashSet<>();
+    this.completedTasks = new HashMap<>();
+
+    // Identity and collaborators.
+    this.myId = myId;
+    this.zkStateReader = zkStateReader;
+    this.adminPath = adminPath;
+    this.shardHandlerFactory = shardHandlerFactory;
+    this.stats = stats;
+    this.selector = selector;
+    this.prioritizer = prioritizer;
+
+    // ZooKeeper backed queue and maps.
+    this.workQueue = workQueue;
+    this.runningMap = runningMap;
+    this.completedMap = completedMap;
+    this.failureMap = failureMap;
+  }
+
+  /**
+   * Main loop of the processor.
+   *
+   * After determining leadership, remembering the current tail of the work
+   * queue and running the node prioritizer once, it repeatedly cleans up
+   * completed tasks, waits while too many tasks are running, peeks a batch of
+   * queued messages and hands each eligible one to a {@link Runner} on the
+   * thread pool.
+   *
+   * The loop ends when this overseer is no longer the leader, the processor
+   * is closed, the ZooKeeper session has expired or the thread is
+   * interrupted. {@link #close()} is always called on the way out.
+   */
+  @Override
+  public void run() {
+    log.info("Process current queue of overseer operations");
+    LeaderStatus isLeader = amILeader();
+    while (isLeader == LeaderStatus.DONT_KNOW) {
+      log.debug("am_i_leader unclear {}", isLeader);
+      isLeader = amILeader();  // not a no, not a yes, try ask again
+    }
+
+    String oldestItemInWorkQueue = null;
+    // hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer.
+    // This variable is set in case there's any task found on the workQueue when the OCP starts up and
+    // the id for the queue tail is used as a marker to check for the task in completed/failed map in zk.
+    // Beyond the marker, all tasks can safely be assumed to have never been executed.
+    boolean hasLeftOverItems = true;
+
+    try {
+      oldestItemInWorkQueue = workQueue.getTailId();
+    } catch (KeeperException e) {
+      // We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
+      // async calls.
+      SolrException.log(log, "", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    if (oldestItemInWorkQueue == null)
+      hasLeftOverItems = false;
+    else
+      log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
+
+    try {
+      prioritizer.prioritizeOverseerNodes(myId);
+    } catch (Exception e) {
+      // Prioritization is best effort; processing continues without it.
+      log.error("Unable to prioritize overseer ", e);
+    }
+
+    // TODO: Make maxThreads configurable.
+
+    this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
+        new SynchronousQueue<Runnable>(),
+        new DefaultSolrThreadFactory("OverseerThreadFactory"));
+    try {
+      while (!this.isClosed) {
+        try {
+          isLeader = amILeader();
+          if (LeaderStatus.NO == isLeader) {
+            break;
+          } else if (LeaderStatus.YES != isLeader) {
+            log.debug("am_i_leader unclear {}", isLeader);
+            continue; // not a no, not a yes, try asking again
+          }
+
+          log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
+          cleanUpWorkQueue();
+
+          printTrackingMaps();
+
+          boolean waited = false;
+
+          // Throttle: do not pick up new work while too many tasks are running.
+          while (runningTasks.size() > maxParallelThreads) {
+            synchronized (waitLock) {
+              waitLock.wait(100);//wait for 100 ms or till a task is complete
+            }
+            waited = true;
+          }
+
+          // Tasks may have completed while waiting; clean up again.
+          if (waited)
+            cleanUpWorkQueue();
+
+          List<QueueEvent> heads = workQueue.peekTopN(maxParallelThreads, runningZKTasks, 2000L);
+
+          if (heads == null)
+            continue;
+
+          log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
+
+          if (isClosed) break;
+
+          for (QueueEvent head : heads) {
+            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+            OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
+            String taskKey = messageHandler.getTaskKey(message);
+            final String asyncId = message.getStr(ASYNC);
+            if (hasLeftOverItems) {
+              // Once the remembered tail is reached, later items cannot be leftovers.
+              if (head.getId().equals(oldestItemInWorkQueue))
+                hasLeftOverItems = false;
+              if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
+                log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",asyncId );
+                workQueue.remove(head);
+                continue;
+              }
+            }
+
+            if (!checkExclusivity(messageHandler, message, head.getId())) {
+              log.debug("Exclusivity check failed for [{}]", message.toString());
+              continue;
+            }
+
+            try {
+              markTaskAsRunning(messageHandler, head, taskKey, asyncId, message);
+              log.debug("Marked task [{}] as running", head.getId());
+            } catch (KeeperException.NodeExistsException e) {
+              // This should never happen
+              log.error("Tried to pick up task [{}] when it was already running!", head.getId());
+            } catch (InterruptedException e) {
+              log.error("Thread interrupted while trying to pick task [{}] for execution.", head.getId());
+              Thread.currentThread().interrupt();
+            }
+
+            log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString());
+            String operation = message.getStr(Overseer.QUEUE_OPERATION);
+            Runner runner = new Runner(messageHandler, message,
+                operation, head);
+            tpe.execute(runner);
+          }
+
+        } catch (KeeperException e) {
+          if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+            log.warn("Overseer cannot talk to ZK");
+            return;
+          }
+          SolrException.log(log, "", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log and keep looping on any other failure.
+          SolrException.log(log, "", e);
+        }
+      }
+    } finally {
+      this.close();
+    }
+  }
+
+  protected boolean checkExclusivity(OverseerMessageHandler messageHandler, ZkNodeProps message, String id)
+      throws KeeperException, InterruptedException {
+    String taskKey = messageHandler.getTaskKey(message);
+
+    if(taskKey == null)
+      return true;
+
+    OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message);
+    switch (marking) {
+      case NOTDETERMINED:
+        break;
+      case EXCLUSIVE:
+        return true;
+      case NONEXCLUSIVE:
+        return false;
+      default:
+        throw new IllegalArgumentException("Undefined marking: " + marking);
+    }
+
+    if(runningZKTasks.contains(id))
+      return false;
+
+    return true;
+  }
+
+  /**
+   * Removes every completed task from the ZooKeeper work queue and from the
+   * set of picked-up ZooKeeper tasks, then forgets the completed tasks.
+   */
+  private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
+    synchronized (completedTasks) {
+      for (Map.Entry<String, QueueEvent> finished : completedTasks.entrySet()) {
+        workQueue.remove(finished.getValue());
+        runningZKTasks.remove(finished.getKey());
+      }
+      completedTasks.clear();
+    }
+  }
+
+  /**
+   * Marks the processor as closed and shuts down the worker pool, if one was
+   * created and is not already shut down.
+   *
+   * Running tasks get up to 60 seconds to finish. If the pool has not
+   * terminated by then, or the wait is interrupted, the remaining tasks are
+   * cancelled with {@code shutdownNow()}.
+   */
+  @Override
+  public void close() {
+    isClosed = true;
+    if (tpe == null || tpe.isShutdown()) {
+      return;
+    }
+    tpe.shutdown();
+    try {
+      tpe.awaitTermination(60, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      log.warn("Thread interrupted while waiting for OCP threadpool close.");
+      Thread.currentThread().interrupt();
+    } finally {
+      // isShutdown() is always true after shutdown(); isTerminated() tells
+      // whether the tasks have actually finished.
+      if (!tpe.isTerminated()) {
+        tpe.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Returns the node names of all overseer election participants, in election order.
+   *
+   * This is a best-effort lookup: if the election children cannot be read,
+   * the failure is logged and an empty list is returned. If the failure was
+   * an interruption, the thread's interrupt flag is restored.
+   *
+   * @param zk the ZooKeeper client to read from
+   * @return the sorted node names, or an empty list if they could not be read
+   */
+  public static List<String> getSortedOverseerNodeNames(SolrZkClient zk) throws KeeperException, InterruptedException {
+    List<String> children;
+    try {
+      children = zk.getChildren(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE, null, true);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Do not lose the interruption just because the lookup is best effort.
+        Thread.currentThread().interrupt();
+      }
+      log.warn("error ", e);
+      return new ArrayList<>();
+    }
+    LeaderElector.sortSeqs(children);
+    List<String> nodeNames = new ArrayList<>(children.size());
+    for (String child : children) {
+      nodeNames.add(LeaderElector.getNodeName(child));
+    }
+    return nodeNames;
+  }
+
+  /**
+   * Returns the children of the given election path, sorted with
+   * {@code LeaderElector.sortSeqs}.
+   *
+   * @param zk the ZooKeeper client to read from
+   * @param path the election path whose children are listed
+   * @return the sorted election nodes
+   * @throws KeeperException if the children cannot be read
+   * @throws InterruptedException if the thread is interrupted while reading
+   */
+  public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
+    List<String> children = zk.getChildren(path, null, true);
+    LeaderElector.sortSeqs(children);
+    return children;
+  }
+
+  /**
+   * Returns the node name of the current overseer leader.
+   *
+   * @param zkClient the ZooKeeper client to read from
+   * @return the leader's node name, or null if no leader id is available
+   */
+  public static String getLeaderNode(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    final String leaderId = getLeaderId(zkClient);
+    if (leaderId == null) {
+      return null;
+    }
+    return LeaderElector.getNodeName(leaderId);
+  }
+
+  /**
+   * Reads the "id" entry of the overseer leader node.
+   *
+   * @param zkClient the ZooKeeper client to read from
+   * @return the id stored in the leader node, or null if the leader node does not exist
+   */
+  public static String getLeaderId(SolrZkClient zkClient) throws KeeperException,InterruptedException{
+    final byte[] leaderData;
+    try {
+      leaderData = zkClient.getData("/overseer_elect/leader", null, new Stat(), true);
+    } catch (KeeperException.NoNodeException e) {
+      // No leader node means no leader is currently registered.
+      return null;
+    }
+    final Map<?, ?> leaderProps = (Map<?, ?>) Utils.fromJSON(leaderData);
+    return (String) leaderProps.get("id");
+  }
+
+  protected LeaderStatus amILeader() {
+    String statsName = "collection_am_i_leader";
+    TimerContext timerContext = stats.time(statsName);
+    boolean success = true;
+    try {
+      ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
+          "/overseer_elect/leader", null, null, true));
+      if (myId.equals(props.getStr("id"))) {
+        return LeaderStatus.YES;
+      }
+    } catch (KeeperException e) {
+      success = false;
+      if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
+        log.error("", e);
+        return LeaderStatus.DONT_KNOW;
+      } else if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+        log.info("", e);
+      } else {
+        log.warn("", e);
+      }
+    } catch (InterruptedException e) {
+      success = false;
+      Thread.currentThread().interrupt();
+    } finally {
+      timerContext.stop();
+      if (success)  {
+        stats.success(statsName);
+      } else  {
+        stats.error(statsName);
+      }
+    }
+    log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
+    return LeaderStatus.NO;
+  }
+
+  /**
+   * @return true once {@link #close()} has been called on this processor
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void markTaskAsRunning(OverseerMessageHandler messageHandler, QueueEvent head, String taskKey,
+                                 String asyncId, ZkNodeProps message)
+      throws KeeperException, InterruptedException {
+    synchronized (runningZKTasks) {
+      runningZKTasks.add(head.getId());
+    }
+
+    synchronized (runningTasks) {
+      runningTasks.add(head.getId());
+    }
+
+    messageHandler.markExclusiveTask(taskKey, message);
+
+    if(asyncId != null)
+      runningMap.put(asyncId, null);
+  }
+  
+  protected class Runner implements Runnable {
+    ZkNodeProps message;
+    String operation;
+    SolrResponse response;
+    QueueEvent head;
+    OverseerMessageHandler messageHandler;
+  
+    public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head) {
+      this.message = message;
+      this.operation = operation;
+      this.head = head;
+      this.messageHandler = messageHandler;
+      response = null;
+    }
+
+
+    public void run() {
+      String statsName = messageHandler.getTimerName(operation);
+      final TimerContext timerContext = stats.time(statsName);
+
+      boolean success = false;
+      final String asyncId = message.getStr(ASYNC);
+      String taskKey = messageHandler.getTaskKey(message);
+
+      try {
+        try {
+          log.debug("Runner processing {}", head.getId());
+          response = messageHandler.processMessage(message, operation);
+        } finally {
+          timerContext.stop();
+          updateStats(statsName);
+        }
+
+        if(asyncId != null) {
+          if (response != null && (response.getResponse().get("failure") != null 
+              || response.getResponse().get("exception") != null)) {
+            failureMap.put(asyncId, SolrResponse.serializable(response));
+            log.debug("Updated failed map for task with zkid:[{}]", head.getId());
+          } else {
+            completedMap.put(asyncId, SolrResponse.serializable(response));
+            log.debug("Updated completed map for task with zkid:[{}]", head.getId());
+          }
+        } else {
+          head.setBytes(SolrResponse.serializable(response));
+          log.debug("Completed task:[{}]", head.getId());
+        }
+
+        markTaskComplete(messageHandler, head.getId(), asyncId, taskKey);
+        log.debug("Marked task [{}] as completed.", head.getId());
+        printTrackingMaps();
+
+        log.info(messageHandler.getName() + ": Message id:" + head.getId() +
+            " complete, response:" + response.getResponse().toString());
+        success = true;
+      } catch (KeeperException e) {
+        SolrException.log(log, "", e);
+      } catch (InterruptedException e) {
+        // Reset task from tracking data structures so that it can be retried.
+        resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
+        log.warn("Resetting task {} as the thread was interrupted.", head.getId());
+        Thread.currentThread().interrupt();
+      } finally {
+        if(!success) {
+          // Reset task from tracking data structures so that it can be retried.
+          resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
+        }
+        synchronized (waitLock){
+          waitLock.notifyAll();
+        }
+      }
+    }
+
+    private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey)
+        throws KeeperException, InterruptedException {
+      synchronized (completedTasks) {
+        completedTasks.put(id, head);
+      }
+
+      synchronized (runningTasks) {
+        runningTasks.remove(id);
+      }
+
+      if(asyncId != null)
+        runningMap.remove(asyncId);
+
+      messageHandler.unmarkExclusiveTask(taskKey, operation);
+    }
+
+    private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey) {
+      log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey);
+      try {
+        if (asyncId != null)
+          runningMap.remove(asyncId);
+
+        synchronized (runningTasks) {
+          runningTasks.remove(id);
+        }
+
+        messageHandler.unmarkExclusiveTask(taskKey, operation);
+      } catch (KeeperException e) {
+        SolrException.log(log, "", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+
+    }
+
+    private void updateStats(String statsName) {
+      if (isSuccessful()) {
+        stats.success(statsName);
+      } else {
+        stats.error(statsName);
+        stats.storeFailureDetails(statsName, message, response);
+      }
+    }
+
+    private boolean isSuccessful() {
+      if(response == null)
+        return false;
+      return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null);
+    }
+  }
+
+  /**
+   * Logs the contents of the three in-memory tracking structures at debug
+   * level, each one while holding its own lock.
+   */
+  private void printTrackingMaps() {
+    if (!log.isDebugEnabled()) {
+      return;
+    }
+    synchronized (runningTasks) {
+      log.debug("RunningTasks: {}", runningTasks.toString());
+    }
+    synchronized (completedTasks) {
+      log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
+    }
+    synchronized (runningZKTasks) {
+      log.debug("RunningZKTasks: {}", runningZKTasks.toString());
+    }
+  }
+
+
+
+  /**
+   * @return the id this processor was constructed with
+   */
+  String getId(){
+    return myId;
+  }
+
+  /**
+   * An interface to determine which {@link OverseerMessageHandler}
+   * handles a given message.  This could be a single OverseerMessageHandler
+   * for the case where a single type of message is handled (e.g. collection
+   * messages only), or a different handler could be selected based on the
+   * contents of the message.
+   */
+  public interface OverseerMessageHandlerSelector {
+    /**
+     * @param message the message read from the work queue
+     * @return the handler that should process the message
+     */
+    OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
+  }
+
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java Thu Aug  6 06:07:32 2015
@@ -24,7 +24,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerCollectionMessageHandler;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -87,10 +87,10 @@ public class ClusterStateMutator {
 
     Map<String, Object> collectionProps = new HashMap<>();
 
-    for (Map.Entry<String, Object> e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) {
+    for (Map.Entry<String, Object> e : OverseerCollectionMessageHandler.COLL_PROPS.entrySet()) {
       Object val = message.get(e.getKey());
       if (val == null) {
-        val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey());
+        val = OverseerCollectionMessageHandler.COLL_PROPS.get(e.getKey());
       }
       if (val != null) collectionProps.put(e.getKey(), val);
     }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java Thu Aug  6 06:07:32 2015
@@ -28,7 +28,7 @@ import java.util.Set;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.cloud.Assign;
 import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerCollectionMessageHandler;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -40,7 +40,7 @@ import org.apache.solr.common.util.Utils
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
 import static org.apache.solr.common.params.CommonParams.NAME;
@@ -107,18 +107,18 @@ public class ReplicaMutator {
     String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
     String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
     if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
-      property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
+      property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
     }
     property = property.toLowerCase(Locale.ROOT);
     String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
-    String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE);
+    String shardUnique = message.getStr(OverseerCollectionMessageHandler.SHARD_UNIQUE);
 
     boolean isUnique = false;
 
     if (SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property)) {
       if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer ADDREPLICAPROP for " +
-            property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" +
+            property + " cannot have " + OverseerCollectionMessageHandler.SHARD_UNIQUE + " set to anything other than" +
             "'true'. No action taken");
       }
       isUnique = true;
@@ -170,7 +170,7 @@ public class ReplicaMutator {
     String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
     String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
     if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
-      property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
+      property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
     }
 
     Replica replica = clusterState.getReplica(collectionName, replicaName);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java Thu Aug  6 06:07:32 2015
@@ -37,7 +37,7 @@ import org.apache.solr.common.util.Utils
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
 import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
 import static org.apache.solr.common.util.Utils.makeMap;
 import static org.apache.solr.common.params.CommonParams.NAME;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Aug  6 06:07:32 2015
@@ -71,16 +71,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.REQUESTID;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java Thu Aug  6 06:07:32 2015
@@ -25,7 +25,7 @@ import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.cloud.LeaderElector;
-import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerProcessor;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -160,7 +160,7 @@ class RebalanceLeaders {
 
       ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
 
-      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+      List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
           ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
 
       if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
@@ -193,7 +193,7 @@ class RebalanceLeaders {
       throws KeeperException, InterruptedException {
 
     ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-    List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+    List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
         ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
 
     // First, queue up the preferred leader at the head of the queue.
@@ -210,12 +210,12 @@ class RebalanceLeaders {
       return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
     }
 
-    List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+    List<String> electionNodesTmp = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
         ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
 
 
     // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
-    electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+    electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
         ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
 
     for (String thisNode : electionNodes) {
@@ -238,7 +238,7 @@ class RebalanceLeaders {
     int oldSeq = LeaderElector.getSeq(electionNode);
     for (int idx = 0; idx < 600; ++idx) {
       ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+      List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
           ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
       for (String testNode : electionNodes) {
         if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java Thu Aug  6 06:07:32 2015
@@ -50,7 +50,7 @@ public class AsyncMigrateRouteKeyTest ex
     String message;
     params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
-    params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
+    params.set(OverseerCollectionMessageHandler.REQUESTID, asyncId);
     // This task takes long enough to run. Also check for the current state of the task to be running.
     message = sendStatusRequestWithRetry(params, 5);
     assertEquals("found " + asyncId + " in running tasks", message);

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Thu Aug  6 06:07:32 2015
@@ -45,7 +45,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.*;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
@@ -398,7 +398,7 @@ public class BaseCdcrDistributedZkTest e
     }
     Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
     if (replicationFactor == null) {
-      replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
+      replicationFactor = (Integer) OverseerCollectionMessageHandler.COLL_PROPS.get(REPLICATION_FACTOR);
     }
 
     if (confSetName != null) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Thu Aug  6 06:07:32 2015
@@ -571,10 +571,10 @@ public class BasicDistributedZkTest exte
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionAction.CREATE.toString());
 
-    params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
+    params.set(OverseerCollectionMessageHandler.NUM_SLICES, numShards);
     params.set(ZkStateReader.REPLICATION_FACTOR, numReplicas);
     params.set(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
-    if (createNodeSetStr != null) params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSetStr);
+    if (createNodeSetStr != null) params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, createNodeSetStr);
 
     int clientIndex = clients.size() > 1 ? random().nextInt(2) : 0;
     List<Integer> list = new ArrayList<>();

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java Thu Aug  6 06:07:32 2015
@@ -82,7 +82,7 @@ import org.apache.solr.core.SolrInfoMBea
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.junit.Test;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.common.util.Utils.makeMap;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
@@ -442,7 +442,7 @@ public class CollectionsAPIDistributedZk
     String nn1 = ((SolrDispatchFilter) jettys.get(0).getDispatchFilter().getFilter()).getCores().getZkController().getNodeName();
     String nn2 =  ((SolrDispatchFilter) jettys.get(1).getDispatchFilter().getFilter()).getCores().getZkController().getNodeName();
     
-    params.set(OverseerCollectionProcessor.CREATE_NODE_SET, nn1 + "," + nn2);
+    params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, nn1 + "," + nn2);
     request = new QueryRequest(params);
     request.setPath("/admin/collections");
     gotExp = false;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java Thu Aug  6 06:07:32 2015
@@ -47,8 +47,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java Thu Aug  6 06:07:32 2015
@@ -36,8 +36,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
 import static org.apache.solr.common.util.Utils.makeMap;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java Thu Aug  6 06:07:32 2015
@@ -40,8 +40,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
 import static org.apache.solr.common.util.Utils.makeMap;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java Thu Aug  6 06:07:32 2015
@@ -39,7 +39,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Thu Aug  6 06:07:32 2015
@@ -106,7 +106,7 @@ public class OverseerCollectionProcessor
         DistributedQueue workQueue, DistributedMap runningMap,
         DistributedMap completedMap,
         DistributedMap failureMap) {
-      super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
+      super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
     }
     
     @Override
@@ -407,14 +407,14 @@ public class OverseerCollectionProcessor
         ZkStateReader.REPLICATION_FACTOR, replicationFactor.toString(),
         "name", COLLECTION_NAME,
         "collection.configName", CONFIG_NAME,
-        OverseerCollectionProcessor.NUM_SLICES, numberOfSlices.toString(),
+        OverseerCollectionMessageHandler.NUM_SLICES, numberOfSlices.toString(),
         ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode.toString()
     );
     if (sendCreateNodeList) {
-      propMap.put(OverseerCollectionProcessor.CREATE_NODE_SET,
+      propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET,
           (createNodeList != null)?StrUtils.join(createNodeList, ','):null);
-      if (OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE_DEFAULT != createNodeSetShuffle || random().nextBoolean()) {
-        propMap.put(OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE, createNodeSetShuffle);
+      if (OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT != createNodeSetShuffle || random().nextBoolean()) {
+        propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE, createNodeSetShuffle);
       }
     }
 
@@ -590,7 +590,7 @@ public class OverseerCollectionProcessor
       }
     }
     
-    if (random().nextBoolean()) Collections.shuffle(createNodeList, OverseerCollectionProcessor.RANDOM);
+    if (random().nextBoolean()) Collections.shuffle(createNodeList, OverseerCollectionMessageHandler.RANDOM);
     
     List<SubmitCapture> submitCaptures = null;
     if (collectionExceptedToBeCreated) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java Thu Aug  6 06:07:32 2015
@@ -41,9 +41,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.getLeaderNode;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.getSortedOverseerNodeNames;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.common.util.Utils.makeMap;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java Thu Aug  6 06:07:32 2015
@@ -50,7 +50,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java Thu Aug  6 06:07:32 2015
@@ -43,7 +43,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
 
 public class TestCollectionAPI extends ReplicaPropertiesBase {
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java Thu Aug  6 06:07:32 2015
@@ -295,7 +295,7 @@ public class TestMiniSolrCloudCluster ex
 
       // create collection
       final String asyncId = (random().nextBoolean() ? null : "asyncId("+collectionName+".create)="+random().nextInt());
-      createCollection(miniCluster, collectionName, OverseerCollectionProcessor.CREATE_NODE_SET_EMPTY, asyncId);
+      createCollection(miniCluster, collectionName, OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY, asyncId);
       if (asyncId != null) {
         assertEquals("did not see async createCollection completion", "completed", AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 330, cloudSolrClient));
       }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java Thu Aug  6 06:07:32 2015
@@ -63,7 +63,7 @@ public class TestRequestStatusCollection
     params = new ModifiableSolrParams();
 
     params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
-    params.set(OverseerCollectionProcessor.REQUESTID, "1000");
+    params.set(OverseerCollectionMessageHandler.REQUESTID, "1000");
 
     try {
       message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
@@ -76,7 +76,7 @@ public class TestRequestStatusCollection
     // Check for a random (hopefully non-existent request id
     params = new ModifiableSolrParams();
     params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString());
-    params.set(OverseerCollectionProcessor.REQUESTID, "9999999");
+    params.set(OverseerCollectionMessageHandler.REQUESTID, "9999999");
     try {
       r = sendRequest(params);
       status = (NamedList) r.get("status");
@@ -101,7 +101,7 @@ public class TestRequestStatusCollection
     // Check for the request to be completed.
       params = new ModifiableSolrParams();
       params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
-      params.set(OverseerCollectionProcessor.REQUESTID, "1001");
+      params.set(OverseerCollectionMessageHandler.REQUESTID, "1001");
     try {
       message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
     } catch (SolrServerException | IOException e) {
@@ -128,7 +128,7 @@ public class TestRequestStatusCollection
     params = new ModifiableSolrParams();
 
       params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
-      params.set(OverseerCollectionProcessor.REQUESTID, "1002");
+      params.set(OverseerCollectionMessageHandler.REQUESTID, "1002");
 
     try {
       message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java Thu Aug  6 06:07:32 2015
@@ -66,7 +66,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.common.util.Utils.makeMap;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Thu Aug  6 06:07:32 2015
@@ -81,9 +81,9 @@ import org.noggit.JSONWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
 import static org.apache.solr.common.util.Utils.makeMap;
 
 /**
@@ -1540,7 +1540,7 @@ public abstract class AbstractFullDistri
     }
     Integer replicationFactor = (Integer) collectionProps.get(ZkStateReader.REPLICATION_FACTOR);
     if(replicationFactor==null){
-      replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(ZkStateReader.REPLICATION_FACTOR);
+      replicationFactor = (Integer) OverseerCollectionMessageHandler.COLL_PROPS.get(ZkStateReader.REPLICATION_FACTOR);
     }
 
     if (confSetName != null) {

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java Thu Aug  6 06:07:32 2015
@@ -319,7 +319,7 @@ public class MiniSolrCloudCluster {
     params.set("replicationFactor", replicationFactor);
     params.set("collection.configName", configName);
     if (null != createNodeSet) {
-      params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSet);
+      params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, createNodeSet);
     }
     if (null != asyncId) {
       params.set(CommonAdminParams.ASYNC, asyncId);