Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:06:07 UTC

[46/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support for Solr

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
deleted file mode 100644
index febeec0..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.Closeable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-
-import com.codahale.metrics.Timer;
-import com.google.common.collect.ImmutableSet;
-import org.apache.commons.io.IOUtils;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.cloud.Overseer.LeaderStatus;
-import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.ID;
-
-/**
- * A generic processor run in the Overseer, used for handling items added
- * to a distributed work queue.  Has support for handling exclusive tasks
- * (i.e. tasks that should not run in parallel with each other).
- *
- * An {@link OverseerMessageHandlerSelector} determines which
- * {@link OverseerMessageHandler} handles specific messages in the
- * queue.
- */
-public class OverseerTaskProcessor implements Runnable, Closeable {
-
-  /**
-   * Maximum number of overseer collection operations which can be
-   * executed concurrently
-   */
-  public static final int MAX_PARALLEL_TASKS = 100;
-  public static final int MAX_BLOCKED_TASKS = 1000;
-
-  public ExecutorService tpe;
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private OverseerTaskQueue workQueue;
-  private DistributedMap runningMap;
-  private DistributedMap completedMap;
-  private DistributedMap failureMap;
-
-  // Set of the zk ids of all tasks that are currently running.
-  final private Set<String> runningTasks;
-
-  // List of completed tasks. This is used to clean up workQueue in zk.
-  final private HashMap<String, QueueEvent> completedTasks;
-
-  private String myId;
-
-  private ZkStateReader zkStateReader;
-
-  private boolean isClosed;
-
-  private Stats stats;
-
-  // Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
-  // It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
-  // deleted from the work-queue as that is a batched operation.
-  final private Set<String> runningZKTasks;
-  // This map may contain tasks which are read from work queue but could not
-  // be executed because they are blocked or the execution queue is full
-  // This is an optimization to ensure that we do not read the same tasks
-  // again and again from ZK.
-  final private Map<String, QueueEvent> blockedTasks = new LinkedHashMap<>();
-  final private Predicate<String> excludedTasks = new Predicate<String>() {
-    @Override
-    public boolean test(String s) {
-      return runningTasks.contains(s) || blockedTasks.containsKey(s);
-    }
-
-    @Override
-    public String toString() {
-      return StrUtils.join(ImmutableSet.of(runningTasks, blockedTasks.keySet()), ',');
-    }
-
-  };
-
-  private final Object waitLock = new Object();
-
-  protected OverseerMessageHandlerSelector selector;
-
-  private OverseerNodePrioritizer prioritizer;
-
-  private String thisNode;
-
-  public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId,
-                                        Stats stats,
-                                        OverseerMessageHandlerSelector selector,
-                                        OverseerNodePrioritizer prioritizer,
-                                        OverseerTaskQueue workQueue,
-                                        DistributedMap runningMap,
-                                        DistributedMap completedMap,
-                                        DistributedMap failureMap) {
-    this.zkStateReader = zkStateReader;
-    this.myId = myId;
-    this.stats = stats;
-    this.selector = selector;
-    this.prioritizer = prioritizer;
-    this.workQueue = workQueue;
-    this.runningMap = runningMap;
-    this.completedMap = completedMap;
-    this.failureMap = failureMap;
-    this.runningZKTasks = new HashSet<>();
-    this.runningTasks = new HashSet<>();
-    this.completedTasks = new HashMap<>();
-    thisNode = Utils.getMDCNode();
-  }
-
-  @Override
-  public void run() {
-    MDCLoggingContext.setNode(thisNode);
-    log.debug("Process current queue of overseer operations");
-    LeaderStatus isLeader = amILeader();
-    while (isLeader == LeaderStatus.DONT_KNOW) {
-      log.debug("am_i_leader unclear {}", isLeader);
-      isLeader = amILeader();  // not a no, not a yes, try asking again
-    }
-
-    String oldestItemInWorkQueue = null;
-    // hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer.
-    // This variable is set in case there's any task found on the workQueue when the OCP starts up and
-    // the id for the queue tail is used as a marker to check for the task in completed/failed map in zk.
-    // Beyond the marker, all tasks can safely be assumed to have never been executed.
-    boolean hasLeftOverItems = true;
-
-    try {
-      oldestItemInWorkQueue = workQueue.getTailId();
-    } catch (KeeperException e) {
-      // We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
-      // async calls.
-      SolrException.log(log, "", e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-
-    if (oldestItemInWorkQueue == null)
-      hasLeftOverItems = false;
-    else
-      log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
-
-    try {
-      prioritizer.prioritizeOverseerNodes(myId);
-    } catch (Exception e) {
-      if (!zkStateReader.getZkClient().isClosed()) {
-        log.error("Unable to prioritize overseer ", e);
-      }
-    }
-
-    // TODO: Make maxThreads configurable.
-
-    this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, MAX_PARALLEL_TASKS, 0L, TimeUnit.MILLISECONDS,
-        new SynchronousQueue<Runnable>(),
-        new DefaultSolrThreadFactory("OverseerThreadFactory"));
-    try {
-      while (!this.isClosed) {
-        try {
-          isLeader = amILeader();
-          if (LeaderStatus.NO == isLeader) {
-            break;
-          } else if (LeaderStatus.YES != isLeader) {
-            log.debug("am_i_leader unclear {}", isLeader);
-            continue; // not a no, not a yes, try asking again
-          }
-
-          log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
-          cleanUpWorkQueue();
-
-          printTrackingMaps();
-
-          boolean waited = false;
-
-          while (runningTasks.size() > MAX_PARALLEL_TASKS) {
-            synchronized (waitLock) {
-              waitLock.wait(100); // wait for 100 ms or until a task completes
-            }
-            waited = true;
-          }
-
-          if (waited)
-            cleanUpWorkQueue();
-
-
-          ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
-          heads.addAll(blockedTasks.values());
-
-          // If we already have enough items in the blocked tasks, it makes
-          // no sense to read more items from the work queue; it makes sense
-          // to clear out at least a few items in the queue before we read more.
-          if (heads.size() < MAX_BLOCKED_TASKS) {
-            //instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
-            int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
-            List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
-            log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
-            heads.addAll(newTasks);
-          } else {
-            // Prevent free-spinning this loop.
-            Thread.sleep(1000);
-          }
-
-          if (isClosed) break;
-
-          if (heads.isEmpty()) {
-            continue;
-          }
-
-          blockedTasks.clear(); // clear it now; may get refilled below.
-
-          taskBatch.batchId++;
-          boolean tooManyTasks = false;
-          for (QueueEvent head : heads) {
-            if (!tooManyTasks) {
-              synchronized (runningTasks) {
-                tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
-              }
-            }
-            if (tooManyTasks) {
-              // Too many tasks are running, just shove the rest into the "blocked" queue.
-              if (blockedTasks.size() < MAX_BLOCKED_TASKS)
-                blockedTasks.put(head.getId(), head);
-              continue;
-            }
-            if (runningZKTasks.contains(head.getId())) continue;
-            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
-            final String asyncId = message.getStr(ASYNC);
-            if (hasLeftOverItems) {
-              if (head.getId().equals(oldestItemInWorkQueue))
-                hasLeftOverItems = false;
-              if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
-                log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",asyncId );
-                workQueue.remove(head);
-                continue;
-              }
-            }
-            String operation = message.getStr(Overseer.QUEUE_OPERATION);
-            if (operation == null) {
-              log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
-              workQueue.remove(head);
-              continue;
-            }
-            OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
-            OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
-            if (lock == null) {
-              log.debug("Exclusivity check failed for [{}]", message.toString());
-              //we may end up exceeding MAX_BLOCKED_TASKS; that is fine
-              if (blockedTasks.size() < MAX_BLOCKED_TASKS)
-                blockedTasks.put(head.getId(), head);
-              continue;
-            }
-            try {
-              markTaskAsRunning(head, asyncId);
-              log.debug("Marked task [{}] as running", head.getId());
-            } catch (KeeperException.NodeExistsException e) {
-              lock.unlock();
-              // This should never happen
-              log.error("Tried to pick up task [{}] when it was already running!", head.getId());
-              continue;
-            } catch (InterruptedException e) {
-              lock.unlock();
-              log.error("Thread interrupted while trying to pick task for execution.", head.getId());
-              Thread.currentThread().interrupt();
-              continue;
-            }
-            log.debug("{}: Got message id: {}, message: {}", messageHandler.getName(), head.getId(), message);
-            Runner runner = new Runner(messageHandler, message,
-                operation, head, lock);
-            tpe.execute(runner);
-          }
-
-        } catch (KeeperException e) {
-          if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
-            log.warn("Overseer cannot talk to ZK");
-            return;
-          }
-          SolrException.log(log, "", e);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
-        } catch (Exception e) {
-          SolrException.log(log, "", e);
-        }
-      }
-    } finally {
-      this.close();
-    }
-  }
-
-  private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
-    synchronized (completedTasks) {
-      for (String id : completedTasks.keySet()) {
-        workQueue.remove(completedTasks.get(id));
-        runningZKTasks.remove(id);
-      }
-      completedTasks.clear();
-    }
-  }
-
-  public void close() {
-    isClosed = true;
-    if (tpe != null) {
-      if (!tpe.isShutdown()) {
-        ExecutorUtil.shutdownAndAwaitTermination(tpe);
-      }
-    }
-    IOUtils.closeQuietly(selector);
-  }
-
-  public static List<String> getSortedOverseerNodeNames(SolrZkClient zk) throws KeeperException, InterruptedException {
-    List<String> children = null;
-    try {
-      children = zk.getChildren(Overseer.OVERSEER_ELECT + LeaderElector.ELECTION_NODE, null, true);
-    } catch (Exception e) {
-      log.warn("error ", e);
-      return new ArrayList<>();
-    }
-    LeaderElector.sortSeqs(children);
-    ArrayList<String> nodeNames = new ArrayList<>(children.size());
-    for (String c : children) nodeNames.add(LeaderElector.getNodeName(c));
-    return nodeNames;
-  }
-
-  public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
-    List<String> children = zk.getChildren(path, null, true);
-    LeaderElector.sortSeqs(children);
-    return children;
-  }
-
-  public static String getLeaderNode(SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    String id = getLeaderId(zkClient);
-    return id == null ? null : LeaderElector.getNodeName(id);
-  }
-
-  public static String getLeaderId(SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    byte[] data = null;
-    try {
-      data = zkClient.getData(Overseer.OVERSEER_ELECT + "/leader", null, new Stat(), true);
-    } catch (KeeperException.NoNodeException e) {
-      return null;
-    }
-    Map m = (Map) Utils.fromJSON(data);
-    return  (String) m.get(ID);
-  }
-
-  protected LeaderStatus amILeader() {
-    String statsName = "collection_am_i_leader";
-    Timer.Context timerContext = stats.time(statsName);
-    boolean success = true;
-    String propsId = null;
-    try {
-      ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
-          Overseer.OVERSEER_ELECT + "/leader", null, null, true));
-      propsId = props.getStr(ID);
-      if (myId.equals(propsId)) {
-        return LeaderStatus.YES;
-      }
-    } catch (KeeperException e) {
-      success = false;
-      if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
-        log.error("", e);
-        return LeaderStatus.DONT_KNOW;
-      } else if (e.code() != KeeperException.Code.SESSIONEXPIRED) {
-        log.warn("", e);
-      } else {
-        log.debug("", e);
-      }
-    } catch (InterruptedException e) {
-      success = false;
-      Thread.currentThread().interrupt();
-    } finally {
-      timerContext.stop();
-      if (success)  {
-        stats.success(statsName);
-      } else  {
-        stats.error(statsName);
-      }
-    }
-    log.info("According to ZK I (id={}) am no longer a leader. propsId={}", myId, propsId);
-    return LeaderStatus.NO;
-  }
-
-  public boolean isClosed() {
-    return isClosed;
-  }
-
-  @SuppressWarnings("unchecked")
-  private void markTaskAsRunning(QueueEvent head, String asyncId)
-      throws KeeperException, InterruptedException {
-    synchronized (runningZKTasks) {
-      runningZKTasks.add(head.getId());
-    }
-
-    synchronized (runningTasks) {
-      runningTasks.add(head.getId());
-    }
-
-
-    if (asyncId != null)
-      runningMap.put(asyncId, null);
-  }
-  
-  protected class Runner implements Runnable {
-    ZkNodeProps message;
-    String operation;
-    SolrResponse response;
-    QueueEvent head;
-    OverseerMessageHandler messageHandler;
-    private final OverseerMessageHandler.Lock lock;
-
-    public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head, OverseerMessageHandler.Lock lock) {
-      this.message = message;
-      this.operation = operation;
-      this.head = head;
-      this.messageHandler = messageHandler;
-      this.lock = lock;
-      response = null;
-    }
-
-
-    public void run() {
-      String statsName = messageHandler.getTimerName(operation);
-      final Timer.Context timerContext = stats.time(statsName);
-
-      boolean success = false;
-      final String asyncId = message.getStr(ASYNC);
-      String taskKey = messageHandler.getTaskKey(message);
-
-      try {
-        try {
-          log.debug("Runner processing {}", head.getId());
-          response = messageHandler.processMessage(message, operation);
-        } finally {
-          timerContext.stop();
-          updateStats(statsName);
-        }
-
-        if (asyncId != null) {
-          if (response != null && (response.getResponse().get("failure") != null 
-              || response.getResponse().get("exception") != null)) {
-            failureMap.put(asyncId, SolrResponse.serializable(response));
-            log.debug("Updated failed map for task with zkid:[{}]", head.getId());
-          } else {
-            completedMap.put(asyncId, SolrResponse.serializable(response));
-            log.debug("Updated completed map for task with zkid:[{}]", head.getId());
-          }
-        } else {
-          head.setBytes(SolrResponse.serializable(response));
-          log.debug("Completed task:[{}]", head.getId());
-        }
-
-        markTaskComplete(head.getId(), asyncId);
-        log.debug("Marked task [{}] as completed.", head.getId());
-        printTrackingMaps();
-
-        log.debug(messageHandler.getName() + ": Message id:" + head.getId() +
-            " complete, response:" + response.getResponse().toString());
-        success = true;
-      } catch (KeeperException e) {
-        SolrException.log(log, "", e);
-      } catch (InterruptedException e) {
-        // Reset task from tracking data structures so that it can be retried.
-        resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
-        log.warn("Resetting task {} as the thread was interrupted.", head.getId());
-        Thread.currentThread().interrupt();
-      } finally {
-        lock.unlock();
-        if (!success) {
-          // Reset task from tracking data structures so that it can be retried.
-          resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
-        }
-        synchronized (waitLock){
-          waitLock.notifyAll();
-        }
-      }
-    }
-
-    private void markTaskComplete(String id, String asyncId)
-        throws KeeperException, InterruptedException {
-      synchronized (completedTasks) {
-        completedTasks.put(id, head);
-      }
-
-      synchronized (runningTasks) {
-        runningTasks.remove(id);
-      }
-
-      if (asyncId != null) {
-        if (!runningMap.remove(asyncId)) {
-          log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
-        }
-      }
-
-      workQueue.remove(head);
-    }
-
-    private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
-      log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey);
-      try {
-        if (asyncId != null) {
-          if (!runningMap.remove(asyncId)) {
-            log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
-          }
-        }
-
-        synchronized (runningTasks) {
-          runningTasks.remove(id);
-        }
-
-      } catch (KeeperException e) {
-        SolrException.log(log, "", e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-
-    }
-
-    private void updateStats(String statsName) {
-      if (isSuccessful()) {
-        stats.success(statsName);
-      } else {
-        stats.error(statsName);
-        stats.storeFailureDetails(statsName, message, response);
-      }
-    }
-
-    private boolean isSuccessful() {
-      if (response == null)
-        return false;
-      return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null);
-    }
-  }
-
-  private void printTrackingMaps() {
-    if (log.isDebugEnabled()) {
-      synchronized (runningTasks) {
-        log.debug("RunningTasks: {}", runningTasks.toString());
-      }
-      log.debug("BlockedTasks: {}", blockedTasks.keySet().toString());
-      synchronized (completedTasks) {
-        log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
-      }
-      synchronized (runningZKTasks) {
-        log.debug("RunningZKTasks: {}", runningZKTasks.toString());
-      }
-    }
-  }
-
-
-
-  String getId(){
-    return myId;
-  }
-
-  /**
-   * An interface to determine which {@link OverseerMessageHandler}
-   * handles a given message.  This could be a single OverseerMessageHandler
-   * for the case where a single type of message is handled (e.g. collection
-   * messages only) , or a different handler could be selected based on the
-   * contents of the message.
-   */
-  public interface OverseerMessageHandlerSelector extends Closeable {
-    OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
-  }
-
-  final private TaskBatch taskBatch = new TaskBatch();
-
-  public class TaskBatch {
-    private long batchId = 0;
-
-    public long getId() {
-      return batchId;
-    }
-
-    public int getRunningTasks() {
-      synchronized (runningTasks) {
-        return runningTasks.size();
-      }
-    }
-  }
-
-}
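
For context on the class above: an OverseerMessageHandlerSelector maps each
dequeued ZkNodeProps message to the OverseerMessageHandler that should process
it. A minimal, hypothetical selector that routes every message to a single
handler might look like the sketch below (SingleHandlerSelector and its
constructor argument are illustrative, not part of this commit; the class is
assumed to live in org.apache.solr.cloud alongside the types it references):

    import java.io.IOException;
    import org.apache.solr.common.cloud.ZkNodeProps;

    class SingleHandlerSelector implements OverseerTaskProcessor.OverseerMessageHandlerSelector {
      private final OverseerMessageHandler handler;

      SingleHandlerSelector(OverseerMessageHandler handler) {
        this.handler = handler;
      }

      @Override
      public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) {
        // Every queue operation goes to the same handler; a richer selector
        // could switch on message.getStr(Overseer.QUEUE_OPERATION) instead.
        return handler;
      }

      @Override
      public void close() throws IOException {
        // Nothing to release in this sketch; a real selector should close
        // any handlers or resources it owns.
      }
    }
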

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
deleted file mode 100644
index 66a31c5..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import com.codahale.metrics.Timer;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Predicate;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.util.Pair;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link ZkDistributedQueue} augmented with helper methods specific to the overseer task queues.
- * Methods specific to this subclass ignore superclass internal state and hit ZK directly.
- * This is inefficient!  But the API on this class is kind of muddy.
- */
-public class OverseerTaskQueue extends ZkDistributedQueue {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
-  private static final String RESPONSE_PREFIX = "qnr-";
-
-  public OverseerTaskQueue(SolrZkClient zookeeper, String dir) {
-    this(zookeeper, dir, new Stats());
-  }
-
-  public OverseerTaskQueue(SolrZkClient zookeeper, String dir, Stats stats) {
-    super(zookeeper, dir, stats);
-  }
-  
-  /**
-   * Returns true if the queue contains a task with the specified async id.
-   */
-  public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
-      throws KeeperException, InterruptedException {
-
-    List<String> childNames = zookeeper.getChildren(dir, null, true);
-    stats.setQueueLength(childNames.size());
-    for (String childName : childNames) {
-      if (childName != null && childName.startsWith(PREFIX)) {
-        try {
-          byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
-          if (data != null) {
-            ZkNodeProps message = ZkNodeProps.load(data);
-            if (message.containsKey(requestIdKey)) {
-              log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
-              if(message.get(requestIdKey).equals(requestId)) return true;
-            }
-          }
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
-        }
-      }
-    }
-
-    return false;
-  }
-
-  /**
-   * Remove the event and save the response into the other path.
-   */
-  public void remove(QueueEvent event) throws KeeperException,
-      InterruptedException {
-    Timer.Context time = stats.time(dir + "_remove_event");
-    try {
-      String path = event.getId();
-      String responsePath = dir + "/" + RESPONSE_PREFIX
-          + path.substring(path.lastIndexOf("-") + 1);
-      if (zookeeper.exists(responsePath, true)) {
-        zookeeper.setData(responsePath, event.getBytes(), true);
-      } else {
-        log.info("Response ZK path: " + responsePath + " doesn't exist."
-            + "  Requestor may have disconnected from ZooKeeper");
-      }
-      try {
-        zookeeper.delete(path, -1, true);
-      } catch (KeeperException.NoNodeException ignored) {
-      }
-    } finally {
-      time.stop();
-    }
-  }
-
-  /**
-   * Watcher that blocks until a WatchedEvent occurs for a znode.
-   */
-  static final class LatchWatcher implements Watcher {
-
-    private final Lock lock;
-    private final Condition eventReceived;
-    private WatchedEvent event;
-    private Event.EventType latchEventType;
-    
-    LatchWatcher() {
-      this(null);
-    }
-    
-    LatchWatcher(Event.EventType eventType) {
-      this.lock = new ReentrantLock();
-      this.eventReceived = lock.newCondition();
-      this.latchEventType = eventType;
-    }
-
-
-    @Override
-    public void process(WatchedEvent event) {
-      // session events are not change events, and do not remove the watcher
-      if (Event.EventType.None.equals(event.getType())) {
-        return;
-      }
-      // If latchEventType is not null, only fire if the type matches
-      log.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState(), latchEventType);
-      if (latchEventType == null || event.getType() == latchEventType) {
-        lock.lock();
-        try {
-          this.event = event;
-          eventReceived.signalAll();
-        } finally {
-          lock.unlock();
-        }
-      }
-    }
-
-    public void await(long timeoutMs) throws InterruptedException {
-      assert timeoutMs > 0;
-      lock.lock();
-      try {
-        if (this.event != null) {
-          return;
-        }
-        eventReceived.await(timeoutMs, TimeUnit.MILLISECONDS);
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-  }
-
-  /**
-   * Inserts data into zookeeper.
-   * 
-   * @return the path of the created znode
-   */
-  private String createData(String path, byte[] data, CreateMode mode)
-      throws KeeperException, InterruptedException {
-    for (;;) {
-      try {
-        return zookeeper.create(path, data, mode, true);
-      } catch (KeeperException.NoNodeException e) {
-        try {
-          zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
-        } catch (KeeperException.NodeExistsException ne) {
-          // someone created it
-        }
-      }
-    }
-  }
-  
-  /**
-   * Offer the data and wait for the response
-   * 
-   */
-  public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
-      InterruptedException {
-    Timer.Context time = stats.time(dir + "_offer");
-    try {
-      // Create and watch the response node before creating the request node;
-      // otherwise we may miss the response.
-      String watchID = createResponseNode();
-
-      LatchWatcher watcher = new LatchWatcher();
-      Stat stat = zookeeper.exists(watchID, watcher, true);
-
-      // create the request node
-      createRequestNode(data, watchID);
-
-      if (stat != null) {
-        watcher.await(timeout);
-      }
-      byte[] bytes = zookeeper.getData(watchID, null, null, true);
-      // create the event before deleting the node, otherwise we can get the deleted
-      // event from the watcher.
-      QueueEvent event =  new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
-      zookeeper.delete(watchID, -1, true);
-      return event;
-    } finally {
-      time.stop();
-    }
-  }
-
-  void createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException {
-    createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
-        data, CreateMode.PERSISTENT);
-  }
-
-  String createResponseNode() throws KeeperException, InterruptedException {
-    return createData(
-            dir + "/" + RESPONSE_PREFIX,
-            null, CreateMode.EPHEMERAL_SEQUENTIAL);
-  }
-
-
-  public List<QueueEvent> peekTopN(int n, Predicate<String> excludeSet, long waitMillis)
-      throws KeeperException, InterruptedException {
-    ArrayList<QueueEvent> topN = new ArrayList<>();
-
-    log.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
-    Timer.Context time;
-    if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
-    else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
-
-    try {
-      for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.test(dir + "/" + child))) {
-        topN.add(new QueueEvent(dir + "/" + element.first(),
-            element.second(), null));
-      }
-      printQueueEventsListElementIds(topN);
-      return topN;
-    } finally {
-      time.stop();
-    }
-  }
-
-  private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
-    if (log.isDebugEnabled() && !topN.isEmpty()) {
-      StringBuilder sb = new StringBuilder("[");
-      for (QueueEvent queueEvent : topN) {
-        if (sb.length() > 1) sb.append(", ");
-        sb.append(queueEvent.getId());
-      }
-      sb.append("]");
-      log.debug("Returning topN elements: {}", sb);
-    }
-  }
-
-
-  /**
-   * Returns the id of the last element of the queue without removing it.
-   */
-  public String getTailId() throws KeeperException, InterruptedException {
-    // TODO: could we use getChildren here?  Unsure what freshness guarantee the caller needs.
-    TreeSet<String> orderedChildren = fetchZkChildren(null);
-
-    for (String headNode : orderedChildren.descendingSet())
-      if (headNode != null) {
-        try {
-          QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
-              null, null, true), null);
-          return queueEvent.getId();
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
-        }
-      }
-    return null;
-  }
-  
-  public static class QueueEvent {
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((id == null) ? 0 : id.hashCode());
-      return result;
-    }
-    
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (obj == null) return false;
-      if (getClass() != obj.getClass()) return false;
-      QueueEvent other = (QueueEvent) obj;
-      if (id == null) {
-        if (other.id != null) return false;
-      } else if (!id.equals(other.id)) return false;
-      return true;
-    }
-    
-    private WatchedEvent event = null;
-    private String id;
-    private byte[] bytes;
-    
-    QueueEvent(String id, byte[] bytes, WatchedEvent event) {
-      this.id = id;
-      this.bytes = bytes;
-      this.event = event;
-    }
-    
-    public void setId(String id) {
-      this.id = id;
-    }
-    
-    public String getId() {
-      return id;
-    }
-    
-    public void setBytes(byte[] bytes) {
-      this.bytes = bytes;
-    }
-    
-    public byte[] getBytes() {
-      return bytes;
-    }
-    
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-    
-  }
-}
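
The offer() method above implements a request/response handshake over
ZooKeeper: the caller creates and watches an ephemeral-sequential response
node first, then creates the matching request node, so the reply written back
by the task processor cannot be missed. A hedged usage sketch follows; the
helper method is illustrative, and "/overseer/collection-queue-work" is
assumed here as the conventional overseer collection queue path:

    import org.apache.solr.common.cloud.SolrZkClient;
    import org.apache.solr.common.cloud.ZkNodeProps;
    import org.apache.solr.common.util.Utils;

    static byte[] submitAndWait(SolrZkClient zkClient, ZkNodeProps message) throws Exception {
      OverseerTaskQueue queue = new OverseerTaskQueue(zkClient, "/overseer/collection-queue-work");
      // Blocks for up to 30 seconds; the returned event carries the bytes the
      // task processor wrote to the response node (a serialized SolrResponse).
      OverseerTaskQueue.QueueEvent event = queue.offer(Utils.toJSON(message), 30000L);
      return event.getBytes();
    }
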

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
deleted file mode 100644
index 007d221..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.SolrCore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Starts recovery of a core if its term is less than the leader's term.
- */
-public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final CoreDescriptor coreDescriptor;
-  private final CoreContainer coreContainer;
-  // Used to prevent redoing recovery every time the terms of other replicas change:
-  // for a given term of this replica, we only do recovery once.
-  private final AtomicLong lastTermDoRecovery;
-
-  RecoveringCoreTermWatcher(CoreDescriptor coreDescriptor, CoreContainer coreContainer) {
-    this.coreDescriptor = coreDescriptor;
-    this.coreContainer = coreContainer;
-    this.lastTermDoRecovery = new AtomicLong(-1);
-  }
-
-  @Override
-  public boolean onTermChanged(ZkShardTerms.Terms terms) {
-    if (coreContainer.isShutDown()) return false;
-
-    try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) {
-      if (solrCore == null || solrCore.isClosed()) {
-        return false;
-      }
-
-      if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
-      String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
-      if (terms.haveHighestTermValue(coreNodeName)) return true;
-      if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
-        log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
-        lastTermDoRecovery.set(terms.getTerm(coreNodeName));
-        solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
-      }
-    } catch (Exception e) {
-      log.info("Failed to watch term of core {}", coreDescriptor.getName(), e);
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
-
-    return coreDescriptor.getName().equals(that.coreDescriptor.getName());
-  }
-
-  @Override
-  public int hashCode() {
-    return coreDescriptor.getName().hashCode();
-  }
-}
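
For context, a CoreTermWatcher like this one is registered against the
shard's ZkShardTerms so that onTermChanged() fires on every term update, and
returning false unregisters the watcher. A hypothetical wiring sketch is
below; getShardTerms() and addListener() are assumed to be the registration
hooks this codebase uses, and the helper method itself is illustrative:

    import org.apache.solr.core.CoreContainer;
    import org.apache.solr.core.SolrCore;

    static void watchCoreTerms(ZkController zkController, CoreContainer cc, SolrCore core) {
      CloudDescriptor cloud = core.getCoreDescriptor().getCloudDescriptor();
      ZkShardTerms terms = zkController.getShardTerms(cloud.getCollectionName(), cloud.getShardId());
      // The watcher starts recovery whenever this core's term falls behind
      // the highest (leader's) term for the shard.
      terms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), cc));
    }
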

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
deleted file mode 100644
index 94e126e..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.SolrPingResponse;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.DirectoryFactory.DirContext;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler;
-import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.PeerSyncWithLeader;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.util.RefCounted;
-import org.apache.solr.util.SolrPluginUtils;
-import org.apache.solr.util.plugin.NamedListInitializedPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class may change in the future; customisations are not supported
- * between versions in terms of API or back-compat behaviour.
- * @lucene.experimental
- */
-public class RecoveryStrategy implements Runnable, Closeable {
-
-  public static class Builder implements NamedListInitializedPlugin {
-    private NamedList args;
-    @Override
-    public void init(NamedList args) {
-      this.args = args;
-    }
-    // this should only be used from SolrCoreState
-    public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
-        RecoveryStrategy.RecoveryListener recoveryListener) {
-      final RecoveryStrategy recoveryStrategy = newRecoveryStrategy(cc, cd, recoveryListener);
-      SolrPluginUtils.invokeSetters(recoveryStrategy, args);
-      return recoveryStrategy;
-    }
-    protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
-        RecoveryStrategy.RecoveryListener recoveryListener) {
-      return new RecoveryStrategy(cc, cd, recoveryListener);
-    }
-  }
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
-  private int maxRetries = 500;
-  private int startingRecoveryDelayMilliSeconds = 5000;
-
-  public interface RecoveryListener {
-    void recovered();
-    void failed();
-  }
-  
-  private volatile boolean close = false;
-
-  private RecoveryListener recoveryListener;
-  private ZkController zkController;
-  private String baseUrl;
-  private String coreZkNodeName;
-  private ZkStateReader zkStateReader;
-  private volatile String coreName;
-  private int retries;
-  private boolean recoveringAfterStartup;
-  private CoreContainer cc;
-  private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-  private final Replica.Type replicaType;
-
-  protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
-    this.cc = cc;
-    this.coreName = cd.getName();
-    this.recoveryListener = recoveryListener;
-    zkController = cc.getZkController();
-    zkStateReader = zkController.getZkStateReader();
-    baseUrl = zkController.getBaseUrl();
-    coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
-    replicaType = cd.getCloudDescriptor().getReplicaType();
-  }
-
-  final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
-    return waitForUpdatesWithStaleStatePauseMilliSeconds;
-  }
-
-  final public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(int waitForUpdatesWithStaleStatePauseMilliSeconds) {
-    this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds;
-  }
-
-  final public int getMaxRetries() {
-    return maxRetries;
-  }
-
-  final public void setMaxRetries(int maxRetries) {
-    this.maxRetries = maxRetries;
-  }
-
-  final public int getStartingRecoveryDelayMilliSeconds() {
-    return startingRecoveryDelayMilliSeconds;
-  }
-
-  final public void setStartingRecoveryDelayMilliSeconds(int startingRecoveryDelayMilliSeconds) {
-    this.startingRecoveryDelayMilliSeconds = startingRecoveryDelayMilliSeconds;
-  }
-
-  final public boolean getRecoveringAfterStartup() {
-    return recoveringAfterStartup;
-  }
-
-  final public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
-    this.recoveringAfterStartup = recoveringAfterStartup;
-  }
-
-  // make sure any threads stop retrying
-  @Override
-  final public void close() {
-    close = true;
-    if (prevSendPreRecoveryHttpUriRequest != null) {
-      prevSendPreRecoveryHttpUriRequest.abort();
-    }
-    log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
-  }
-
-  final private void recoveryFailed(final SolrCore core,
-      final ZkController zkController, final String baseUrl,
-      final String shardZkNodeName, final CoreDescriptor cd) throws Exception {
-    SolrException.log(log, "Recovery failed - I give up.");
-    try {
-      zkController.publish(cd, Replica.State.RECOVERY_FAILED);
-    } finally {
-      close();
-      recoveryListener.failed();
-    }
-  }
-  
-  /**
-   * This method may change in the future; customisations are not supported
-   * between versions in terms of API or back-compat behaviour.
-   * @lucene.experimental
-   */
-  protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
-    return new ZkCoreNodeProps(leaderprops).getCoreUrl();
-  }
-
-  final private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
-      throws SolrServerException, IOException {
-
-    final String leaderUrl = getReplicateLeaderUrl(leaderprops);
-    
-    log.info("Attempting to replicate from [{}].", leaderUrl);
-    
-    // send commit
-    commitOnLeader(leaderUrl);
-    
-    // use rep handler directly, so we can do this sync rather than async
-    SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
-    ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-    
-    if (replicationHandler == null) {
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-          "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
-    }
-    
-    ModifiableSolrParams solrParams = new ModifiableSolrParams();
-    solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
-    solrParams.set(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, replicaType == Replica.Type.TLOG);
-    // always download the tlogs from the leader when running with cdcr enabled. We need to have all the tlogs
-    // to ensure leader failover doesn't cause missing docs on the target
-    if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
-      solrParams.set(ReplicationHandler.TLOG_FILES, true);
-    }
-    
-    if (isClosed()) return; // we check closed on return
-    boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();
-    
-    if (!success) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
-    }
-    
-    // solrcloud_debug
-    if (log.isDebugEnabled()) {
-      try {
-        RefCounted<SolrIndexSearcher> searchHolder = core
-            .getNewestSearcher(false);
-        SolrIndexSearcher searcher = searchHolder.get();
-        Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
-        try {
-          log.debug(core.getCoreContainer()
-              .getZkController().getNodeName()
-              + " replicated "
-              + searcher.count(new MatchAllDocsQuery())
-              + " from "
-              + leaderUrl
-              + " gen:"
-              + (core.getDeletionPolicy().getLatestCommit() == null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration())
-              + " data:" + core.getDataDir()
-              + " index:" + core.getIndexDir()
-              + " newIndex:" + core.getNewIndexDir()
-              + " files:" + Arrays.asList(dir.listAll()));
-        } finally {
-          core.getDirectoryFactory().release(dir);
-          searchHolder.decref();
-        }
-      } catch (Exception e) {
-        log.debug("Error in solrcloud_debug block", e);
-      }
-    }
-
-  }
-
-  final private void commitOnLeader(String leaderUrl) throws SolrServerException,
-      IOException {
-    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
-        .withConnectionTimeout(30000)
-        .build()) {
-      UpdateRequest ureq = new UpdateRequest();
-      ureq.setParams(new ModifiableSolrParams());
-      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-//      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if "onlyLeaderIndexes"?
-      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
-      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
-          client);
-    }
-  }
-
-  @Override
-  final public void run() {
-
-    // set request info for logging
-    try (SolrCore core = cc.getCore(coreName)) {
-
-      if (core == null) {
-        SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
-        return;
-      }
-      MDCLoggingContext.setCore(core);
-
-      log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
-
-      try {
-        doRecovery(core);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        SolrException.log(log, "", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      } catch (Exception e) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      }
-    } finally {
-      MDCLoggingContext.clear();
-    }
-  }
-  
-  final public void doRecovery(SolrCore core) throws Exception {
-    if (core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
-      doSyncOrReplicateRecovery(core);
-    } else {
-      doReplicateOnlyRecovery(core);
-    }
-  }
-
-  final private void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
-    boolean successfulRecovery = false;
-
-//  if (core.getUpdateHandler().getUpdateLog() != null) {
-//    SolrException.log(log, "'replicate-only' recovery strategy should only be used if no update logs are present, but this core has one: "
-//        + core.getUpdateHandler().getUpdateLog());
-//    return;
-//  }
-    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption, or it will close channels
-      try {
-        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
-        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-
-        boolean isLeader = leaderUrl.equals(ourUrl); //TODO: We can probably delete most of this code if we say this strategy can only be used for pull replicas
-        if (isLeader && !cloudDesc.isLeader()) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
-        }
-        if (cloudDesc.isLeader()) {
-          assert cloudDesc.getReplicaType() != Replica.Type.PULL;
-          // we are now the leader - no one else must have been suitable
-          log.warn("We have not yet recovered - but we are now the leader!");
-          log.info("Finished recovery process.");
-          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          return;
-        }
-
-
-        log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
-            ourUrl);
-        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-
-        if (isClosed()) {
-          log.info("Recovery for core {} has been closed", core.getName());
-          break;
-        }
-        log.info("Starting Replication Recovery.");
-
-        try {
-          log.info("Stopping background replicate from leader process");
-          zkController.stopReplicationFromLeader(coreName);
-          replicate(zkController.getNodeName(), core, leaderprops);
-
-          if (isClosed()) {
-            log.info("Recovery for core {} has been closed", core.getName());
-            break;
-          }
-
-          log.info("Replication Recovery was successful.");
-          successfulRecovery = true;
-        } catch (Exception e) {
-          SolrException.log(log, "Error while trying to recover", e);
-        }
-
-      } catch (Exception e) {
-        SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
-      } finally {
-        if (successfulRecovery) {
-          log.info("Restaring background replicate from leader process");
-          zkController.startReplicationFromLeader(coreName, false);
-          log.info("Registering as Active after recovery.");
-          try {
-            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          } catch (Exception e) {
-            log.error("Could not publish as ACTIVE after succesful recovery", e);
-            successfulRecovery = false;
-          }
-
-          if (successfulRecovery) {
-            close = true;
-            recoveryListener.recovered();
-          }
-        }
-      }
-
-      if (!successfulRecovery) {
-        // let's pause for a moment and try again...
-        // TODO: we don't want to retry for some problems?
-        // Or do a fall off retry...
-        try {
-
-          if (isClosed()) {
-            log.info("Recovery for core {} has been closed", core.getName());
-            break;
-          }
-
-        log.error("Recovery failed - trying again... ({})", retries);
-
-          retries++;
-          if (retries >= maxRetries) {
-            SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
-            try {
-              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
-            } catch (Exception e) {
-              SolrException.log(log, "Could not publish that recovery failed", e);
-            }
-            break;
-          }
-        } catch (Exception e) {
-          SolrException.log(log, "An error has occurred during recovery", e);
-        }
-
-        try {
-          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
-          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
-          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
-          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
-          int loopCount = retries < 4 ? (int) Math.min(Math.pow(2, retries), 12) : 12;
-          log.info("Wait [{}] seconds before trying to recover again (attempt={})",
-              TimeUnit.MILLISECONDS.toSeconds(loopCount * startingRecoveryDelayMilliSeconds), retries);
-          for (int i = 0; i < loopCount; i++) {
-            if (isClosed()) {
-              log.info("Recovery for core {} has been closed", core.getName());
-              break; // check if someone closed us
-            }
-            Thread.sleep(startingRecoveryDelayMilliSeconds);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          log.warn("Recovery was interrupted.", e);
-          close = true;
-        }
-      }
-
-    }
-    // We skip core.seedVersionBuckets(); we don't have a transaction log.
-    log.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
-  }
-
-  // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
-  public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
-    boolean successfulRecovery = false;
-
-    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-    if (ulog == null) {
-      SolrException.log(log, "No UpdateLog found - cannot recover.");
-      recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
-          core.getCoreDescriptor());
-      return;
-    }
-
-    // we temporarily ignore peersync for tlog replicas
-    boolean firstTime = replicaType != Replica.Type.TLOG;
-
-    List<Long> recentVersions;
-    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
-      recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
-    } catch (Exception e) {
-      SolrException.log(log, "Corrupt tlog - ignoring.", e);
-      recentVersions = new ArrayList<>(0);
-    }
-
-    List<Long> startingVersions = ulog.getStartingVersions();
-
-    if (startingVersions != null && recoveringAfterStartup) {
-      try {
-        int oldIdx = 0; // index of the start of the old list in the current list
-        long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-        
-        for (; oldIdx < recentVersions.size(); oldIdx++) {
-          if (recentVersions.get(oldIdx) == firstStartingVersion) break;
-        }
-        
-        if (oldIdx > 0) {
-          log.info("Found new versions added after startup: num=[{}]", oldIdx);
-          log.info("currentVersions size={} range=[{} to {}]", recentVersions.size(), recentVersions.get(0), recentVersions.get(recentVersions.size()-1));
-        }
-
-        if (startingVersions.isEmpty()) {
-          log.info("startupVersions is empty");
-        } else {
-          log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0), startingVersions.get(startingVersions.size()-1));
-        }
-      } catch (Exception e) {
-        SolrException.log(log, "Error getting recent versions.", e);
-        recentVersions = new ArrayList<>(0);
-      }
-    }
-
-    if (recoveringAfterStartup) {
-      // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
-      // when we went down.  We may have received updates since then.
-      recentVersions = startingVersions;
-      try {
-        if (ulog.existOldBufferLog()) {
-          // this means we were previously doing a full index replication
-          // that probably didn't complete while we were buffering updates in
-          // the meantime.
-          log.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
-          firstTime = false; // skip peersync
-        }
-      } catch (Exception e) {
-        SolrException.log(log, "Error trying to get ulog starting operation.", e);
-        firstTime = false; // skip peersync
-      }
-    }
-
-    if (replicaType == Replica.Type.TLOG) {
-      zkController.stopReplicationFromLeader(coreName);
-    }
-
-    final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-    Future<RecoveryInfo> replayFuture = null;
-    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption, or it will close channels
-      try {
-        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
-        if (isLeader && !cloudDesc.isLeader()) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
-        }
-        if (cloudDesc.isLeader()) {
-          // we are now the leader - no one else must have been suitable
-          log.warn("We have not yet recovered - but we are now the leader!");
-          log.info("Finished recovery process.");
-          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          return;
-        }
-
-        log.info("Begin buffering updates. core=[{}]", coreName);
-        // calling bufferUpdates() again will drop the old buffered tlog
-        ulog.bufferUpdates();
-
-        log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
-            ourUrl);
-        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-        
-        
-        final Slice slice = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName())
-            .getSlice(cloudDesc.getShardId());
-            
-        if (prevSendPreRecoveryHttpUriRequest != null) {
-          prevSendPreRecoveryHttpUriRequest.abort();
-        }
-        
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
-        
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-        
-        // we wait a bit so that any updates on the leader
-        // that started before the leader saw the recovering state
-        // are sure to have finished (see SOLR-7141 for
-        // discussion around the current value)
-        //TODO since SOLR-11216, we probably won't need this
-        try {
-          Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-
-        // first thing we just try to sync
-        if (firstTime) {
-          firstTime = false; // only try sync the first time through the loop
-          log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
-          PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core,
-              leader.getCoreUrl(), ulog.getNumRecordsToKeep());
-          boolean syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
-          if (syncSuccess) {
-            SolrQueryRequest req = new LocalSolrQueryRequest(core,
-                new ModifiableSolrParams());
-            // force open a new searcher
-            core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
-            req.close();
-            log.info("PeerSync stage of recovery was successful.");
-
-            // solrcloud_debug
-            cloudDebugLog(core, "synced");
-            
-            log.info("Replaying updates buffered during PeerSync.");
-            replay(core);
-
-            // sync success
-            successfulRecovery = true;
-            return;
-          }
-
-          log.info("PeerSync Recovery was not successful - trying replication.");
-        }
-
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-        
-        log.info("Starting Replication Recovery.");
-
-        try {
-
-          replicate(zkController.getNodeName(), core, leader);
-
-          if (isClosed()) {
-            log.info("RecoveryStrategy has been closed");
-            break;
-          }
-
-          replayFuture = replay(core);
-
-          if (isClosed()) {
-            log.info("RecoveryStrategy has been closed");
-            break;
-          }
-
-          log.info("Replication Recovery was successful.");
-          successfulRecovery = true;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          log.warn("Recovery was interrupted", e);
-          close = true;
-        } catch (Exception e) {
-          SolrException.log(log, "Error while trying to recover", e);
-        }
-
-      } catch (Exception e) {
-        SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
-      } finally {
-        if (successfulRecovery) {
-          log.info("Registering as Active after recovery.");
-          try {
-            if (replicaType == Replica.Type.TLOG) {
-              zkController.startReplicationFromLeader(coreName, true);
-            }
-            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          } catch (Exception e) {
-            log.error("Could not publish as ACTIVE after successful recovery", e);
-            successfulRecovery = false;
-          }
-          
-          if (successfulRecovery) {
-            close = true;
-            recoveryListener.recovered();
-          }
-        }
-      }
-
-      if (!successfulRecovery) {
-        // let's pause for a moment before trying again...
-        // TODO: are there problems we shouldn't retry for at all?
-        // Or use a back-off retry...
-        try {
-
-          if (isClosed()) {
-            log.info("RecoveryStrategy has been closed");
-            break;
-          }
-          
-        log.error("Recovery failed - trying again... ({})", retries);
-          
-          retries++;
-          if (retries >= maxRetries) {
-            SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
-            try {
-              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
-            } catch (Exception e) {
-              SolrException.log(log, "Could not publish that recovery failed", e);
-            }
-            break;
-          }
-        } catch (Exception e) {
-          SolrException.log(log, "An error has occurred during recovery", e);
-        }
-
-        try {
-          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
-          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result 
-          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
-          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
-          int loopCount = retries < 4 ? (int) Math.min(Math.pow(2, retries), 12) : 12;
-          log.info("Wait [{}] seconds before trying to recover again (attempt={})",
-              TimeUnit.MILLISECONDS.toSeconds(loopCount * startingRecoveryDelayMilliSeconds), retries);
-          for (int i = 0; i < loopCount; i++) {
-            if (isClosed()) {
-              log.info("RecoveryStrategy has been closed");
-              break; // check if someone closed us
-            }
-            Thread.sleep(startingRecoveryDelayMilliSeconds);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          log.warn("Recovery was interrupted.", e);
-          close = true;
-        }
-      }
-
-    }
-
-    // if replay was skipped (possibly due to pulling a full index from the leader),
-    // then we still need to update version bucket seeds after recovery
-    if (successfulRecovery && replayFuture == null) {
-      log.info("Updating version bucket highest from index after successful recovery.");
-      core.seedVersionBuckets();
-    }
-
-    log.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
-  }
-
-  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
-    int numTried = 0;
-    while (true) {
-      CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
-      DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
-      if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
-          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
-        // this operation may take a long time; by putting the replica into the DOWN state, clients won't query it
-        zkController.publish(coreDesc, Replica.State.DOWN);
-      }
-      numTried++;
-      Replica leaderReplica = null;
-
-      if (isClosed()) {
-        return leaderReplica;
-      }
-
-      try {
-        leaderReplica = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      } catch (SolrException e) {
-        Thread.sleep(500);
-        continue;
-      }
-
-      if (leaderReplica.getCoreUrl().equals(ourUrl)) {
-        return leaderReplica;
-      }
-
-      try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
-          .withSocketTimeout(1000)
-          .withConnectionTimeout(1000)
-          .build()) {
-        httpSolrClient.ping();
-        return leaderReplica;
-      } catch (IOException e) {
-        log.info("Failed to connect to leader {} on recovery, trying again", leaderReplica.getBaseUrl());
-        Thread.sleep(500);
-      } catch (Exception e) {
-        if (e.getCause() instanceof IOException) {
-          log.info("Failed to connect to leader {} on recovery, trying again", leaderReplica.getBaseUrl());
-          Thread.sleep(500);
-        } else {
-          return leaderReplica;
-        }
-      }
-    }
-  }
-
-  public static Runnable testing_beforeReplayBufferingUpdates;
-
-  private final Future<RecoveryInfo> replay(SolrCore core)
-      throws InterruptedException, ExecutionException {
-    if (testing_beforeReplayBufferingUpdates != null) {
-      testing_beforeReplayBufferingUpdates.run();
-    }
-    if (replicaType == Replica.Type.TLOG) {
-      // roll over all updates during buffering to new tlog, make RTG available
-      SolrQueryRequest req = new LocalSolrQueryRequest(core,
-          new ModifiableSolrParams());
-      core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
-      req.close();
-      return null;
-    }
-    Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
-    if (future == null) {
-      // no replay needed
-      log.info("No replay needed.");
-    } else {
-      log.info("Replaying buffered documents.");
-      // wait for replay
-      RecoveryInfo report = future.get();
-      if (report.failed) {
-        SolrException.log(log, "Replay failed");
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
-      }
-    }
-
-    // the index may be ahead of the tlog's caches after recovery; calling this will purge the tlog's caches
-    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
-    
-    // solrcloud_debug
-    cloudDebugLog(core, "replayed");
-    
-    return future;
-  }
-  
-  private final void cloudDebugLog(SolrCore core, String op) {
-    if (!log.isDebugEnabled()) {
-      return;
-    }
-    try {
-      RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
-      SolrIndexSearcher searcher = searchHolder.get();
-      try {
-        final int totalHits = searcher.count(new MatchAllDocsQuery());
-        final String nodeName = core.getCoreContainer().getZkController().getNodeName();
-        log.debug("[{}] {} [{} total hits]", nodeName, op, totalHits);
-      } finally {
-        searchHolder.decref();
-      }
-    } catch (Exception e) {
-      log.debug("Error in solrcloud_debug block", e);
-    }
-  }
-
-  public final boolean isClosed() {
-    return close;
-  }
-  
-  private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
-      throws SolrServerException, IOException, InterruptedException, ExecutionException {
-
-    WaitForState prepCmd = new WaitForState();
-    prepCmd.setCoreName(leaderCoreName);
-    prepCmd.setNodeName(zkController.getNodeName());
-    prepCmd.setCoreNodeName(coreZkNodeName);
-    prepCmd.setState(Replica.State.RECOVERING);
-    prepCmd.setCheckLive(true);
-    prepCmd.setOnlyIfLeader(true);
-    final Slice.State state = slice.getState();
-    if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY && state != Slice.State.RECOVERY_FAILED) {
-      prepCmd.setOnlyIfLeaderActive(true);
-    }
-
-    int conflictWaitMs = zkController.getLeaderConflictResolveWait();
-    // timeout after 5 seconds more than the max timeout (conflictWait + 3 seconds) on the server side
-    int readTimeout = conflictWaitMs + 8000;
-    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
-      client.setConnectionTimeout(10000);
-      client.setSoTimeout(readTimeout);
-      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
-      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
-
-      log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString());
-
-      mrr.future.get();
-    }
-  }
-}
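
For reference, the retry back-off used in the recovery loops above works out to
5s, 10s, 20s, 40s, then a flat 60s: the thread sleeps in 5-second sub-intervals
(so a close() is noticed promptly) and doubles the number of sub-intervals per
retry, capped at 12. A minimal, self-contained sketch of that schedule --
BASE_DELAY_MS stands in for startingRecoveryDelayMilliSeconds, and the class
name is illustrative, not part of the file:

    public class RecoveryBackoffSketch {
      static final long BASE_DELAY_MS = 5000; // stand-in for startingRecoveryDelayMilliSeconds

      // same expression as the recovery loops: 2^retries sub-intervals, capped at 12
      static int loopCount(int retries) {
        return retries < 4 ? (int) Math.min(Math.pow(2, retries), 12) : 12;
      }

      public static void main(String[] args) {
        for (int retries = 0; retries <= 5; retries++) {
          long waitMs = loopCount(retries) * BASE_DELAY_MS;
          // attempt=0 waits 5s, 1 -> 10s, 2 -> 20s, 3 -> 40s, 4+ -> 60s
          System.out.println("attempt=" + retries + " wait=" + (waitMs / 1000) + "s");
        }
      }
    }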

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
deleted file mode 100644
index 5fb0946..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-
-import org.apache.lucene.index.IndexCommit;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrConfig;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.IndexFetcher;
-import org.apache.solr.handler.ReplicationHandler;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.SolrIndexWriter;
-import org.apache.solr.update.UpdateLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReplicateFromLeader {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private CoreContainer cc;
-  private String coreName;
-
-  private ReplicationHandler replicationProcess;
-  private long lastVersion = 0;
-
-  public ReplicateFromLeader(CoreContainer cc, String coreName) {
-    this.cc = cc;
-    this.coreName = coreName;
-  }
-
-  /**
-   * Start a replication handler thread that will periodically pull indices from the shard leader
-   * @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
-   * the replication is done
-   */
-  public void startReplication(boolean switchTransactionLog) throws InterruptedException {
-    try (SolrCore core = cc.getCore(coreName)) {
-      if (core == null) {
-        if (cc.isShutDown()) {
-          return;
-        } else {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found: " + coreName + " in " + cc.getLoadedCoreNames());
-        }
-      }
-      SolrConfig.UpdateHandlerInfo uinfo = core.getSolrConfig().getUpdateHandlerInfo();
-      String pollIntervalStr = "00:00:03";
-      if (uinfo.autoCommmitMaxTime != -1) {
-        pollIntervalStr = toPollIntervalStr(uinfo.autoCommmitMaxTime/2);
-      } else if (uinfo.autoSoftCommmitMaxTime != -1) {
-        pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
-      }
-      log.info("Will start replication from leader with poll interval: {}", pollIntervalStr);
-
-      NamedList<Object> slaveConfig = new NamedList<>();
-      slaveConfig.add("fetchFromLeader", Boolean.TRUE);
-      slaveConfig.add(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, switchTransactionLog);
-      slaveConfig.add("pollInterval", pollIntervalStr);
-      NamedList<Object> replicationConfig = new NamedList<>();
-      replicationConfig.add("slave", slaveConfig);
-
-      String lastCommitVersion = getCommitVersion(core);
-      if (lastCommitVersion != null) {
-        lastVersion = Long.parseLong(lastCommitVersion);
-      }
-
-      replicationProcess = new ReplicationHandler();
-      if (switchTransactionLog) {
-        replicationProcess.setPollListener((solrCore, fetchResult) -> {
-          if (fetchResult == IndexFetcher.IndexFetchResult.INDEX_FETCH_SUCCESS) {
-            String commitVersion = getCommitVersion(solrCore);
-            if (commitVersion == null) return;
-            if (Long.parseLong(commitVersion) == lastVersion) return;
-            UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
-            SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
-                new ModifiableSolrParams());
-            CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
-            cuc.setVersion(Long.parseLong(commitVersion));
-            updateLog.commitAndSwitchToNewTlog(cuc);
-            lastVersion = Long.parseLong(commitVersion);
-          }
-        });
-      }
-      replicationProcess.init(replicationConfig);
-      replicationProcess.inform(core);
-    }
-  }
-
-  public static String getCommitVersion(SolrCore solrCore) {
-    IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
-    try {
-      String commitVersion = commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION);
-      if (commitVersion == null) return null;
-      else return commitVersion;
-    } catch (Exception e) {
-      log.warn("Cannot get commit command version from index commit point", e);
-      return null;
-    }
-  }
-
-  private static String toPollIntervalStr(int ms) {
-    int sec = ms/1000;
-    int hour = sec / 3600;
-    sec = sec % 3600;
-    int min = sec / 60;
-    sec = sec % 60;
-    return hour + ":" + min + ":" + sec;
-  }
-
-  public void stopReplication() {
-    if (replicationProcess != null) {
-      replicationProcess.close();
-    }
-  }
-}
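
The poll interval chosen in startReplication() above is half the configured
hard autocommit max time if one is set, else half the soft commit max time,
else a 3-second default, rendered as hours:minutes:seconds. A minimal sketch of
that arithmetic -- toPollIntervalStr is copied from the class, the other names
are illustrative:

    public class PollIntervalSketch {

      // copied from ReplicateFromLeader: render milliseconds as h:m:s
      static String toPollIntervalStr(int ms) {
        int sec = ms / 1000;
        int hour = sec / 3600;
        sec = sec % 3600;
        int min = sec / 60;
        sec = sec % 60;
        return hour + ":" + min + ":" + sec;
      }

      // same selection order as startReplication(); -1 means "not configured"
      static String pollInterval(int autoCommitMaxTime, int autoSoftCommitMaxTime) {
        if (autoCommitMaxTime != -1) return toPollIntervalStr(autoCommitMaxTime / 2);
        if (autoSoftCommitMaxTime != -1) return toPollIntervalStr(autoSoftCommitMaxTime / 2);
        return "00:00:03";
      }

      public static void main(String[] args) {
        System.out.println(pollInterval(60000, -1)); // 0:0:30
        System.out.println(pollInterval(-1, 10000)); // 0:0:5
        System.out.println(pollInterval(-1, -1));    // 00:00:03
      }
    }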

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
deleted file mode 100644
index 0cb6cbe..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.util.List;
-import org.apache.lucene.util.PriorityQueue;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * A size limited distributed map maintained in zk.
- * Oldest znodes (as per modification time) are evicted as newer ones come in.
- *
- * When the map hits the specified maximum size, the oldest <code>maxSize / 10</code> items
- * are evicted on the next {@link #put(String, byte[])} invocation.
- */
-public class SizeLimitedDistributedMap extends DistributedMap {
-
-  private final int maxSize;
-
-  /**
-   * This observer will be called for each element deleted when this map
-   * overflows and trims the excess elements.
-   */
-  private final OnOverflowObserver onOverflowObserver;
-
-  public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize) {
-    this(zookeeper, dir, maxSize, null);
-  }
-  
-  public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize, OnOverflowObserver onOverflowObserver) {
-    super(zookeeper, dir);
-    this.maxSize = maxSize;
-    this.onOverflowObserver = onOverflowObserver;
-  }
-
-  @Override
-  public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
-    if (this.size() >= maxSize) {
-      // Bring down the size
-      List<String> children = zookeeper.getChildren(dir, null, true);
-
-      int cleanupSize = maxSize / 10;
-
-      final PriorityQueue<Long> priorityQueue = new PriorityQueue<Long>(cleanupSize) {
-        @Override
-        protected boolean lessThan(Long a, Long b) {
-          return (a > b);
-        }
-      };
-
-      for (String child : children) {
-        Stat stat = zookeeper.exists(dir + "/" + child, null, true);
-        priorityQueue.insertWithOverflow(stat.getMzxid());
-      }
-
-      long topElementMzxId = priorityQueue.top();
-
-      for (String child : children) {
-        Stat stat = zookeeper.exists(dir + "/" + child, null, true);
-        if (stat.getMzxid() <= topElementMzxId) {
-          zookeeper.delete(dir + "/" + child, -1, true);
-          if (onOverflowObserver != null) onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
-        }
-      }
-    }
-
-    super.put(trackingId, data);
-  }
-  
-  interface OnOverflowObserver {
-    void onChildDelete(String child) throws KeeperException, InterruptedException;
-  }
-}
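
The overflow path in put() above selects a cutoff: it keeps the cleanupSize
smallest mzxids in a size-bounded priority queue, so the queue's top is the
newest of the cleanupSize oldest entries, and every child at or below that
mzxid is deleted. A standalone sketch of the same selection using
java.util.PriorityQueue in place of Lucene's (the names here are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    public class EvictionCutoffSketch {

      // returns the largest mzxid among the cleanupSize smallest ones
      static long cutoff(List<Long> mzxids, int cleanupSize) {
        // max-heap bounded to cleanupSize: after all inserts it holds the
        // cleanupSize smallest values, with the largest of them at the head
        PriorityQueue<Long> oldest = new PriorityQueue<>((a, b) -> Long.compare(b, a));
        for (long id : mzxids) {
          oldest.offer(id);
          if (oldest.size() > cleanupSize) {
            oldest.poll(); // evict the largest, keeping the smallest cleanupSize
          }
        }
        return oldest.peek();
      }

      public static void main(String[] args) {
        List<Long> ids = new ArrayList<>(List.of(10L, 3L, 7L, 1L, 9L, 4L));
        long top = cutoff(ids, 2);     // the two oldest are 1 and 3, so the cutoff is 3
        ids.removeIf(id -> id <= top); // analogous to the zookeeper.delete() loop
        System.out.println(ids);       // [10, 7, 9, 4]
      }
    }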