Posted to commits@lucene.apache.org by ma...@apache.org on 2018/11/29 18:19:24 UTC

[15/16] lucene-solr:master: SOLR-12801: Make massive improvements to the tests.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 74781d7..91b7e74 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.solr.cloud;
 
+import static org.apache.solr.common.params.CommonParams.ID;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -26,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.codahale.metrics.Timer;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
@@ -39,9 +40,11 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.overseer.ZkStateWriter;
 import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ConnectionManager;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -53,7 +56,7 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.HttpShardHandler;
 import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.update.UpdateShardHandler;
 import org.apache.zookeeper.CreateMode;
@@ -61,7 +64,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CommonParams.ID;
+import com.codahale.metrics.Timer;
 
 /**
  * Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
@@ -107,7 +110,7 @@ public class Overseer implements SolrCloseable {
     public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
       this.zkClient = reader.getZkClient();
       this.zkStats = zkStats;
-      this.stateUpdateQueue = getStateUpdateQueue(zkClient, zkStats);
+      this.stateUpdateQueue = getStateUpdateQueue(zkStats);
       this.workQueue = getInternalWorkQueue(zkClient, zkStats);
       this.failureMap = getFailureMap(zkClient);
       this.runningMap = getRunningMap(zkClient);
@@ -188,6 +191,8 @@ public class Overseer implements SolrCloseable {
               // the workQueue is empty now, use stateUpdateQueue as fallback queue
               fallbackQueue = stateUpdateQueue;
               fallbackQueueSize = 0;
+            } catch (AlreadyClosedException e) {
+              return;
             } catch (KeeperException.SessionExpiredException e) {
               log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
               return;
@@ -211,6 +216,8 @@ public class Overseer implements SolrCloseable {
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return;
+          } catch (AlreadyClosedException e) {
+
           } catch (Exception e) {
             log.error("Exception in Overseer main queue loop", e);
           }
@@ -247,6 +254,8 @@ public class Overseer implements SolrCloseable {
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return;
+          } catch (AlreadyClosedException e) {
+  
           } catch (Exception e) {
             log.error("Exception in Overseer main queue loop", e);
             refreshClusterState = true; // it might have been a bad version error
@@ -308,8 +317,10 @@ public class Overseer implements SolrCloseable {
       byte[] data;
       try {
         data = zkClient.getData(path, null, stat, true);
+      } catch (AlreadyClosedException e) {
+        return;
       } catch (Exception e) {
-        log.error("could not read the "+path+" data" ,e);
+        log.warn("Error communicating with ZooKeeper", e);
         return;
       }
       try {
@@ -437,6 +448,11 @@ public class Overseer implements SolrCloseable {
       } catch (InterruptedException e) {
         success = false;
         Thread.currentThread().interrupt();
+      } catch (AlreadyClosedException e) {
+        success = false;
+      } catch (Exception e) {
+        success = false;
+        log.warn("Unexpected exception", e);
       } finally {
         timerContext.stop();
         if (success)  {
@@ -495,7 +511,7 @@ public class Overseer implements SolrCloseable {
 
   private final ZkStateReader reader;
 
-  private final ShardHandler shardHandler;
+  private final HttpShardHandler shardHandler;
 
   private final UpdateShardHandler updateShardHandler;
 
@@ -507,11 +523,11 @@ public class Overseer implements SolrCloseable {
 
   private Stats stats;
   private String id;
-  private boolean closed;
+  private volatile boolean closed;
   private CloudConfig config;
 
   // overseer not responsible for closing reader
-  public Overseer(ShardHandler shardHandler,
+  public Overseer(HttpShardHandler shardHandler,
       UpdateShardHandler updateShardHandler, String adminPath,
       final ZkStateReader reader, ZkController zkController, CloudConfig config)
       throws KeeperException, InterruptedException {
@@ -541,7 +557,7 @@ public class Overseer implements SolrCloseable {
 
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
-    OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, adminPath, shardHandler.getShardHandlerFactory());
+    OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getDefaultHttpClient());
     overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
     ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
     ccThread.setDaemon(true);
@@ -554,9 +570,8 @@ public class Overseer implements SolrCloseable {
     updaterThread.start();
     ccThread.start();
     triggerThread.start();
-    if (this.id != null) {
-      assert ObjectReleaseTracker.track(this);
-    }
+ 
+    assert ObjectReleaseTracker.track(this);
   }
 
   public Stats getStats() {
@@ -595,16 +610,13 @@ public class Overseer implements SolrCloseable {
   }
   
   public synchronized void close() {
-    if (closed) return;
     if (this.id != null) {
       log.info("Overseer (id=" + id + ") closing");
     }
-    
-    doClose();
     this.closed = true;
-    if (this.id != null) {
-      assert ObjectReleaseTracker.release(this);
-    }
+    doClose();
+
+    assert ObjectReleaseTracker.release(this);
   }
 
   @Override
@@ -660,11 +672,10 @@ public class Overseer implements SolrCloseable {
    * <p>
    * This method will create the /overseer znode in ZooKeeper if it does not exist already.
    *
-   * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
    * @return a {@link ZkDistributedQueue} object
    */
-  public static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) {
-    return getStateUpdateQueue(zkClient, new Stats());
+  ZkDistributedQueue getStateUpdateQueue() {
+    return getStateUpdateQueue(new Stats());
   }
 
   /**
@@ -672,13 +683,15 @@ public class Overseer implements SolrCloseable {
    * This method should not be used directly by anyone other than the Overseer itself.
    * This method will create the /overseer znode in ZooKeeper if it does not exist already.
    *
-   * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
    * @param zkStats  a {@link Stats} object which tracks statistics for all zookeeper operations performed by this queue
    * @return a {@link ZkDistributedQueue} object
    */
-  static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
-    createOverseerNode(zkClient);
-    return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE);
+  ZkDistributedQueue getStateUpdateQueue(Stats zkStats) {
+    return new ZkDistributedQueue(reader.getZkClient(), "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE, new ConnectionManager.IsClosed(){
+      public boolean isClosed() {
+        return Overseer.this.isClosed() || zkController.getCoreContainer().isShutDown();
+      }
+    });
   }
 
   /**
@@ -697,31 +710,26 @@ public class Overseer implements SolrCloseable {
    * @return a {@link ZkDistributedQueue} object
    */
   static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
-    createOverseerNode(zkClient);
     return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
   }
 
   /* Internal map for failed tasks, not to be used outside of the Overseer */
   static DistributedMap getRunningMap(final SolrZkClient zkClient) {
-    createOverseerNode(zkClient);
     return new DistributedMap(zkClient, "/overseer/collection-map-running");
   }
 
   /* Size-limited map for successfully completed tasks*/
   static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
-    createOverseerNode(zkClient);
     return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
   }
 
   /* Map for failed tasks, not to be used outside of the Overseer */
   static DistributedMap getFailureMap(final SolrZkClient zkClient) {
-    createOverseerNode(zkClient);
     return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
   }
   
   /* Map of async IDs currently in use*/
   static DistributedMap getAsyncIdsMap(final SolrZkClient zkClient) {
-    createOverseerNode(zkClient);
     return new DistributedMap(zkClient, "/overseer/async_ids");
   }
 
@@ -740,7 +748,7 @@ public class Overseer implements SolrCloseable {
    * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
    * @return a {@link ZkDistributedQueue} object
    */
-  static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
+  OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
     return getCollectionQueue(zkClient, new Stats());
   }
 
@@ -758,8 +766,7 @@ public class Overseer implements SolrCloseable {
    * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
    * @return a {@link ZkDistributedQueue} object
    */
-  static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
-    createOverseerNode(zkClient);
+  OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
     return new OverseerTaskQueue(zkClient, "/overseer/collection-queue-work", zkStats);
   }
 
@@ -778,7 +785,7 @@ public class Overseer implements SolrCloseable {
    * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
    * @return a {@link ZkDistributedQueue} object
    */
-  static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient)  {
+  OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient)  {
     return getConfigSetQueue(zkClient, new Stats());
   }
 
@@ -801,15 +808,14 @@ public class Overseer implements SolrCloseable {
    * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
    * @return a {@link ZkDistributedQueue} object
    */
-  static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
+  OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
     // For now, we use the same queue as the collection queue, but ensure
     // that the actions are prefixed with a unique string.
-    createOverseerNode(zkClient);
     return getCollectionQueue(zkClient, zkStats);
   }
   
 
-  private static void createOverseerNode(final SolrZkClient zkClient) {
+  private void createOverseerNode(final SolrZkClient zkClient) {
     try {
       zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
     } catch (KeeperException.NodeExistsException e) {
@@ -823,6 +829,7 @@ public class Overseer implements SolrCloseable {
       throw new RuntimeException(e);
     }
   }
+  
   public static boolean isLegacy(ZkStateReader stateReader) {
     String legacyProperty = stateReader.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
     return "true".equals(legacyProperty);
@@ -837,4 +844,11 @@ public class Overseer implements SolrCloseable {
     return reader;
   }
 
+  public void offerStateUpdate(byte[] data) throws KeeperException, InterruptedException {
+    if (zkController.getZkClient().isClosed()) {
+      throw new AlreadyClosedException();
+    }
+    getStateUpdateQueue().offer(data);
+  }
+
 }

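The Overseer.java hunks above thread a ConnectionManager.IsClosed probe into the state-update queue and make the new offerStateUpdate method throw AlreadyClosedException once the ZK client is closed. A minimal sketch of that shutdown-probe pattern, with hypothetical names standing in for the real ZkDistributedQueue and its collaborators:

    interface IsClosed {
      boolean isClosed();
    }

    class AlreadyClosedException extends RuntimeException {}

    class ShutdownAwareQueue {
      // e.g. () -> Overseer.this.isClosed() || coreContainer.isShutDown()
      private final IsClosed probe;

      ShutdownAwareQueue(IsClosed probe) {
        this.probe = probe;
      }

      void offer(byte[] data) {
        // Check the caller-supplied probe before each ZooKeeper round trip so a
        // closing Overseer fails fast instead of logging spurious session errors.
        if (probe.isClosed()) {
          throw new AlreadyClosedException();
        }
        // ... the actual ZooKeeper write would happen here ...
      }
    }

Callers such as the main queue loop then catch AlreadyClosedException and return quietly, which is the shape of the new catch blocks above.
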
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
index e8d85ce..78ddc82 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
@@ -16,16 +16,16 @@
  */
 package org.apache.solr.cloud;
 
+import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
+
 import java.io.IOException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
-
-import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
+import org.apache.solr.handler.component.HttpShardHandler;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 
 /**
  * An {@link OverseerTaskProcessor} that handles:
@@ -35,18 +35,18 @@ import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_A
 public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor {
 
    public OverseerCollectionConfigSetProcessor(ZkStateReader zkStateReader, String myId,
-                                               final ShardHandler shardHandler,
+                                               final HttpShardHandler shardHandler,
                                                String adminPath, Stats stats, Overseer overseer,
                                                OverseerNodePrioritizer overseerNodePrioritizer) {
     this(
         zkStateReader,
         myId,
-        shardHandler.getShardHandlerFactory(),
+        (HttpShardHandlerFactory) shardHandler.getShardHandlerFactory(),
         adminPath,
         stats,
         overseer,
         overseerNodePrioritizer,
-        Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
+        overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
         Overseer.getRunningMap(zkStateReader.getZkClient()),
         Overseer.getCompletedMap(zkStateReader.getZkClient()),
         Overseer.getFailureMap(zkStateReader.getZkClient())
@@ -54,7 +54,7 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
   }
 
   protected OverseerCollectionConfigSetProcessor(ZkStateReader zkStateReader, String myId,
-                                        final ShardHandlerFactory shardHandlerFactory,
+                                        final HttpShardHandlerFactory shardHandlerFactory,
                                         String adminPath,
                                         Stats stats,
                                         Overseer overseer,
@@ -79,7 +79,7 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
   private static OverseerMessageHandlerSelector getOverseerMessageHandlerSelector(
       ZkStateReader zkStateReader,
       String myId,
-      final ShardHandlerFactory shardHandlerFactory,
+      final HttpShardHandlerFactory shardHandlerFactory,
       String adminPath,
       Stats stats,
       Overseer overseer,

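The processor now takes the concrete HttpShardHandler and performs the single cast to HttpShardHandlerFactory in the delegating constructor, so downstream code no longer needs per-call casts. Sketched with illustrative types, not Solr's:

    class HandlerFactory {}

    class HttpHandlerFactory extends HandlerFactory {}

    class Handler {
      HandlerFactory getFactory() {
        return new HttpHandlerFactory();
      }
    }

    class HttpHandler extends Handler {}

    class Processor {
      Processor(HttpHandler handler) {
        // One explicit downcast at the boundary; everything past this point
        // can rely on the HTTP-specific factory API without further casts.
        this((HttpHandlerFactory) handler.getFactory());
      }

      Processor(HttpHandlerFactory factory) {
        // ... use the factory ...
      }
    }
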
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
index 34ee041..6851141 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.http.client.HttpClient;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -28,6 +29,7 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.handler.component.ShardRequest;
@@ -49,10 +51,16 @@ public class OverseerNodePrioritizer {
   private final String adminPath;
   private final ShardHandlerFactory shardHandlerFactory;
 
-  public OverseerNodePrioritizer(ZkStateReader zkStateReader, String adminPath, ShardHandlerFactory shardHandlerFactory) {
+  private ZkDistributedQueue stateUpdateQueue;
+
+  private HttpClient httpClient;
+
+  public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory, HttpClient httpClient) {
     this.zkStateReader = zkStateReader;
     this.adminPath = adminPath;
     this.shardHandlerFactory = shardHandlerFactory;
+    this.stateUpdateQueue = stateUpdateQueue;
+    this.httpClient = httpClient;
   }
 
   public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
@@ -88,7 +96,7 @@ public class OverseerNodePrioritizer {
       invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
     }
     //now ask the current leader to QUIT , so that the designate can takeover
-    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(
+    stateUpdateQueue.offer(
         Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
             ID, OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()))));
 
@@ -96,7 +104,7 @@ public class OverseerNodePrioritizer {
 
   private void invokeOverseerOp(String electionNode, String op) {
     ModifiableSolrParams params = new ModifiableSolrParams();
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(httpClient);
     params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
     params.set("op", op);
     params.set("qt", adminPath);

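OverseerNodePrioritizer now receives the state-update queue and an HttpClient through its constructor rather than rebuilding them from statics (the Overseer.getStateUpdateQueue(zkStateReader.getZkClient()) call is gone). The injection shape, with illustrative types:

    import java.util.Queue;

    class Prioritizer {
      private final Queue<byte[]> stateUpdateQueue;

      Prioritizer(Queue<byte[]> stateUpdateQueue) {
        // Collaborators are handed in once at construction, so tests can pass
        // fakes and the class no longer depends on static factory state.
        this.stateUpdateQueue = stateUpdateQueue;
      }

      void askLeaderToQuit(byte[] quitMessage) {
        // was: StaticFactory.getQueue(zkClient).offer(quitMessage)
        stateUpdateQueue.offer(quitMessage);
      }
    }
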
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index febeec0..3b53a54 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
 import java.io.Closeable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -36,6 +37,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -86,13 +88,13 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
   // List of completed tasks. This is used to clean up workQueue in zk.
   final private HashMap<String, QueueEvent> completedTasks;
 
-  private String myId;
+  private volatile String myId;
 
-  private ZkStateReader zkStateReader;
+  private volatile ZkStateReader zkStateReader;
 
   private boolean isClosed;
 
-  private Stats stats;
+  private volatile Stats stats;
 
   // Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
   // It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
@@ -102,7 +104,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
   // be executed because they are blocked or the execution queue is full
   // This is an optimization to ensure that we do not read the same tasks
   // again and again from ZK.
-  final private Map<String, QueueEvent> blockedTasks = new LinkedHashMap<>();
+  final private Map<String, QueueEvent> blockedTasks = Collections.synchronizedMap(new LinkedHashMap<>());
   final private Predicate<String> excludedTasks = new Predicate<String>() {
     @Override
     public boolean test(String s) {
@@ -170,6 +172,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
       // We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
       // async calls.
       SolrException.log(log, "", e);
+    } catch (AlreadyClosedException e) {
+      return;
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
@@ -181,6 +185,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
     try {
       prioritizer.prioritizeOverseerNodes(myId);
+    } catch (AlreadyClosedException e) {
+        return;
     } catch (Exception e) {
       if (!zkStateReader.getZkClient().isClosed()) {
         log.error("Unable to prioritize overseer ", e);
@@ -203,14 +209,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
             continue; // not a no, not a yes, try asking again
           }
 
-          log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
+          log.debug("Cleaning up work-queue. #Running tasks: {} #Completed tasks: {}",  runningTasksSize(), completedTasks.size());
           cleanUpWorkQueue();
 
           printTrackingMaps();
 
           boolean waited = false;
 
-          while (runningTasks.size() > MAX_PARALLEL_TASKS) {
+          while (runningTasksSize() > MAX_PARALLEL_TASKS) {
             synchronized (waitLock) {
               waitLock.wait(100);//wait for 100 ms or till a task is complete
             }
@@ -229,7 +235,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           // to clear out at least a few items in the queue before we read more items
           if (heads.size() < MAX_BLOCKED_TASKS) {
             //instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
-            int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
+            int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasksSize());
             List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
             log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
             heads.addAll(newTasks);
@@ -251,7 +257,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           for (QueueEvent head : heads) {
             if (!tooManyTasks) {
               synchronized (runningTasks) {
-                tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
+                tooManyTasks = runningTasksSize() >= MAX_PARALLEL_TASKS;
               }
             }
             if (tooManyTasks) {
@@ -260,7 +266,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
                 blockedTasks.put(head.getId(), head);
               continue;
             }
-            if (runningZKTasks.contains(head.getId())) continue;
+            synchronized (runningZKTasks) {
+              if (runningZKTasks.contains(head.getId())) continue;
+            }
             final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
             final String asyncId = message.getStr(ASYNC);
             if (hasLeftOverItems) {
@@ -316,6 +324,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           return;
+        } catch (AlreadyClosedException e) {
+
         } catch (Exception e) {
           SolrException.log(log, "", e);
         }
@@ -325,11 +335,19 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     }
   }
 
+  private int runningTasksSize() {
+    synchronized (runningTasks) {
+      return runningTasks.size();
+    }
+  }
+
   private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
     synchronized (completedTasks) {
       for (String id : completedTasks.keySet()) {
         workQueue.remove(completedTasks.get(id));
-        runningZKTasks.remove(id);
+        synchronized (runningTasks) {
+          runningZKTasks.remove(id);
+        }
       }
       completedTasks.clear();
     }
@@ -502,6 +520,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         log.debug(messageHandler.getName() + ": Message id:" + head.getId() +
             " complete, response:" + response.getResponse().toString());
         success = true;
+      } catch (AlreadyClosedException e) {
+
       } catch (KeeperException e) {
         SolrException.log(log, "", e);
       } catch (InterruptedException e) {
@@ -513,7 +533,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         lock.unlock();
         if (!success) {
           // Reset task from tracking data structures so that it can be retried.
-          resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
+          try {
+            resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
+          } catch(AlreadyClosedException e) {
+            
+          }
         }
         synchronized (waitLock){
           waitLock.notifyAll();
@@ -587,7 +611,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
         log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
       }
       synchronized (runningZKTasks) {
-        log.debug("RunningZKTasks: {}", runningZKTasks.toString());
+        log.info("RunningZKTasks: {}", runningZKTasks.toString());
       }
     }
   }

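The OverseerTaskProcessor hunks make shared fields volatile, wrap blockedTasks in Collections.synchronizedMap, and route every read of runningTasks.size() through a synchronized helper. The underlying idioms, in a compilable sketch with illustrative names:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    class TaskBook {
      // Reads and writes of this set must hold the same monitor, including size().
      private final Set<String> running = new HashSet<>();

      // The wrapper synchronizes each individual call on the map itself.
      private final Map<String, byte[]> blocked =
          Collections.synchronizedMap(new LinkedHashMap<>());

      void start(String id) {
        synchronized (running) {
          running.add(id);
        }
      }

      int runningSize() {
        synchronized (running) {
          return running.size(); // same lock as the writers, so the count is coherent
        }
      }

      void block(String id, byte[] payload) {
        blocked.put(id, payload);
      }
    }
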
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 67c15e8..9133266 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -63,7 +63,6 @@ import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.PeerSyncWithLeader;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.SolrPluginUtils;
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
@@ -71,18 +70,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class may change in future and customisations are not supported
- * between versions in terms of API or back compat behaviour.
+ * This class may change in future and customisations are not supported between versions in terms of API or back compat
+ * behaviour.
+ * 
  * @lucene.experimental
  */
 public class RecoveryStrategy implements Runnable, Closeable {
 
   public static class Builder implements NamedListInitializedPlugin {
     private NamedList args;
+
     @Override
     public void init(NamedList args) {
       this.args = args;
     }
+
     // this should only be used from SolrCoreState
     public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
         RecoveryStrategy.RecoveryListener recoveryListener) {
@@ -90,6 +92,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       SolrPluginUtils.invokeSetters(recoveryStrategy, args);
       return recoveryStrategy;
     }
+
     protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
         RecoveryStrategy.RecoveryListener recoveryListener) {
       return new RecoveryStrategy(cc, cd, recoveryListener);
@@ -98,15 +101,17 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
+  private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer
+      .getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
   private int maxRetries = 500;
-  private int startingRecoveryDelayMilliSeconds = 5000;
+  private int startingRecoveryDelayMilliSeconds = 2000;
 
   public static interface RecoveryListener {
     public void recovered();
+
     public void failed();
   }
-  
+
   private volatile boolean close = false;
 
   private RecoveryListener recoveryListener;
@@ -121,6 +126,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
   private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
   private final Replica.Type replicaType;
 
+  private CoreDescriptor coreDescriptor;
+
   protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
     this.cc = cc;
     this.coreName = cd.getName();
@@ -136,7 +143,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
     return waitForUpdatesWithStaleStatePauseMilliSeconds;
   }
 
-  final public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(int waitForUpdatesWithStaleStatePauseMilliSeconds) {
+  final public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(
+      int waitForUpdatesWithStaleStatePauseMilliSeconds) {
     this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds;
   }
 
@@ -185,10 +193,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
       recoveryListener.failed();
     }
   }
-  
+
   /**
-   * This method may change in future and customisations are not supported
-   * between versions in terms of API or back compat behaviour.
+   * This method may change in future and customisations are not supported between versions in terms of API or back
+   * compat behaviour.
+   * 
    * @lucene.experimental
    */
   protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
@@ -199,37 +208,38 @@ public class RecoveryStrategy implements Runnable, Closeable {
       throws SolrServerException, IOException {
 
     final String leaderUrl = getReplicateLeaderUrl(leaderprops);
-    
+
     log.info("Attempting to replicate from [{}].", leaderUrl);
-    
+
     // send commit
     commitOnLeader(leaderUrl);
-    
+
     // use rep handler directly, so we can do this sync rather than async
     SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
     ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-    
+
     if (replicationHandler == null) {
       throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
           "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
     }
-    
+
     ModifiableSolrParams solrParams = new ModifiableSolrParams();
     solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
     solrParams.set(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, replicaType == Replica.Type.TLOG);
     // always download the tlogs from the leader when running with cdcr enabled. We need to have all the tlogs
     // to ensure leader failover doesn't cause missing docs on the target
-    if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
+    if (core.getUpdateHandler().getUpdateLog() != null
+        && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
       solrParams.set(ReplicationHandler.TLOG_FILES, true);
     }
-    
+
     if (isClosed()) return; // we check closed on return
     boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();
-    
+
     if (!success) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
     }
-    
+
     // solrcloud_debug
     if (log.isDebugEnabled()) {
       try {
@@ -245,7 +255,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
               + " from "
               + leaderUrl
               + " gen:"
-              + (core.getDeletionPolicy().getLatestCommit() != null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration())
+              + (core.getDeletionPolicy().getLatestCommit() != null ? "null"
+                  : core.getDeletionPolicy().getLatestCommit().getGeneration())
               + " data:" + core.getDataDir()
               + " index:" + core.getIndexDir()
               + " newIndex:" + core.getNewIndexDir()
@@ -265,11 +276,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
       IOException {
     try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
         .withConnectionTimeout(30000)
+        .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient())
         .build()) {
       UpdateRequest ureq = new UpdateRequest();
       ureq.setParams(new ModifiableSolrParams());
-      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-//      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if "onlyLeaderIndexes"?
+      // ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+      // ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if
+      // "onlyLeaderIndexes"?
       ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
       ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
           client);
@@ -304,9 +317,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
       MDCLoggingContext.clear();
     }
   }
-  
+
   final public void doRecovery(SolrCore core) throws Exception {
-    if (core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
+    // we can lose our core descriptor, so store it now
+    this.coreDescriptor = core.getCoreDescriptor();
+
+    if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
       doSyncOrReplicateRecovery(core);
     } else {
       doReplicateOnlyRecovery(core);
@@ -316,14 +332,17 @@ public class RecoveryStrategy implements Runnable, Closeable {
   final private void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
     boolean successfulRecovery = false;
 
-//  if (core.getUpdateHandler().getUpdateLog() != null) {
-//    SolrException.log(log, "'replicate-only' recovery strategy should only be used if no update logs are present, but this core has one: "
-//        + core.getUpdateHandler().getUpdateLog());
-//    return;
-//  }
-    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+    // if (core.getUpdateHandler().getUpdateLog() != null) {
+    // SolrException.log(log, "'replicate-only' recovery strategy should only be used if no update logs are present, but
+    // this core has one: "
+    // + core.getUpdateHandler().getUpdateLog());
+    // return;
+    // }
+    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or
+                                                                                            // it will close channels
+                                                                                            // though
       try {
-        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+        CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
         ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
             cloudDesc.getCollectionName(), cloudDesc.getShardId());
         final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
@@ -333,7 +352,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
         String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
 
-        boolean isLeader = leaderUrl.equals(ourUrl); //TODO: We can probably delete most of this code if we say this strategy can only be used for pull replicas
+        boolean isLeader = leaderUrl.equals(ourUrl); // TODO: We can probably delete most of this code if we say this
+                                                     // strategy can only be used for pull replicas
         if (isLeader && !cloudDesc.isLeader()) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
         }
@@ -342,14 +362,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
           // we are now the leader - no one else must have been suitable
           log.warn("We have not yet recovered - but we are now the leader!");
           log.info("Finished recovery process.");
-          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
           return;
         }
 
-
         log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
             ourUrl);
-        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+        zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
 
         if (isClosed()) {
           log.info("Recovery for core {} has been closed", core.getName());
@@ -381,7 +400,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           zkController.startReplicationFromLeader(coreName, false);
           log.info("Registering as Active after recovery.");
           try {
-            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+            zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
           } catch (Exception e) {
             log.error("Could not publish as ACTIVE after succesful recovery", e);
             successfulRecovery = false;
@@ -411,7 +430,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           if (retries >= maxRetries) {
             SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
             try {
-              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, this.coreDescriptor);
             } catch (Exception e) {
               SolrException.log(log, "Could not publish that recovery failed", e);
             }
@@ -457,7 +476,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     if (ulog == null) {
       SolrException.log(log, "No UpdateLog found - cannot recover.");
       recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
-          core.getCoreDescriptor());
+          this.coreDescriptor);
       return;
     }
 
@@ -478,20 +497,22 @@ public class RecoveryStrategy implements Runnable, Closeable {
       try {
         int oldIdx = 0; // index of the start of the old list in the current list
         long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-        
+
         for (; oldIdx < recentVersions.size(); oldIdx++) {
           if (recentVersions.get(oldIdx) == firstStartingVersion) break;
         }
-        
+
         if (oldIdx > 0) {
           log.info("Found new versions added after startup: num=[{}]", oldIdx);
-          log.info("currentVersions size={} range=[{} to {}]", recentVersions.size(), recentVersions.get(0), recentVersions.get(recentVersions.size()-1));
+          log.info("currentVersions size={} range=[{} to {}]", recentVersions.size(), recentVersions.get(0),
+              recentVersions.get(recentVersions.size() - 1));
         }
 
         if (startingVersions.isEmpty()) {
           log.info("startupVersions is empty");
         } else {
-          log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0), startingVersions.get(startingVersions.size()-1));
+          log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
+              startingVersions.get(startingVersions.size() - 1));
         }
       } catch (Exception e) {
         SolrException.log(log, "Error getting recent versions.", e);
@@ -501,7 +522,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
     if (recoveringAfterStartup) {
       // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
-      // when we went down.  We may have received updates since then.
+      // when we went down. We may have received updates since then.
       recentVersions = startingVersions;
       try {
         if (ulog.existOldBufferLog()) {
@@ -523,10 +544,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
     final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     Future<RecoveryInfo> replayFuture = null;
-    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or
+                                                                                            // it will close channels
+                                                                                            // though
       try {
-        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+        CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
+        final Replica leader = pingLeader(ourUrl, this.coreDescriptor, true);
         if (isClosed()) {
           log.info("RecoveryStrategy has been closed");
           break;
@@ -540,7 +563,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           // we are now the leader - no one else must have been suitable
           log.warn("We have not yet recovered - but we are now the leader!");
           log.info("Finished recovery process.");
-          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
           return;
         }
 
@@ -548,37 +571,37 @@ public class RecoveryStrategy implements Runnable, Closeable {
         // recalling buffer updates will drop the old buffer tlog
         ulog.bufferUpdates();
 
-        log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
+        log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(),
+            leader.getCoreUrl(),
             ourUrl);
-        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-        
-        
+        zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
+
         final Slice slice = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName())
             .getSlice(cloudDesc.getShardId());
-            
+
         try {
           prevSendPreRecoveryHttpUriRequest.abort();
         } catch (NullPointerException e) {
           // okay
         }
-        
+
         if (isClosed()) {
           log.info("RecoveryStrategy has been closed");
           break;
         }
 
         sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
-        
+
         if (isClosed()) {
           log.info("RecoveryStrategy has been closed");
           break;
         }
-        
+
         // we wait a bit so that any updates on the leader
-        // that started before they saw recovering state 
+        // that started before they saw recovering state
         // are sure to have finished (see SOLR-7141 for
         // discussion around current value)
-        //TODO since SOLR-11216, we probably won't need this
+        // TODO since SOLR-11216, we probably won't need this
         try {
           Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
         } catch (InterruptedException e) {
@@ -588,7 +611,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
         // first thing we just try to sync
         if (firstTime) {
           firstTime = false; // only try sync the first time through the loop
-          log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
+          log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(),
+              recoveringAfterStartup);
           // System.out.println("Attempting to PeerSync from " + leaderUrl
           // + " i am:" + zkController.getNodeName());
           PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core,
@@ -604,7 +628,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
             // solrcloud_debug
             cloudDebugLog(core, "synced");
-            
+
             log.info("Replaying updates buffered during PeerSync.");
             replayFuture = replay(core);
 
@@ -620,7 +644,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           log.info("RecoveryStrategy has been closed");
           break;
         }
-        
+
         log.info("Starting Replication Recovery.");
 
         try {
@@ -658,12 +682,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
             if (replicaType == Replica.Type.TLOG) {
               zkController.startReplicationFromLeader(coreName, true);
             }
-            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+            zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
           } catch (Exception e) {
             log.error("Could not publish as ACTIVE after succesful recovery", e);
             successfulRecovery = false;
           }
-          
+
           if (successfulRecovery) {
             close = true;
             recoveryListener.recovered();
@@ -681,14 +705,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
             log.info("RecoveryStrategy has been closed");
             break;
           }
-          
+
           log.error("Recovery failed - trying again... (" + retries + ")");
-          
+
           retries++;
           if (retries >= maxRetries) {
             SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
             try {
-              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, this.coreDescriptor);
             } catch (Exception e) {
               SolrException.log(log, "Could not publish that recovery failed", e);
             }
@@ -699,12 +723,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
 
         try {
-          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
-          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result 
-          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
-          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
-          double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
-          log.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
+          // Wait an exponential interval between retries, start at 2 seconds and work up to a minute.
+          // Since we sleep at 2 seconds sub-intervals in
+          // order to check if we were closed, 30 is chosen as the maximum loopCount (2s * 30 = 1m).
+          double loopCount = Math.min(Math.pow(2, retries - 1), 30);
+          log.info("Wait [{}] seconds before trying to recover again (attempt={})",
+              loopCount * startingRecoveryDelayMilliSeconds, retries);
           for (int i = 0; i < loopCount; i++) {
             if (isClosed()) {
               log.info("RecoveryStrategy has been closed");
@@ -731,13 +755,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
     log.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
-  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
+      throws Exception {
     int numTried = 0;
     while (true) {
       CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
       DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
       if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
-          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
+          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName())
+              .getState() == Replica.State.ACTIVE) {
         // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
         zkController.publish(coreDesc, Replica.State.DOWN);
       }
@@ -763,6 +789,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
           .withSocketTimeout(1000)
           .withConnectionTimeout(1000)
+          .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient())
           .build()) {
         SolrPingResponse resp = httpSolrClient.ping();
         return leaderReplica;
@@ -811,13 +838,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
     // the index may ahead of the tlog's caches after recovery, by calling this tlog's caches will be purged
     core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
-    
+
     // solrcloud_debug
     cloudDebugLog(core, "replayed");
-    
+
     return future;
   }
-  
+
   final private void cloudDebugLog(SolrCore core, String op) {
     if (!log.isDebugEnabled()) {
       return;
@@ -838,9 +865,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
   }
 
   final public boolean isClosed() {
-    return close;
+    return close || cc.isShutDown();
   }
-  
+
   final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
       throws SolrServerException, IOException, InterruptedException, ExecutionException {
 
@@ -858,8 +885,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
     int conflictWaitMs = zkController.getLeaderConflictResolveWait();
     // timeout after 5 seconds more than the max timeout (conflictWait + 3 seconds) on the server side
-    int readTimeout = conflictWaitMs + 8000;
-    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
+    int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "8000"));
+    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl)
+        .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient()).build()) {
       client.setConnectionTimeout(10000);
       client.setSoTimeout(readTimeout);
       HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);

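Two substantive changes in RecoveryStrategy.java: recovery-related HttpSolrClient instances are now built on the shared recovery-only HttpClient, and the retry backoff drops from a 5 s base to startingRecoveryDelayMilliSeconds = 2000 with the cap raised to 30 sub-intervals (still one minute total). The backoff loop, restated as a compilable sketch:

    import java.util.function.BooleanSupplier;

    class RecoveryBackoff {
      static final int STARTING_DELAY_MS = 2000;

      // Sleep in 2 s sub-intervals, capped at 30 (2 s * 30 = 1 min), checking
      // between intervals whether recovery has been closed in the meantime.
      static void waitBeforeRetry(int retries, BooleanSupplier isClosed)
          throws InterruptedException {
        double loopCount = Math.min(Math.pow(2, retries - 1), 30);
        for (int i = 0; i < loopCount; i++) {
          if (isClosed.getAsBoolean()) {
            return;
          }
          Thread.sleep(STARTING_DELAY_MS);
        }
      }
    }
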
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index f881b5d..957b321 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -39,11 +39,11 @@ import org.slf4j.LoggerFactory;
 public class ReplicateFromLeader {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private CoreContainer cc;
-  private String coreName;
+  private final CoreContainer cc;
+  private final String coreName;
 
-  private ReplicationHandler replicationProcess;
-  private long lastVersion = 0;
+  private volatile ReplicationHandler replicationProcess;
+  private volatile long lastVersion = 0;
 
   public ReplicateFromLeader(CoreContainer cc, String coreName) {
     this.cc = cc;

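ReplicateFromLeader's fields change qualifiers only: collaborators fixed at construction become final, and state written by the polling thread but read elsewhere becomes volatile. In miniature, with an illustrative class:

    class LeaderFollower {
      private final String coreName;          // set once in the constructor
      private volatile long lastVersion = 0;  // written by the poller, read by other threads

      LeaderFollower(String coreName) {
        this.coreName = coreName;
      }

      void onPoll(long version) {
        lastVersion = version; // volatile write publishes the new value immediately
      }

      long lastSeen() {
        return lastVersion;
      }

      String coreName() {
        return coreName;
      }
    }
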
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index 3d9a964..2391414 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
@@ -70,7 +71,7 @@ public class SyncStrategy {
   public SyncStrategy(CoreContainer cc) {
     UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler();
     client = updateShardHandler.getDefaultHttpClient();
-    shardHandler = cc.getShardHandlerFactory().getShardHandler();
+    shardHandler = ((HttpShardHandlerFactory)cc.getShardHandlerFactory()).getShardHandler(cc.getUpdateShardHandler().getDefaultHttpClient());
     updateExecutor = updateShardHandler.getUpdateExecutor();
   }
   
@@ -113,17 +114,18 @@ public class SyncStrategy {
   
   private PeerSync.PeerSyncResult syncReplicas(ZkController zkController, SolrCore core,
       ZkNodeProps leaderProps, boolean peerSyncOnlyWithActive) {
+    if (isClosed) {
+      log.info("We have been closed, won't sync with replicas");
+      return PeerSync.PeerSyncResult.failure();
+    }
     boolean success = false;
     PeerSync.PeerSyncResult result = null;
+    assert core != null;
+    assert core.getCoreDescriptor() != null;
     CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
     String collection = cloudDesc.getCollectionName();
     String shardId = cloudDesc.getShardId();
 
-    if (isClosed) {
-      log.info("We have been closed, won't sync with replicas");
-      return PeerSync.PeerSyncResult.failure();
-    }
-    
     // first sync ourselves - we are the potential leader after all
     try {
       result = syncWithReplicas(zkController, core, leaderProps, collection,
@@ -160,6 +162,11 @@ public class SyncStrategy {
     List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
         .getReplicaProps(collection, shardId,core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
     
+    if (isClosed) {
+      log.info("We have been closed, won't sync with replicas");
+      return PeerSync.PeerSyncResult.failure();
+    }
+    
     if (nodes == null) {
       // I have no replicas
       return PeerSync.PeerSyncResult.success();
@@ -184,6 +191,11 @@ public class SyncStrategy {
                         String shardId, ZkNodeProps leaderProps, CoreDescriptor cd,
                         int nUpdates) {
     
+    if (isClosed) {
+      log.info("We have been closed, won't sync replicas to me.");
+      return;
+    }
+    
     // sync everyone else
     // TODO: we should do this in parallel at least
     List<ZkCoreNodeProps> nodes = zkController
@@ -289,6 +301,11 @@ public class SyncStrategy {
       }
       @Override
       public void run() {
+        
+        if (isClosed) {
+          log.info("We have been closed, won't request recovery");
+          return;
+        }
         RequestRecovery recoverRequestCmd = new RequestRecovery();
         recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
         recoverRequestCmd.setCoreName(coreName);

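Two themes run through the SyncStrategy diff: the ShardHandler is now bound to the shared default HttpClient via HttpShardHandlerFactory rather than a factory-owned client, and isClosed guards fail fast before any sync or recovery work begins. The same client binding recurs in BackupCmd, CreateCollectionCmd, and CreateSnapshotCmd further down. The binding, in isolation:

    // bind the shard handler to the caller's shared client
    HttpShardHandlerFactory factory = (HttpShardHandlerFactory) cc.getShardHandlerFactory();
    ShardHandler handler = factory.getShardHandler(cc.getUpdateShardHandler().getDefaultHttpClient());
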
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5caad81..32a030c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -46,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -62,11 +64,13 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.BeforeReconnect;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.ConnectionManager;
 import org.apache.solr.common.cloud.DefaultConnectionStrategy;
 import org.apache.solr.common.cloud.DefaultZkACLProvider;
 import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
@@ -90,6 +94,7 @@ import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.StrUtils;
@@ -102,6 +107,7 @@ import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrCoreInitializationException;
 import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
+import org.apache.solr.handler.component.HttpShardHandler;
 import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.servlet.SolrDispatchFilter;
@@ -137,7 +143,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
  * <p>
  * TODO: exceptions during close on attempts to update cloud state
  */
-public class ZkController {
+public class ZkController implements Closeable {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   static final int WAIT_DOWN_STATES_TIMEOUT_SECONDS = 60;
@@ -433,11 +439,14 @@ public class ZkController {
         closeOutstandingElections(registerOnReconnect);
         markAllAsNotLeader(registerOnReconnect);
       }
-    }, zkACLProvider);
+    }, zkACLProvider, new ConnectionManager.IsClosed() {
+
+      @Override
+      public boolean isClosed() {
+        return cc.isShutDown();
+      }});
+
 
-    this.overseerJobQueue = Overseer.getStateUpdateQueue(zkClient);
-    this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
-    this.overseerConfigSetQueue = Overseer.getConfigSetQueue(zkClient);
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
@@ -448,6 +457,10 @@ public class ZkController {
     });
 
     init(registerOnReconnect);
+    
+    this.overseerJobQueue = overseer.getStateUpdateQueue();
+    this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
+    this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
 
     assert ObjectReleaseTracker.track(this);
   }
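The constructor change hands SolrZkClient a ConnectionManager.IsClosed callback so that retry loops deep in the ZK layer can bail out as soon as the CoreContainer shuts down, and the overseer queues are now obtained from the Overseer instance after init(...) rather than via static helpers. Assuming IsClosed remains a single-method interface, the anonymous class above is equivalent to a lambda:

    // compact form of the callback passed above
    ConnectionManager.IsClosed isClosed = () -> cc.isShutDown();
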
@@ -554,42 +567,62 @@ public class ZkController {
    */
   public void close() {
     this.isClosed = true;
+
+    ForkJoinPool customThreadPool = new ForkJoinPool(10);
+
+    customThreadPool.submit(() -> Collections.singleton(overseerElector.getContext()).parallelStream().forEach(c -> {
+      IOUtils.closeQuietly(c);
+    }));
+
+    customThreadPool.submit(() -> Collections.singleton(overseer).parallelStream().forEach(c -> {
+      IOUtils.closeQuietly(c);
+    }));
+
     synchronized (collectionToTerms) {
-      collectionToTerms.values().forEach(ZkCollectionTerms::close);
+      customThreadPool.submit(() -> collectionToTerms.values().parallelStream().forEach(c -> {
+        c.close();
+      }));
     }
     try {
-      for (ElectionContext context : electionContexts.values()) {
+
+      customThreadPool.submit(() -> replicateFromLeaders.values().parallelStream().forEach(c -> {
+        c.stopReplication();
+      }));
+
+      customThreadPool.submit(() -> electionContexts.values().parallelStream().forEach(c -> {
+        IOUtils.closeQuietly(c);
+      }));
+
+    } finally {
+
+      customThreadPool.submit(() -> Collections.singleton(cloudSolrClient).parallelStream().forEach(c -> {
+        IOUtils.closeQuietly(c);
+      }));
+      customThreadPool.submit(() -> Collections.singleton(cloudManager).parallelStream().forEach(c -> {
+        IOUtils.closeQuietly(c);
+      }));
+
+      try {
         try {
-          context.close();
+          zkStateReader.close();
         } catch (Exception e) {
-          log.error("Error closing overseer", e);
+          log.error("Error closing zkStateReader", e);
         }
-      }
-    } finally {
-      try {
-        IOUtils.closeQuietly(overseerElector.getContext());
-        IOUtils.closeQuietly(overseer);
       } finally {
-        if (cloudSolrClient != null) {
-          IOUtils.closeQuietly(cloudSolrClient);
-        }
-        if (cloudManager != null) {
-          IOUtils.closeQuietly(cloudManager);
-        }
         try {
-          try {
-            zkStateReader.close();
-          } catch (Exception e) {
-            log.error("Error closing zkStateReader", e);
-          }
+          zkClient.close();
+        } catch (Exception e) {
+          log.error("Error closing zkClient", e);
         } finally {
-          try {
-            zkClient.close();
-          } catch (Exception e) {
-            log.error("Error closing zkClient", e);
-          }
+
+          // just in case the OverseerElectionContext managed to start another Overseer
+          IOUtils.closeQuietly(overseer);
+
+          ExecutorUtil.shutdownAndAwaitTermination(customThreadPool);
         }
+
       }
+
     }
     assert ObjectReleaseTracker.release(this);
   }
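close() now fans the slow, independent closes out over a small ForkJoinPool and only afterwards tears down zkStateReader and zkClient, so one wedged component can no longer serialize the whole shutdown. Running a parallel stream from inside a task submitted to that pool keeps the work on the custom pool rather than the JVM's common pool. The overall shape, in isolation (the resources list is illustrative):

    List<Closeable> resources = Arrays.asList(electionContext, overseer, cloudSolrClient);
    ForkJoinPool customPool = new ForkJoinPool(10);
    try {
      customPool.submit(() -> resources.parallelStream().forEach(IOUtils::closeQuietly));
    } finally {
      // drains the pool, waiting for the submitted closes to finish
      ExecutorUtil.shutdownAndAwaitTermination(customPool);
    }
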
@@ -669,9 +702,11 @@ public class ZkController {
       if (cloudManager != null) {
         return cloudManager;
       }
-      cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkServerAddress), Optional.empty())
-          .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient()).build();
+      cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkServerAddress), Optional.empty())
+          .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
+          .withConnectionTimeout(15000).withSocketTimeout(30000).build();
       cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cloudSolrClient);
+      cloudManager.getClusterStateProvider().connect();
     }
     return cloudManager;
   }
@@ -764,7 +799,8 @@ public class ZkController {
    * @throws KeeperException      if there is a Zookeeper error
    * @throws InterruptedException on interrupt
    */
-  public static void createClusterZkNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
+  public static void createClusterZkNodes(SolrZkClient zkClient)
+      throws KeeperException, InterruptedException, IOException {
     ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
     cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
@@ -777,7 +813,7 @@ public class ZkController {
     cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
-   bootstrapDefaultConfigSet(zkClient);
+    bootstrapDefaultConfigSet(zkClient);
   }
 
   private static void bootstrapDefaultConfigSet(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
@@ -839,7 +875,7 @@ public class ZkController {
       // start the overseer first as the following code may need its processing
       if (!zkRunOnly) {
         overseerElector = new LeaderElector(zkClient);
-        this.overseer = new Overseer(cc.getShardHandlerFactory().getShardHandler(), cc.getUpdateShardHandler(),
+        this.overseer = new Overseer((HttpShardHandler) cc.getShardHandlerFactory().getShardHandler(), cc.getUpdateShardHandler(),
             CommonParams.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
         ElectionContext context = new OverseerElectionContext(zkClient,
             overseer, getNodeName());
@@ -911,10 +947,10 @@ public class ZkController {
     LiveNodesListener listener = (oldNodes, newNodes) -> {
       oldNodes.removeAll(newNodes);
       if (oldNodes.isEmpty()) { // only added nodes
-        return;
+        return false;
       }
       if (isClosed) {
-        return;
+        return true;
       }
       // if this node is in the top three then attempt to create nodeLost message
       int i = 0;
@@ -923,7 +959,7 @@ public class ZkController {
           break;
         }
         if (i > 2) {
-          return; // this node is not in the top three
+          return false; // this node is not in the top three
         }
         i++;
       }
@@ -948,11 +984,17 @@ public class ZkController {
           }
         }
       }
+      return false;
     };
     zkStateReader.registerLiveNodesListener(listener);
   }
 
   public void publishAndWaitForDownStates() throws KeeperException,
+  InterruptedException {
+    publishAndWaitForDownStates(WAIT_DOWN_STATES_TIMEOUT_SECONDS);
+  }
+  
+  public void publishAndWaitForDownStates(int timeoutSeconds) throws KeeperException,
       InterruptedException {
 
     publishNodeAsDown(getNodeName());
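The live-nodes listener hunks above change the callback's contract from void to boolean; as used here, returning true once the controller is closed appears to deregister the listener, while false keeps it installed. A sketch of the new shape:

    // true = remove this listener, false = keep listening
    LiveNodesListener listener = (oldNodes, newNodes) -> {
      if (isClosed) return true; // stop listening once closed
      // ... react to lost nodes ...
      return false;
    };
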
@@ -983,7 +1025,7 @@ public class ZkController {
       });
     }
 
-    boolean allPublishedDown = latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    boolean allPublishedDown = latch.await(timeoutSeconds, TimeUnit.SECONDS);
     if (!allPublishedDown) {
       log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
     }
@@ -1051,10 +1093,13 @@ public class ZkController {
     log.info("Remove node as live in ZooKeeper:" + nodePath);
     List<Op> ops = new ArrayList<>(2);
     ops.add(Op.delete(nodePath, -1));
-    if (zkClient.exists(nodeAddedPath, true)) {
-      ops.add(Op.delete(nodeAddedPath, -1));
+    ops.add(Op.delete(nodeAddedPath, -1));
+ 
+    try {
+      zkClient.multi(ops, true);
+    } catch (NoNodeException e) {
+      // okay if the node was already removed; nothing to delete
     }
-    zkClient.multi(ops, true);
   }
 
   public String getNodeName() {
@@ -1158,6 +1203,10 @@ public class ZkController {
         // TODO: should this actually be done earlier, before (or as part of)
         // leader election perhaps?
         
+        if (core == null) {
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCore is no longer available to register");
+        }
+
         UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
         boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
         if (isTlogReplicaAndNotLeader) {
@@ -1270,6 +1319,7 @@ public class ZkController {
       final long msInSec = 1000L;
       int maxTries = (int) Math.floor(leaderConflictResolveWait / msInSec);
       while (!leaderUrl.equals(clusterStateLeaderUrl)) {
+        if (cc.isShutDown()) throw new AlreadyClosedException();
         if (tries > maxTries) {
           throw new SolrException(ErrorCode.SERVER_ERROR,
               "There is conflicting information about the leader of shard: "
@@ -1290,6 +1340,8 @@ public class ZkController {
             .getCoreUrl();
       }
 
+    } catch (AlreadyClosedException e) { 
+      throw e;
     } catch (Exception e) {
       log.error("Error getting leader from zk", e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -1336,7 +1388,7 @@ public class ZkController {
         Thread.sleep(1000);
       }
       if (cc.isShutDown()) {
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is closed");
+        throw new AlreadyClosedException();
       }
     }
     throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Could not get leader props", exp);
@@ -2392,6 +2444,9 @@ public class ZkController {
   }
 
   private boolean fireEventListeners(String zkDir) {
+    if (isClosed || cc.isShutDown()) {
+      return false;
+    }
     synchronized (confDirectoryListeners) {
       // if this is not among directories to be watched then don't set the watcher anymore
       if (!confDirectoryListeners.containsKey(zkDir)) {
@@ -2527,15 +2582,17 @@ public class ZkController {
    * @param nodeName to operate on
    */
   public void publishNodeAsDown(String nodeName) {
-    log.debug("Publish node={} as DOWN", nodeName);
+    log.info("Publish node={} as DOWN", nodeName);
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
         ZkStateReader.NODE_NAME_PROP, nodeName);
     try {
-      Overseer.getStateUpdateQueue(getZkClient()).offer(Utils.toJSON(m));
+      overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
+    } catch (AlreadyClosedException e) {
+      log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
     } catch (InterruptedException e) {
-      Thread.interrupted();
+      Thread.currentThread().interrupt();
       log.debug("Publish node as down was interrupted.");
-    } catch (Exception e) {
+    } catch (KeeperException e) {
       log.warn("Could not publish node as down: " + e.getMessage());
     } 
   }

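Finally, publishNodeAsDown routes through the live Overseer's queue, treats AlreadyClosedException as an expected shutdown signal, and fixes the interrupt handling: Thread.interrupted() merely clears the flag, while Thread.currentThread().interrupt() restores it so callers further up the stack still observe the interruption. The fix in isolation:

    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // restore the flag; Thread.interrupted() would silently discard it
      Thread.currentThread().interrupt();
    }
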
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 7acdfef..d3ce990 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
 import org.apache.solr.common.util.Pair;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -113,11 +114,15 @@ public class ZkDistributedQueue implements DistributedQueue {
   public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats) {
     this(zookeeper, dir, stats, 0);
   }
-
+  
   public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats, int maxQueueSize) {
+    this(zookeeper, dir, stats, maxQueueSize, null);
+  }
+
+  public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats, int maxQueueSize, IsClosed higherLevelIsClosed) {
     this.dir = dir;
 
-    ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
+    ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout(), higherLevelIsClosed);
     try {
       cmdExecutor.ensureExists(dir, zookeeper);
     } catch (KeeperException e) {

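The new ZkDistributedQueue constructor threads an optional ConnectionManager.IsClosed hook into ZkCmdExecutor, so the ensureExists retry loop can give up as soon as the owner reports closed; passing null keeps the old behavior. A sketch of wiring it up (the queue path and Stats usage are illustrative):

    ZkDistributedQueue queue = new ZkDistributedQueue(zkClient, "/overseer/queue",
        new Stats(), 0, () -> cc.isShutDown()); // IsClosed as a lambda
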
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index bcbb347..01fe62b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -313,29 +313,24 @@ public class ZkShardTerms implements AutoCloseable{
    * Create correspond ZK term node
    */
   private void ensureTermNodeExist() {
-    String path = "/collections/"+collection+ "/terms";
+    String path = "/collections/" + collection + "/terms";
     try {
-      if (!zkClient.exists(path, true)) {
-        try {
-          zkClient.makePath(path, true);
-        } catch (KeeperException.NodeExistsException e) {
-          // it's okay if another beats us creating the node
-        }
-      }
-      path += "/"+shard;
-      if (!zkClient.exists(path, true)) {
-        try {
-          Map<String, Long> initialTerms = new HashMap<>();
-          zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
-        } catch (KeeperException.NodeExistsException e) {
-          // it's okay if another beats us creating the node
-        }
+      path += "/" + shard;
+
+      try {
+        Map<String,Long> initialTerms = new HashMap<>();
+        zkClient.makePath(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+      } catch (KeeperException.NodeExistsException e) {
+        // it's okay if another beats us creating the node
       }
-    }  catch (InterruptedException e) {
+
+    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection: " + collection, e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error creating shard term node in Zookeeper for collection: " + collection, e);
     } catch (KeeperException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection: " + collection, e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error creating shard term node in Zookeeper for collection: " + collection, e);
     }
   }
 

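ensureTermNodeExist drops the exists-then-create round trips in favor of a single makePath that creates missing parents and treats NodeExistsException as success. Besides saving reads, this closes the race in the old check-then-act form, where two replicas could both observe "missing" and both attempt the create. The idiom, in isolation:

    // idempotent create: let ZooKeeper arbitrate instead of checking first
    try {
      zkClient.makePath(path, data, CreateMode.PERSISTENT, true);
    } catch (KeeperException.NodeExistsException e) {
      // another replica created it first; that counts as success
    }
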
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 8b72cdf..a0abaf0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -245,7 +245,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
           props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
         }
         try {
-          Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
         } catch (Exception e) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
         }
@@ -328,6 +328,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
         }
       }
     }
+    log.info("Returning CreateReplica command.");
     return new CreateReplica(collection, shard, node, replicaType, coreName, coreNodeName);
   }
 

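This is the first of several command classes in this commit that swap the static queue lookup for a call on the live Overseer instance; CreateCollectionCmd and CreateShardCmd below make the same substitution. Offering through the instance lets the Overseer reject state updates once it is closed rather than writing to ZooKeeper behind a dead overseer's back. The recurring substitution:

    // before: Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
    // after:  ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
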
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index fd09a3f..318cdf7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -115,7 +115,7 @@ public class Assign {
       } catch (IOException | KeeperException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error inc and get counter from Zookeeper for collection:"+collection, e);
       } catch (InterruptedException e) {
-        Thread.interrupted();
+        Thread.currentThread().interrupt();
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error inc and get counter from Zookeeper for collection:" + collection, e);
       }
     }
@@ -182,21 +182,34 @@ public class Assign {
     return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
   }
 
-  private static int defaultCounterValue(DocCollection collection, boolean newCollection) {
+  private static int defaultCounterValue(DocCollection collection, boolean newCollection, String shard) {
     if (newCollection) return 0;
-    int defaultValue = collection.getReplicas().size();
+
+    int defaultValue;
+    if (collection.getSlice(shard) != null && collection.getSlice(shard).getReplicas().isEmpty()) {
+      return 0;
+    } else {
+      defaultValue = collection.getReplicas().size() * 2;
+    }
+
     if (collection.getReplicationFactor() != null) {
       // numReplicas and replicationFactor * numSlices may not be equal
       // when many addReplica or deleteReplica operations have been executed
       defaultValue = Math.max(defaultValue,
           collection.getReplicationFactor() * collection.getSlices().size());
     }
-    return defaultValue * 20;
+    return defaultValue;
+  }
+  
+  private static int defaultCounterValue(DocCollection collection, boolean newCollection) {
+    if (newCollection) return 0;
+    int defaultValue = collection.getReplicas().size();
+    return defaultValue;
   }
 
   public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
     Slice slice = collection.getSlice(shard);
-    int defaultValue = defaultCounterValue(collection, newCollection);
+    int defaultValue = defaultCounterValue(collection, newCollection, shard);
     int replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
     String coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
     while (existCoreName(coreName, slice)) {

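The shard-aware defaultCounterValue seeds the core-name counter so fresh replica numbers land past any suffix likely to exist already, and the old blanket multiplication by 20 is gone. Worked example with illustrative values: a collection with 4 replicas overall, replicationFactor 2 and 2 slices gives max(4 * 2, 2 * 2) = 8, while a target shard that currently has no replicas short-circuits to 0.

    // worked example (values illustrative):
    // collection.getReplicas().size() == 4      -> defaultValue = 4 * 2 = 8
    // replicationFactor (2) * numSlices (2) = 4 -> max(8, 4) = 8
    // an empty target shard returns 0 immediately
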
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index b8aba76..fd9faad 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -160,7 +160,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
     String backupName = request.getStr(NAME);
     String asyncId = request.getStr(ASYNC);
     String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
     Map<String, String> requestMap = new HashMap<>();
 
     String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 533aee8..0f5e41a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -155,8 +155,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
 
       createCollectionZkNode(stateManager, collectionName, collectionParams);
-
-      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+      
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
 
       // wait for a while until we see the collection
       TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
@@ -195,7 +195,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
           collectionName, shardNames, message));
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
-      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String nodeName = replicaPosition.node;
 
@@ -235,7 +235,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
               ZkStateReader.BASE_URL_PROP, baseUrl,
               ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
               CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-          Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
         }
 
         // Need to create new params for each request
@@ -308,7 +308,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
             Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
             ZkStateReader.COLLECTION_PROP, withCollection,
             CollectionAdminParams.COLOCATED_WITH, collectionName);
-        Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
         try {
           zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
         } catch (TimeoutException e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index e7f35f1..229b799 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -21,7 +21,6 @@ import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -71,7 +70,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
     }
 
     ZkStateReader zkStateReader = ocmh.zkStateReader;
-    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+    ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
     // wait for a while until we see the shard
     ocmh.waitForNewShard(collectionName, sliceName);
     String async = message.getStr(ASYNC);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index 32715d6..8a091ef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -84,7 +84,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
     Map<String, String> requestMap = new HashMap<>();
     NamedList shardRequestResults = new NamedList();
     Map<String, Slice> shardByCoreName = new HashMap<>();
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
 
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
       for (Replica replica : slice.getReplicas()) {