You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/03/19 13:27:20 UTC

[lucene-solr] branch branch_8x updated: SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new b778417  SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections.
b778417 is described below

commit b778417054e735cf323139a43e84d6262ce9dcd7
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Mar 19 13:41:11 2019 +0100

    SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections.
---
 solr/CHANGES.txt                                   |   4 +
 .../src/java/org/apache/solr/cloud/Overseer.java   | 136 +++-
 .../cloud/api/collections/DeleteCollectionCmd.java |   2 +-
 .../OverseerCollectionMessageHandler.java          |   1 +
 .../api/collections/ReindexCollectionCmd.java      | 824 +++++++++++++++++++++
 .../org/apache/solr/handler/StreamHandler.java     |   4 +-
 .../org/apache/solr/handler/admin/ColStatus.java   |   3 +
 .../solr/handler/admin/CollectionsHandler.java     |  31 +
 .../solr/schema/ManagedIndexSchemaFactory.java     |   8 +-
 .../java/org/apache/solr/util/TestInjection.java   |  35 +
 .../apache/solr/cloud/ReindexCollectionTest.java   | 379 ++++++++++
 .../solr/cloud/SystemCollectionCompatTest.java     | 208 ++++++
 solr/solr-ref-guide/src/collections-api.adoc       | 120 +++
 .../solr/client/solrj/io/stream/DaemonStream.java  |  13 +-
 .../solrj/request/CollectionAdminRequest.java      |  92 ++-
 .../solr/common/cloud/CompositeIdRouter.java       |   5 +
 .../org/apache/solr/common/cloud/DocRouter.java    |   1 +
 .../solr/common/cloud/ImplicitDocRouter.java       |   5 +
 .../apache/solr/common/cloud/PlainIdRouter.java    |   5 +
 .../solr/common/params/CollectionParams.java       |   4 +-
 .../java/org/apache/solr/common/util/Utils.java    |   3 +-
 21 files changed, 1865 insertions(+), 18 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 56359dc..bc1de23 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -64,6 +64,10 @@ New Features
 
 * SOLR-13292: Provide extended per-segment status of a collection. (ab)
 
+* SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections. This issue also adds
+  a back-compat check of the .system collection to notify users of potential compatibility issues after
+  upgrades or schema changes. (ab)
+
 Bug Fixes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 91b7e74..a89926f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -21,15 +21,23 @@ import static org.apache.solr.common.params.CommonParams.ID;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiConsumer;
 
+import org.apache.lucene.util.Version;
+import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
@@ -45,11 +53,15 @@ import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ConnectionManager;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
@@ -524,6 +536,8 @@ public class Overseer implements SolrCloseable {
   private Stats stats;
   private String id;
   private volatile boolean closed;
+  private volatile boolean systemCollCompatCheck = true;
+
   private CloudConfig config;
 
   // overseer not responsible for closing reader
@@ -570,10 +584,130 @@ public class Overseer implements SolrCloseable {
     updaterThread.start();
     ccThread.start();
     triggerThread.start();
- 
+
+    systemCollectionCompatCheck(new BiConsumer<String, Object>() {
+      boolean firstPair = true;
+      @Override
+      public void accept(String s, Object o) {
+        if (firstPair) {
+          log.warn("WARNING: Collection '.system' may need re-indexing due to compatibility issues listed below. See REINDEXCOLLECTION documentation for more details.");
+          firstPair = false;
+        }
+        log.warn("WARNING: *\t{}:\t{}", s, o);
+      }
+    });
+
     assert ObjectReleaseTracker.track(this);
   }
 
+  /**
+   * Triggers the back-compat check of the .system collection, either right away or once all
+   * of its shard leaders are active.
+   * <p>Nothing happens when the cluster state cannot be obtained (a warning is logged) or when
+   * the .system collection does not exist.</p>
+   *
+   * @param consumer receives (key, value) pairs describing each compatibility issue found
+   */
+  public void systemCollectionCompatCheck(final BiConsumer<String, Object> consumer) {
+    final ClusterState clusterState = zkController.getClusterState();
+    if (clusterState == null) {
+      log.warn("Unable to check back-compat of .system collection - can't obtain ClusterState.");
+      return;
+    }
+    final DocCollection systemColl = clusterState.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
+    if (systemColl == null) {
+      return;
+    }
+    // every active slice must have a leader that is active on a live node
+    final boolean leadersReady = systemColl.getActiveSlices().stream()
+        .allMatch(slice -> slice.getLeader() != null && slice.getLeader().isActive(clusterState.getLiveNodes()));
+    if (leadersReady) {
+      doCompatCheck(consumer);
+      return;
+    }
+    // Not ready yet: watch the collection state and run the check once all leaders are active.
+    // NOTE(review): returning true presumably de-registers the watcher - confirm against the
+    // CollectionStateWatcher contract.
+    zkController.zkStateReader.registerCollectionStateWatcher(CollectionAdminParams.SYSTEM_COLL, (liveNodes, state) -> {
+      if (state == null || liveNodes.isEmpty()) {
+        // collection is gone or no nodes are live: give up without running the check
+        return true;
+      }
+      for (Slice slice : state.getActiveSlices()) {
+        if (slice.getLeader() == null || !slice.getLeader().isActive(liveNodes)) {
+          return false;
+        }
+      }
+      doCompatCheck(consumer);
+      return true;
+    });
+  }
+
+  /**
+   * Performs the back-compat check of the .system collection and reports findings to the consumer.
+   * <p>Requests the collection status including segment and field info, then reports:</p>
+   * <ul>
+   *   <li>{@code indexFieldsNotMatchingSchema} - when the "schemaNonCompliant" list is present and
+   *   does not contain "(NONE)";</li>
+   *   <li>{@code differentSegmentVersions} and {@code currentLuceneVersion} - when any version reported
+   *   for the shard leaders' segments differs from the current Lucene version;</li>
+   *   <li>{@code differentMajorSegmentVersions} and {@code currentLuceneMajorVersion} - when any segment
+   *   reports a different "createdVersionMajor" than the current major version.</li>
+   * </ul>
+   * <p>The {@code systemCollCompatCheck} flag is cleared on the first call, so subsequent calls return
+   * immediately. {@link SolrServerException} and {@link IOException} are logged and not propagated.</p>
+   *
+   * @param consumer receives (key, value) pairs describing each detected issue
+   */
+  private void doCompatCheck(BiConsumer<String, Object> consumer) {
+    if (systemCollCompatCheck) {
+      systemCollCompatCheck = false;
+    } else {
+      return;
+    }
+    try (CloudSolrClient client = new CloudSolrClient.Builder(Collections.singletonList(getZkController().getZkServerAddress()), Optional.empty())
+        .withSocketTimeout(30000).withConnectionTimeout(15000)
+        .withHttpClient(updateShardHandler.getDefaultHttpClient()).build()) {
+      CollectionAdminRequest.ColStatus req = CollectionAdminRequest.collectionStatus(CollectionAdminParams.SYSTEM_COLL)
+          .setWithSegments(true)
+          .setWithFieldInfo(true);
+      CollectionAdminResponse rsp = req.process(client);
+      NamedList<Object> status = (NamedList<Object>)rsp.getResponse().get(CollectionAdminParams.SYSTEM_COLL);
+      Collection<String> nonCompliant = (Collection<String>)status.get("schemaNonCompliant");
+      // a missing list is treated as "nothing to report" instead of failing with an NPE
+      if (nonCompliant != null && !nonCompliant.contains("(NONE)")) {
+        consumer.accept("indexFieldsNotMatchingSchema", nonCompliant);
+      }
+      // both sets are seeded with the current version, so size() > 1 means "something differs"
+      Set<Integer> segmentCreatedMajorVersions = new HashSet<>();
+      Set<String> segmentVersions = new HashSet<>();
+      int currentMajorVersion = Version.LATEST.major;
+      String currentVersion = Version.LATEST.toString();
+      segmentVersions.add(currentVersion);
+      segmentCreatedMajorVersions.add(currentMajorVersion);
+      NamedList<Object> shards = (NamedList<Object>)status.get("shards");
+      for (Map.Entry<String, Object> entry : shards) {
+        NamedList<Object> leader = (NamedList<Object>)((NamedList<Object>)entry.getValue()).get("leader");
+        if (leader == null) {
+          continue;
+        }
+        NamedList<Object> segInfos = (NamedList<Object>)leader.get("segInfos");
+        if (segInfos == null) {
+          continue;
+        }
+        NamedList<Object> infos = (NamedList<Object>)segInfos.get("info");
+        if (((Number)infos.get("numSegments")).intValue() > 0) {
+          segmentVersions.add(infos.get("minSegmentLuceneVersion").toString());
+        }
+        if (infos.get("commitLuceneVersion") != null) {
+          segmentVersions.add(infos.get("commitLuceneVersion").toString());
+        }
+        NamedList<Object> segmentInfos = (NamedList<Object>)segInfos.get("segments");
+        segmentInfos.forEach((k, v) -> {
+          NamedList<Object> segment = (NamedList<Object>)v;
+          segmentVersions.add(segment.get("version").toString());
+          if (segment.get("minVersion") != null) {
+            // record the segment's minimum version too (previously "version" was added a second time)
+            segmentVersions.add(segment.get("minVersion").toString());
+          }
+          if (segment.get("createdVersionMajor") != null) {
+            segmentCreatedMajorVersions.add(((Number)segment.get("createdVersionMajor")).intValue());
+          }
+        });
+      }
+      if (segmentVersions.size() > 1) {
+        consumer.accept("differentSegmentVersions", segmentVersions);
+        consumer.accept("currentLuceneVersion", currentVersion);
+      }
+      if (segmentCreatedMajorVersions.size() > 1) {
+        consumer.accept("differentMajorSegmentVersions", segmentCreatedMajorVersions);
+        consumer.accept("currentLuceneMajorVersion", currentMajorVersion);
+      }
+
+    } catch (SolrServerException | IOException e) {
+      log.warn("Unable to perform back-compat check of .system collection", e);
+    }
+  }
+
   public Stats getStats() {
     return stats;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index e5f6f2d..7177f03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -181,7 +181,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     }
   }
 
-  private String referencedByAlias(String collection, Aliases aliases) {
+  public static String referencedByAlias(String collection, Aliases aliases) {
     Objects.requireNonNull(aliases);
     return aliases.getCollectionAliasListMap().entrySet().stream()
         .filter(e -> e.getValue().contains(collection))
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index de7b3eb..a1bd826 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -241,6 +241,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         .put(DELETEREPLICA, new DeleteReplicaCmd(this))
         .put(ADDREPLICA, new AddReplicaCmd(this))
         .put(MOVEREPLICA, new MoveReplicaCmd(this))
+        .put(REINDEXCOLLECTION, new ReindexCollectionCmd(this))
         .put(UTILIZENODE, new UtilizeNodeCmd(this))
         .build()
     ;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
new file mode 100644
index 0000000..553c4bf
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -0,0 +1,824 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reindex a collection, usually in order to change the index schema.
+ * <p>WARNING: Reindexing is potentially a lossy operation - some indexed data that is not available as
+ * stored fields may be irretrievably lost, so users should use this command with caution, evaluating
+ * the potential impact by using different source and target collection names first, and preserving
+ * the source collection until the evaluation is complete.</p>
+ * <p>Reindexing follows these steps:</p>
+ * <ol>
+ *    <li>creates a temporary collection using the most recent schema of the source collection
+ *    (or the one specified in the parameters, which must already exist), and the shape of the original
+ *    collection, unless overridden by parameters.</li>
+ *    <li>copy the source documents to the temporary collection, using their stored fields and
+ *    reindexing them using the specified schema. NOTE: some data
+ *    loss may occur if the original stored field data is not available!</li>
+ *    <li>create the target collection from scratch with the specified name (or the same as source if not
+ *    specified) and the specified parameters. NOTE: if the target name was not specified or is the same
+ *    as the source collection then a unique sequential collection name will be used.</li>
+ *    <li>copy the documents from the source collection to the target collection.</li>
+ *    <li>if the source and target collection names were the same then set up an alias pointing from the source collection name to the actual
+ *    (sequentially named) target collection.</li>
+ *    <li>optionally delete the source collection.</li>
+ * </ol>
+ */
+public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String COMMAND = "cmd";
+  public static final String REINDEX_STATUS = "reindexStatus";
+  public static final String REMOVE_SOURCE = "removeSource";
+  public static final String TARGET = "target";
+  public static final String TARGET_COL_PREFIX = ".rx_";
+  public static final String CHK_COL_PREFIX = ".rx_ck_";
+  public static final String REINDEXING_STATE = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
+
+  public static final String STATE = "state";
+  public static final String PHASE = "phase";
+
+  private static final List<String> COLLECTION_PARAMS = Arrays.asList(
+      ZkStateReader.CONFIGNAME_PROP,
+      ZkStateReader.NUM_SHARDS_PROP,
+      ZkStateReader.NRT_REPLICAS,
+      ZkStateReader.PULL_REPLICAS,
+      ZkStateReader.TLOG_REPLICAS,
+      ZkStateReader.REPLICATION_FACTOR,
+      ZkStateReader.MAX_SHARDS_PER_NODE,
+      "shards",
+      Policy.POLICY,
+      CollectionAdminParams.CREATE_NODE_SET_PARAM,
+      CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM,
+      ZkStateReader.AUTO_ADD_REPLICAS
+  );
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  private static AtomicInteger tmpCollectionSeq = new AtomicInteger();
+
+  /** Reindexing states, with a case-insensitive lookup by name. */
+  public enum State {
+    IDLE,
+    RUNNING,
+    ABORTED,
+    FINISHED;
+
+    /** Unmodifiable lookup table keyed by the lower-case (ROOT locale) constant name. */
+    static Map<String, State> states = Collections.unmodifiableMap(
+        Stream.of(values()).collect(Collectors.toMap(s -> s.toLower(), Function.identity())));
+
+    /** @return the constant name lower-cased using {@link Locale#ROOT} */
+    public String toLower() {
+      return name().toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Resolves a state from its string form, ignoring case.
+     *
+     * @param p value whose string form names a state; may be null
+     * @return the matching state, or null when {@code p} is null or names no state
+     */
+    public static State get(Object p) {
+      return p == null ? null : states.get(String.valueOf(p).toLowerCase(Locale.ROOT));
+    }
+  }
+
+  /** Sub-commands accepted by the reindex operation, with a case-insensitive lookup by name. */
+  public enum Cmd {
+    START,
+    ABORT,
+    STATUS;
+
+    /** Unmodifiable lookup table keyed by the lower-case (ROOT locale) constant name. */
+    static Map<String, Cmd> cmds = Collections.unmodifiableMap(
+        Stream.of(values()).collect(Collectors.toMap(c -> c.toLower(), Function.identity())));
+
+    /** @return the constant name lower-cased using {@link Locale#ROOT} */
+    public String toLower() {
+      return name().toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Resolves a command from its name, ignoring case.
+     *
+     * @param p command name; may be null
+     * @return the matching command, or null when {@code p} is null or names no command
+     */
+    public static Cmd get(String p) {
+      return p == null ? null : cmds.get(p.toLowerCase(Locale.ROOT));
+    }
+  }
+
+  private SolrClientCache solrClientCache;
+  private String zkHost;
+
+  /**
+   * Creates the command bound to the given collection message handler.
+   *
+   * @param ocmh handler stored in this command's {@code ocmh} field
+   */
+  public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+
+    log.debug("*** called: {}", message);
+
+    String collection = message.getStr(CommonParams.NAME);
+    // before resolving aliases
+    String originalCollection = collection;
+    Aliases aliases = ocmh.zkStateReader.getAliases();
+    if (collection != null) {
+      // resolve aliases - the source may be an alias
+      List<String> aliasList = aliases.resolveAliases(collection);
+      if (aliasList != null && !aliasList.isEmpty()) {
+        collection = aliasList.get(0);
+      }
+    }
+
+    if (collection == null || !clusterState.hasCollection(collection)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified and must exist");
+    }
+    String target = message.getStr(TARGET);
+    if (target == null) {
+      target = collection;
+    } else {
+      // resolve aliases
+      List<String> aliasList = aliases.resolveAliases(target);
+      if (aliasList != null && !aliasList.isEmpty()) {
+        target = aliasList.get(0);
+      }
+    }
+    boolean sameTarget = target.equals(collection) || target.equals(originalCollection);
+    boolean removeSource = message.getBool(REMOVE_SOURCE, false);
+    Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
+    if (command == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
+    }
+    Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+    if (!reindexingState.containsKey(STATE)) {
+      reindexingState.put(STATE, State.IDLE.toLower());
+    }
+    State state = State.get(reindexingState.get(STATE));
+    if (command == Cmd.ABORT) {
+      log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
+      // check that it's running
+      if (state != State.RUNNING) {
+        log.debug("Abort requested for collection {} but command is not running: {}", collection, state);
+        return;
+      }
+      setReindexingState(collection, State.ABORTED, null);
+      reindexingState.put(STATE, "aborting");
+      results.add(REINDEX_STATUS, reindexingState);
+      // if needed the cleanup will be performed by the running instance of the command
+      return;
+    } else if (command == Cmd.STATUS) {
+      results.add(REINDEX_STATUS, reindexingState);
+      return;
+    }
+    // command == Cmd.START
+
+    // check it's not already running
+    if (state == State.RUNNING) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection +
+          ". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
+    }
+
+    DocCollection coll = clusterState.getCollection(collection);
+    boolean aborted = false;
+    int batchSize = message.getInt(CommonParams.ROWS, 100);
+    String query = message.getStr(CommonParams.Q, "*:*");
+    String fl = message.getStr(CommonParams.FL, "*");
+    Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
+    Integer numNrt = message.getInt(ZkStateReader.NRT_REPLICAS, coll.getNumNrtReplicas());
+    Integer numTlog = message.getInt(ZkStateReader.TLOG_REPLICAS, coll.getNumTlogReplicas());
+    Integer numPull = message.getInt(ZkStateReader.PULL_REPLICAS, coll.getNumPullReplicas());
+    int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, coll.getActiveSlices().size());
+    int maxShardsPerNode = message.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, coll.getMaxShardsPerNode());
+    DocRouter router = coll.getRouter();
+    if (router == null) {
+      router = DocRouter.DEFAULT;
+    }
+
+    String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
+    String targetCollection;
+    int seq = tmpCollectionSeq.getAndIncrement();
+    if (sameTarget) {
+      do {
+        targetCollection = TARGET_COL_PREFIX + originalCollection + "_" + seq;
+        if (!clusterState.hasCollection(targetCollection)) {
+          break;
+        }
+        seq = tmpCollectionSeq.getAndIncrement();
+      } while (clusterState.hasCollection(targetCollection));
+    } else {
+      targetCollection = target;
+    }
+    String chkCollection = CHK_COL_PREFIX + originalCollection;
+    String daemonUrl = null;
+    Exception exc = null;
+    boolean createdTarget = false;
+    try {
+      solrClientCache = new SolrClientCache(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
+      zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress();
+      // set the running flag
+      reindexingState.clear();
+      reindexingState.put("actualSourceCollection", collection);
+      reindexingState.put("actualTargetCollection", targetCollection);
+      reindexingState.put("checkpointCollection", chkCollection);
+      reindexingState.put("inputDocs", getNumberOfDocs(collection));
+      reindexingState.put(PHASE, "creating target and checkpoint collections");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
+      // 0. set up target and checkpoint collections
+      NamedList<Object> cmdResults = new NamedList<>();
+      ZkNodeProps cmd;
+      if (clusterState.hasCollection(targetCollection)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Target collection " + targetCollection + " already exists! Delete it first.");
+      }
+      if (clusterState.hasCollection(chkCollection)) {
+        // delete the checkpoint collection
+        cmd = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+            CommonParams.NAME, chkCollection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        checkResults("deleting old checkpoint collection " + chkCollection, cmdResults, true);
+      }
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
+      propMap.put(CommonParams.NAME, targetCollection);
+      propMap.put(ZkStateReader.NUM_SHARDS_PROP, numShards);
+      propMap.put(CollectionAdminParams.COLL_CONF, configName);
+      // init first from the same router
+      propMap.put("router.name", router.getName());
+      for (String key : coll.keySet()) {
+        if (key.startsWith("router.")) {
+          propMap.put(key, coll.get(key));
+        }
+      }
+      // then apply overrides if present
+      for (String key : message.keySet()) {
+        if (key.startsWith("router.")) {
+          propMap.put(key, message.getStr(key));
+        } else if (COLLECTION_PARAMS.contains(key)) {
+          propMap.put(key, message.get(key));
+        }
+      }
+
+      propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
+      propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
+      propMap.put(DocCollection.STATE_FORMAT, message.getInt(DocCollection.STATE_FORMAT, coll.getStateFormat()));
+      if (rf != null) {
+        propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
+      }
+      if (numNrt != null) {
+        propMap.put(ZkStateReader.NRT_REPLICAS, numNrt);
+      }
+      if (numTlog != null) {
+        propMap.put(ZkStateReader.TLOG_REPLICAS, numTlog);
+      }
+      if (numPull != null) {
+        propMap.put(ZkStateReader.PULL_REPLICAS, numPull);
+      }
+      // create the target collection
+      cmd = new ZkNodeProps(propMap);
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
+      createdTarget = true;
+      checkResults("creating target collection " + targetCollection, cmdResults, true);
+
+      // create the checkpoint collection - use RF=1 and 1 shard
+      cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+          CommonParams.NAME, chkCollection,
+          ZkStateReader.NUM_SHARDS_PROP, "1",
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          DocCollection.STATE_FORMAT, "2",
+          CollectionAdminParams.COLL_CONF, "_default",
+          CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
+      );
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
+      checkResults("creating checkpoint collection " + chkCollection, cmdResults, true);
+      // wait for a while until we see both collections
+      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, ocmh.timeSource);
+      boolean created = false;
+      while (!waitUntil.hasTimedOut()) {
+        waitUntil.sleep(100);
+        // this also refreshes our local var clusterState
+        clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+        created = clusterState.hasCollection(targetCollection) && clusterState.hasCollection(chkCollection);
+        if (created) break;
+      }
+      if (!created) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create temporary collection(s)");
+      }
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      // 1. put the source collection in read-only mode
+      cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+          ZkStateReader.COLLECTION_PROP, collection,
+          ZkStateReader.READ_ONLY, "true");
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
+
+      TestInjection.injectReindexLatch();
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      // 2. copy the documents to target
+      // Recipe taken from: http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
+      ModifiableSolrParams q = new ModifiableSolrParams();
+      q.set(CommonParams.QT, "/stream");
+      q.set("collection", collection);
+      q.set("expr",
+          "daemon(id=\"" + targetCollection + "\"," +
+              "terminate=\"true\"," +
+              "commit(" + targetCollection + "," +
+                "update(" + targetCollection + "," +
+                  "batchSize=" + batchSize + "," +
+                  "topic(" + chkCollection + "," +
+                    collection + "," +
+                    "q=\"" + query + "\"," +
+                    "fl=\"" + fl + "\"," +
+                    "id=\"topic_" + targetCollection + "\"," +
+                    // some of the documents eg. in .system contain large blobs
+                    "rows=\"" + batchSize + "\"," +
+                    "initialCheckpoint=\"0\"))))");
+      log.debug("- starting copying documents from " + collection + " to " + targetCollection);
+      SolrResponse rsp = null;
+      try {
+        rsp = ocmh.cloudManager.request(new QueryRequest(q));
+      } catch (Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
+            collection + " to " + targetCollection, e);
+      }
+      daemonUrl = getDaemonUrl(rsp, coll);
+      if (daemonUrl == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
+            collection + " to " + targetCollection + ": " + Utils.toJSONString(rsp));
+      }
+      reindexingState.put("daemonUrl", daemonUrl);
+      reindexingState.put("daemonName", targetCollection);
+      reindexingState.put(PHASE, "copying documents");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
+      // wait for the daemon to finish
+      waitForDaemon(targetCollection, daemonUrl, collection, targetCollection, reindexingState);
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+      log.debug("- finished copying from " + collection + " to " + targetCollection);
+      // fail here or earlier during daemon run
+      TestInjection.injectReindexFailure();
+
+      // 5. if (sameTarget) set up an alias to use targetCollection as the source name
+      if (sameTarget) {
+        log.debug("- setting up alias from " + originalCollection + " to " + targetCollection);
+        cmd = new ZkNodeProps(
+            CommonParams.NAME, originalCollection,
+            "collections", targetCollection);
+        cmdResults = new NamedList<>();
+        ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, results);
+        checkResults("setting up alias " + originalCollection + " -> " + targetCollection, cmdResults, true);
+        reindexingState.put("alias", originalCollection + " -> " + targetCollection);
+      }
+
+      reindexingState.remove("daemonUrl");
+      reindexingState.remove("daemonName");
+      reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+      reindexingState.put(PHASE, "copying done, finalizing");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+      // 6. delete the checkpoint collection
+      log.debug("- deleting " + chkCollection);
+      cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+          CommonParams.NAME, chkCollection,
+          CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+      );
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+      checkResults("deleting checkpoint collection " + chkCollection, cmdResults, true);
+
+      // 7. optionally delete the source collection
+      if (removeSource) {
+        log.debug("- deleting source collection");
+        cmd = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+            CommonParams.NAME, collection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        cmdResults = new NamedList<>();
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        checkResults("deleting source collection " + collection, cmdResults, true);
+      } else {
+        // 8. clear readOnly on source
+        ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+            ZkStateReader.COLLECTION_PROP, collection,
+            ZkStateReader.READ_ONLY, null);
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      }
+      // 9. set FINISHED state on the target and clear the state on the source
+      ZkNodeProps props = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+          ZkStateReader.COLLECTION_PROP, targetCollection,
+          REINDEXING_STATE, State.FINISHED.toLower());
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+
+      reindexingState.put(STATE, State.FINISHED.toLower());
+      reindexingState.put(PHASE, "done");
+      removeReindexingState(collection);
+    } catch (Exception e) {
+      log.warn("Error during reindexing of " + originalCollection, e);
+      exc = e;
+      aborted = true;
+    } finally {
+      solrClientCache.close();
+      if (aborted) {
+        cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
+        if (exc != null) {
+          results.add("error", exc.toString());
+        }
+        reindexingState.put(STATE, State.ABORTED.toLower());
+      }
+      results.add(REINDEX_STATUS, reindexingState);
+    }
+  }
+
+  private static final String REINDEXING_STATE_PATH = "/.reindexing";
+
+  /**
+   * Persists the reindexing state of a collection.
+   * @param collection collection whose reindexing state node is written
+   * @param state state value stored under the "state" key
+   * @param props properties to store; when null the currently stored properties are loaded and reused
+   * @return a new map with the properties that were written, including the state
+   */
+  private Map<String, Object> setReindexingState(String collection, State state, Map<String, Object> props) throws Exception {
+    final DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
+    final String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+    // no props supplied: start from whatever is already stored, if anything
+    final Map<String, Object> base = props != null ? props : Utils.getJson(stateManager, path);
+    final Map<String, Object> merged = new HashMap<>();
+    merged.putAll(base);
+    merged.put("state", state.toLower());
+    if (!stateManager.hasData(path)) {
+      stateManager.makePath(path, Utils.toJSON(merged), CreateMode.PERSISTENT, false);
+    } else {
+      stateManager.setData(path, Utils.toJSON(merged), -1);
+    }
+    return merged;
+  }
+
+  /**
+   * Deletes the stored reindexing state of the given collection, if any data is present.
+   */
+  private void removeReindexingState(String collection) throws Exception {
+    final String statePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+    final DistribStateManager manager = ocmh.cloudManager.getDistribStateManager();
+    if (!manager.hasData(statePath)) {
+      // nothing stored - nothing to remove
+      return;
+    }
+    manager.removeData(statePath, -1);
+  }
+
+  /**
+   * Reads the stored reindexing state of a collection.
+   * @return a fresh, modifiable, key-sorted copy of the stored properties
+   */
+  @VisibleForTesting
+  public static Map<String, Object> getReindexingState(DistribStateManager stateManager, String collection) throws Exception {
+    final Map<String, Object> stored =
+        Utils.getJson(stateManager, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH);
+    // copy so that callers can modify the result
+    return new TreeMap<>(stored);
+  }
+
+  /**
+   * Returns the number of documents matching <code>*:*</code> in the given collection.
+   * This is a best-effort count used for progress reporting: when the query fails
+   * the problem is logged and 0 is returned.
+   */
+  private long getNumberOfDocs(String collection) {
+    CloudSolrClient solrClient = solrClientCache.getCloudSolrClient(zkHost);
+    try {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add(CommonParams.Q, "*:*");
+      // only the count is needed, don't fetch any documents
+      params.add(CommonParams.ROWS, "0");
+      QueryResponse rsp = solrClient.query(collection, params);
+      return rsp.getResults().getNumFound();
+    } catch (Exception e) {
+      // don't fail the whole operation, but leave a trace of why the count is 0
+      log.warn("Unable to get the number of documents in " + collection + ", reporting 0", e);
+      return 0L;
+    }
+  }
+
+  /**
+   * Inspects command results for a "failure" or "error" entry.
+   * @param label description of the step, used in the error message
+   * @param results results of the executed command
+   * @param failureIsFatal when true a reported problem throws a {@link SolrException},
+   *                       otherwise it is only logged
+   */
+  private void checkResults(String label, NamedList<Object> results, boolean failureIsFatal) throws Exception {
+    Object problem = results.get("failure");
+    if (problem == null) {
+      problem = results.get("error");
+    }
+    if (problem == null) {
+      return;
+    }
+    final String msg = "Error: " + label + ": " + Utils.toJSONString(results);
+    if (!failureIsFatal) {
+      log.error(msg);
+      return;
+    }
+    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
+  }
+
+  /**
+   * Checks whether the reindexing should stop.
+   * @return true when the collection no longer exists in the cluster state or when its
+   * stored reindexing state is ABORTED, false otherwise
+   */
+  private boolean maybeAbort(String collection) throws Exception {
+    if (ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollectionOrNull(collection) == null) {
+      // collection no longer present - abort
+      log.info("## Aborting - collection {} no longer present.", collection);
+      return true;
+    }
+    Map<String, Object> storedState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+    // a missing state entry is treated as RUNNING
+    State state = State.get(storedState.getOrDefault(STATE, State.RUNNING.toLower()));
+    if (state == State.ABORTED) {
+      log.info("## Aborting - collection {} state is {}", collection, state);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Extracts the URL of the core where the daemon was started from the response to the
+   * daemon start request.
+   * XXX see #waitForDaemon() for why we need this
+   * @param rsp response to the request that started the daemon
+   * @param coll collection whose replicas are searched for the core named in the response
+   * @return base URL + "/" + core name of the matching replica, or null when the response
+   * doesn't contain usable daemon location information or no replica matches
+   */
+  private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
+    Map<String, Object> rs = (Map<String, Object>)rsp.getResponse().get("result-set");
+    if (rs == null || rs.isEmpty()) {
+      log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
+      // without a result-set there's nothing to parse (and rs may be null)
+      return null;
+    }
+    List<Object> list = (List<Object>)rs.get("docs");
+    if (list == null) {
+      log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
+      return null;
+    }
+    String replicaName = null;
+    for (Object o : list) {
+      Map<String, Object> map = (Map<String, Object>)o;
+      String op = (String)map.get("DaemonOp");
+      if (op == null) {
+        continue;
+      }
+      // the core name is expected to be the last of four whitespace-separated tokens
+      String[] parts = op.split("\\s+");
+      if (parts.length != 4) {
+        log.debug(" -- Invalid daemon location info, expected 4 tokens: " + op);
+        return null;
+      }
+      // check if it's plausible
+      if (parts[3].contains("shard") && parts[3].contains("replica")) {
+        replicaName = parts[3];
+        break;
+      } else {
+        log.debug(" -- daemon location info likely invalid: " + op);
+        return null;
+      }
+    }
+    if (replicaName == null) {
+      return null;
+    }
+    // build a baseUrl of the replica
+    for (Replica r : coll.getReplicas()) {
+      if (replicaName.equals(r.getCoreName())) {
+        return r.getBaseUrl() + "/" + r.getCoreName();
+      }
+    }
+    return null;
+  }
+
+  // XXX currently this is complicated to due a bug in the way the daemon 'list'
+  // XXX operation is implemented - see SOLR-13245. We need to query the actual
+  // XXX SolrCore where the daemon is running
+  /**
+   * Polls the daemon list at <code>daemonUrl</code> until the named daemon is no longer
+   * listed, the list is empty, or the reindexing of the source collection is aborted.
+   * Every fifth check the number of processed documents is stored in the reindexing state.
+   */
+  private void waitForDaemon(String daemonName, String daemonUrl, String sourceCollection, String targetCollection, Map<String, Object> reindexingState) throws Exception {
+    HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
+        .withHttpClient(client)
+        .withBaseSolrUrl(daemonUrl).build()) {
+      ModifiableSolrParams listParams = new ModifiableSolrParams();
+      listParams.set(CommonParams.QT, "/stream");
+      listParams.set("action", "list");
+      listParams.set(CommonParams.DISTRIB, false);
+      QueryRequest listRequest = new QueryRequest(listParams);
+      for (int statusCheck = 1; ; statusCheck++) {
+        boolean isRunning = false;
+        try {
+          NamedList<Object> rsp = solrClient.request(listRequest);
+          Map<String, Object> rs = (Map<String, Object>)rsp.get("result-set");
+          if (rs == null || rs.isEmpty()) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find daemon list: missing result-set: " + Utils.toJSONString(rsp));
+          }
+          List<Object> daemons = (List<Object>)rs.get("docs");
+          if (daemons == null) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find daemon list: missing result-set: " + Utils.toJSONString(rsp));
+          }
+          if (daemons.isEmpty()) { // finished?
+            return;
+          }
+          for (Object daemon : daemons) {
+            String id = (String)((Map<String, Object>)daemon).get("id");
+            if (daemonName.equals(id)) {
+              isRunning = true;
+              // fail here
+              TestInjection.injectReindexFailure();
+              break;
+            }
+          }
+        } catch (Exception e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception waiting for daemon " +
+              daemonName + " at " + daemonUrl, e);
+        }
+        if (statusCheck % 5 == 0) {
+          reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+          setReindexingState(sourceCollection, State.RUNNING, reindexingState);
+        }
+        ocmh.cloudManager.getTimeSource().sleep(2000);
+        // stop polling when the daemon is gone or the operation was aborted
+        if (!isRunning || maybeAbort(sourceCollection)) {
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Stops the named daemon at <code>daemonUrl</code>, waits up to 60s for it to report
+   * a stop time, and then issues 'kill' to remove its status.
+   * Returns early (without issuing 'kill') when the 'stop' response lacks a result-set
+   * or document list, or when the document list is empty.
+   */
+  private void killDaemon(String daemonName, String daemonUrl) throws Exception {
+    log.debug("-- killing daemon " + daemonName + " at " + daemonUrl);
+    HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
+        .withHttpClient(client)
+        .withBaseSolrUrl(daemonUrl).build()) {
+      ModifiableSolrParams q = new ModifiableSolrParams();
+      q.set(CommonParams.QT, "/stream");
+      // we should really use 'kill' here, but then we will never
+      // know when the daemon actually finishes running - 'kill' only
+      // sets a flag that may be noticed much later
+      q.set("action", "stop");
+      q.set(CommonParams.ID, daemonName);
+      q.set(CommonParams.DISTRIB, false);
+      QueryRequest req = new QueryRequest(q);
+      NamedList<Object> rsp = solrClient.request(req);
+      // /result-set/docs/[0]/DaemonOp is expected to contain the daemon id and "stopped"
+      log.debug(" -- stop daemon response: " + Utils.toJSONString(rsp));
+      Map<String, Object> rs = (Map<String, Object>) rsp.get("result-set");
+      if (rs == null || rs.isEmpty()) {
+        log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
+        return;
+      }
+      List<Object> list = (List<Object>) rs.get("docs");
+      if (list == null) {
+        log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
+        return;
+      }
+      if (list.isEmpty()) { // already finished?
+        return;
+      }
+      for (Object o : list) {
+        Map<String, Object> map = (Map<String, Object>) o;
+        String op = (String) map.get("DaemonOp");
+        if (op == null) {
+          continue;
+        }
+        if (op.contains(daemonName) && op.contains("stopped")) {
+          // now wait for the daemon to really stop
+          q.set("action", "list");
+          req = new QueryRequest(q);
+          TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, ocmh.timeSource);
+          while (!timeOut.hasTimedOut()) {
+            rsp = solrClient.request(req);
+            rs = (Map<String, Object>) rsp.get("result-set");
+            if (rs == null || rs.isEmpty()) {
+              log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
+              break;
+            }
+            List<Object> list2 = (List<Object>) rs.get("docs");
+            if (list2 == null) {
+              log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
+              break;
+            }
+            if (list2.isEmpty()) { // already finished?
+              break;
+            }
+            Map<String, Object> status2 = null;
+            for (Object o2 : list2) {
+              Map<String, Object> map2 = (Map<String, Object>)o2;
+              if (daemonName.equals(map2.get("id"))) {
+                status2 = map2;
+                break;
+              }
+            }
+            if (status2 == null) { // finished?
+              break;
+            }
+            // a missing stopTime is treated the same as "not stopped yet"
+            Number stopTime = (Number)status2.get("stopTime");
+            if (stopTime != null && stopTime.longValue() > 0) {
+              break;
+            }
+            // still running - pause before asking again instead of polling in a tight loop
+            timeOut.sleep(500);
+          }
+          if (timeOut.hasTimedOut()) {
+            log.warn("Problem killing daemon " + daemonName + ": timed out waiting for daemon to stop.");
+            // proceed anyway
+          }
+        }
+      }
+      // now kill it - it's already stopped, this simply removes its status
+      q.set("action", "kill");
+      req = new QueryRequest(q);
+      solrClient.request(req);
+    }
+  }
+
+  /**
+   * Cleans up after an aborted or failed reindexing: kills the daemon (if its URL is known),
+   * deletes the target collection (only when it was created by this operation and differs
+   * from the source) and the checkpoint collection, turns off the read-only mode of the
+   * source collection and removes its stored reindexing state.
+   * A failure to kill the daemon is logged and doesn't prevent the remaining steps.
+   */
+  private void cleanup(String collection, String targetCollection, String chkCollection,
+                       String daemonUrl, String daemonName, boolean createdTarget) throws Exception {
+    log.info("## Cleaning up after abort or error");
+    // 1. kill the daemon
+    // 2. cleanup target / chk collections IFF the source collection still exists and is not empty
+    // 3. cleanup collection state
+
+    if (daemonUrl != null) {
+      try {
+        killDaemon(daemonName, daemonUrl);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw e;
+      } catch (Exception e) {
+        // keep going - otherwise the source would stay read-only and stale state would be left behind
+        log.warn("Problem killing daemon " + daemonName + " at " + daemonUrl + ", continuing cleanup", e);
+      }
+    }
+    ClusterState clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+    NamedList<Object> cmdResults = new NamedList<>();
+    if (createdTarget && !collection.equals(targetCollection) && clusterState.hasCollection(targetCollection)) {
+      log.debug(" -- removing " + targetCollection);
+      ZkNodeProps cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+          CommonParams.NAME, targetCollection,
+          CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+      );
+      ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+      checkResults("CLEANUP: deleting target collection " + targetCollection, cmdResults, false);
+
+    }
+    // remove chk collection
+    if (clusterState.hasCollection(chkCollection)) {
+      log.debug(" -- removing " + chkCollection);
+      ZkNodeProps cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+          CommonParams.NAME, chkCollection,
+          CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+      );
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+      checkResults("CLEANUP: deleting checkpoint collection " + chkCollection, cmdResults, false);
+    }
+    log.debug(" -- turning readOnly mode off for " + collection);
+    ZkNodeProps props = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+        ZkStateReader.COLLECTION_PROP, collection,
+        ZkStateReader.READ_ONLY, null);
+    ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+    removeReindexingState(collection);
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index a447093..545e2b3 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -216,8 +216,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
         DaemonStream d = daemons.remove(id);
         if (d != null) {
           d.close();
+          rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
+        } else {
+          rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
         }
-        rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
       }
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 7a4e090..b8e56a9 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -149,6 +149,9 @@ public class ColStatus {
         sliceMap.add("leader", leaderMap);
         leaderMap.add("coreNode", leader.getName());
         leaderMap.addAll(leader.getProperties());
+        if (!leader.isActive(clusterState.getLiveNodes())) {
+          continue;
+        }
         String url = ZkCoreNodeProps.getCoreUrl(leader);
         try (SolrClient client = solrClientCache.getHttpSolrClient(url)) {
           ModifiableSolrParams params = new ModifiableSolrParams();
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 70bcd1a..e933ac3 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -53,6 +53,7 @@ import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.ZkController.NotInClusterStateException;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkShardTerms;
+import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
 import org.apache.solr.cloud.api.collections.RoutedAlias;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -77,6 +78,7 @@ import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -540,6 +542,35 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
 
     RELOAD_OP(RELOAD, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
 
+    // Builds the REINDEXCOLLECTION message: the required collection name plus the optional
+    // command, target, collection creation and query parameters.
+    REINDEXCOLLECTION_OP(REINDEXCOLLECTION, (req, rsp, h) -> {
+      final SolrParams params = req.getParams();
+      final Map<String, Object> props = copy(params.required(), null, NAME);
+      copy(params, props,
+          ReindexCollectionCmd.COMMAND,
+          ReindexCollectionCmd.REMOVE_SOURCE,
+          ReindexCollectionCmd.TARGET,
+          ZkStateReader.CONFIGNAME_PROP,
+          NUM_SLICES,
+          NRT_REPLICAS,
+          PULL_REPLICAS,
+          TLOG_REPLICAS,
+          REPLICATION_FACTOR,
+          MAX_SHARDS_PER_NODE,
+          POLICY,
+          CREATE_NODE_SET,
+          CREATE_NODE_SET_SHUFFLE,
+          AUTO_ADD_REPLICAS,
+          "shards",
+          STATE_FORMAT,
+          CommonParams.ROWS,
+          CommonParams.Q,
+          CommonParams.FL);
+      // the "collection."-prefixed config name, when present, wins over the plain one
+      final String prefixedConfigName = params.get("collection." + ZkStateReader.CONFIGNAME_PROP);
+      if (prefixedConfigName != null) {
+        props.put(ZkStateReader.CONFIGNAME_PROP, prefixedConfigName);
+      }
+      copyPropertiesWithPrefix(params, props, "router.");
+      return props;
+    }),
+
     SYNCSHARD_OP(SYNCSHARD, (req, rsp, h) -> {
       String collection = req.getParams().required().get("collection");
       String shard = req.getParams().required().get("shard");
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
index b9f9645..e433dc4 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
@@ -346,7 +346,13 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
           zkCmdExecutor.ensureExists(upgradedSchemaPath, zkController.getZkClient());
           zkController.getZkClient().setData(upgradedSchemaPath, bytes, true);
           // Then delete the non-managed schema znode
-          zkController.getZkClient().delete(nonManagedSchemaPath, -1, true);
+          if (zkController.getZkClient().exists(nonManagedSchemaPath, true)) {
+            try {
+              zkController.getZkClient().delete(nonManagedSchemaPath, -1, true);
+            } catch (KeeperException.NoNodeException ex) {
+              // ignore - someone beat us to it
+            }
+          }
 
           // Set the resource name to the managed schema so that the CoreAdminHandler returns a findable filename 
           schema.setResourceName(managedSchemaResourceName);
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index cf7681e..7a49ba4 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -126,6 +126,10 @@ public class TestInjection {
 
   public volatile static CountDownLatch splitLatch = null;
 
+  public volatile static CountDownLatch reindexLatch = null;
+
+  public volatile static String reindexFailure = null;
+
   public volatile static String failIndexFingerprintRequests = null;
 
   public volatile static String wrongIndexFingerprint = null;
@@ -156,6 +160,8 @@ public class TestInjection {
     splitFailureBeforeReplicaCreation = null;
     splitFailureAfterReplicaCreation = null;
     splitLatch = null;
+    reindexLatch = null;
+    reindexFailure = null;
     prepRecoveryOpPauseForever = null;
     countPrepRecoveryOpPauseForever = new AtomicInteger(0);
     failIndexFingerprintRequests = null;
@@ -423,6 +429,35 @@ public class TestInjection {
     return true;
   }
 
+  /**
+   * Throws a {@link SolrException} with the configured probability when
+   * {@link #reindexFailure} is set and enabled; otherwise returns true.
+   */
+  public static boolean injectReindexFailure() {
+    if (reindexFailure == null) {
+      return true;
+    }
+    Random rand = random();
+    if (rand == null) {
+      return true;
+    }
+    Pair<Boolean,Integer> setting = parseValue(reindexFailure);
+    boolean enabled = setting.first();
+    int chanceIn100 = setting.second();
+    if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
+      log.info("Test injection failure");
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Test injection failure");
+    }
+    return true;
+  }
+
+
+  /**
+   * When {@link #reindexLatch} is set, waits on it for up to 60 seconds.
+   * @return the result of the wait; true when no latch is set or the wait was interrupted
+   */
+  public static boolean injectReindexLatch() {
+    if (reindexLatch == null) {
+      return true;
+    }
+    try {
+      log.info("Waiting in ReindexCollectionCmd for up to 60s");
+      return reindexLatch.await(60, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // restore the interrupt flag for the caller
+      Thread.currentThread().interrupt();
+      return true;
+    }
+  }
+
   private static Pair<Boolean,Integer> parseValue(final String raw) {
     if (raw == null) return new Pair<>(false, 0);
     Matcher m = ENABLED_PERCENT.matcher(raw);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
new file mode 100644
index 0000000..8413cf2
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests of the REINDEXCOLLECTION command implemented in {@link ReindexCollectionCmd}.
+ */
+@LogLevel("org.apache.solr.cloud.api.collections.ReindexCollectionCmd=DEBUG")
+public class ReindexCollectionTest extends SolrCloudTestCase {
+
+  /**
+   * Configures a 2-node cluster with the three configsets ("conf1", "conf2", "conf3")
+   * used by the tests in this class.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(2)
+        // only *_s
+        .addConfig("conf1", configset("cloud-minimal"))
+        // every combination of field flags
+        .addConfig("conf2", configset("cloud-dynamic"))
+        // catch-all * field, indexed+stored
+        .addConfig("conf3", configset("cloud-minimal-inplace-updates"))
+        .configure();
+  }
+
+  private CloudSolrClient solrClient;
+  private SolrCloudManager cloudManager;
+  private DistribStateManager stateManager;
+
+  /**
+   * Initializes the cloud manager, the state manager and a fresh client before each test,
+   * all obtained via the ZkController of the first node.
+   */
+  @Before
+  public void doBefore() throws Exception {
+    final ZkController controller = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
+    cloudManager = controller.getSolrCloudManager();
+    stateManager = cloudManager.getDistribStateManager();
+    final List<String> zkHosts = Collections.singletonList(controller.getZkServerAddress());
+    solrClient = new CloudSolrClientBuilder(zkHosts, Optional.empty()).build();
+  }
+
+  /**
+   * Returns the stored reindexing state of the collection; any exception fails the test.
+   */
+  private ReindexCollectionCmd.State getState(String collection) {
+    try {
+      final Map<String, Object> stored = ReindexCollectionCmd.getReindexingState(stateManager, collection);
+      return ReindexCollectionCmd.State.get(stored.get(ReindexCollectionCmd.STATE));
+    } catch (Exception e) {
+      fail("Unexpected exception checking state of " + collection + ": " + e);
+      return null;
+    }
+  }
+
+  /**
+   * Waits up to 30 seconds, checking every 500 ms, for the collection to reach the
+   * expected reindexing state; throws an exception on timeout.
+   */
+  private void waitForState(String collection, ReindexCollectionCmd.State expected) throws Exception {
+    ReindexCollectionCmd.State current = null;
+    for (TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+         !timeOut.hasTimedOut();
+         timeOut.sleep(500)) {
+      current = getState(collection);
+      if (current == expected) {
+        return;
+      }
+    }
+    throw new Exception("timeout waiting for state, current=" + current + ", expected=" + expected);
+  }
+
+  /**
+   * Deletes all collections, closes the client and resets the test injection settings.
+   * The client is closed and the injection is reset even when an earlier step fails,
+   * so that a failure here doesn't leak into other tests.
+   */
+  @After
+  public void doAfter() throws Exception {
+    try {
+      cluster.deleteAllCollections(); // deletes aliases too
+    } finally {
+      try {
+        solrClient.close();
+      } finally {
+        TestInjection.reset();
+      }
+    }
+  }
+
+  private static final int NUM_DOCS = 200; // at least two batches, default batchSize=100
+
+  /**
+   * Reindexes a collection into a separate, explicitly named target and verifies
+   * the reported status and the number of copied documents.
+   */
+  @Test
+  public void testBasicReindexing() throws Exception {
+    final String sourceCollection = "basicReindexing";
+    final String targetCollection = "basicReindexingTarget";
+
+    createCollection(sourceCollection, "conf1", 2, 2);
+
+    indexDocs(sourceCollection, NUM_DOCS, i -> {
+      final String value = String.valueOf(i);
+      return new SolrInputDocument("id", value, "string_s", value);
+    });
+
+    CollectionAdminResponse rsp = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection)
+        .process(solrClient);
+    final Object rawStatus = rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertNotNull(rsp.toString(), rawStatus);
+    Map<String, Object> status = (Map<String, Object>)rawStatus;
+    // both counters must match the number of indexed documents
+    for (String counter : new String[] {"inputDocs", "processedDocs"}) {
+      assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get(counter)).longValue());
+    }
+
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection,
+        (liveNodes, coll) -> ReindexCollectionCmd.State.FINISHED ==
+            ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)));
+    // verify the target docs exist
+    final long copied = solrClient.query(targetCollection, params(CommonParams.Q, "*:*")).getResults().getNumFound();
+    assertEquals("copied num docs", NUM_DOCS, copied);
+  }
+
+  /**
+   * Reindexes a collection using its own name as the target, and verifies that a
+   * collection with the target prefix is created, reaches the FINISHED state and that
+   * all documents are available under the original name.
+   */
+  @Test
+  public void testSameTargetReindexing() throws Exception {
+    final String sourceCollection = "sameTargetReindexing";
+    final String targetCollection = sourceCollection;
+
+    createCollection(sourceCollection, "conf1", 2, 2);
+    indexDocs(sourceCollection, NUM_DOCS,
+        i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection);
+    req.process(solrClient);
+
+    // find the collection that was actually created to hold the reindexed data
+    String realTargetCollection = null;
+    TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+    String prefix = ReindexCollectionCmd.TARGET_COL_PREFIX + targetCollection;
+    while (!timeOut.hasTimedOut()) {
+      timeOut.sleep(500);
+      for (String name : cloudManager.getClusterStateProvider().getClusterState().getCollectionsMap().keySet()) {
+        if (name.startsWith(prefix)) {
+          realTargetCollection = name;
+          break;
+        }
+      }
+      if (realTargetCollection != null) {
+        break;
+      }
+    }
+    assertNotNull("target collection not present after 30s", realTargetCollection);
+
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> {
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
+      return ReindexCollectionCmd.State.FINISHED == state;
+    });
+    // verify the target docs exist
+    QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+    assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+  }
+
+  /**
+   * Re-indexes into a target collection that uses a different configset ("conf3") and verifies
+   * that stored fields are copied while the indexed-but-not-stored field "sind" is absent
+   * from the copied documents.
+   */
+  @Test
+  public void testLossySchema() throws Exception {
+    final String sourceCollection = "sourceLossyReindexing";
+    final String targetCollection = "targetLossyReindexing";
+
+
+    createCollection(sourceCollection, "conf2", 2, 2);
+
+    indexDocs(sourceCollection, NUM_DOCS, i ->
+      new SolrInputDocument(
+          "id", String.valueOf(i),
+          "string_s", String.valueOf(i),
+          "sind", "this is a test " + i)); // "sind": indexed=true, stored=false, will be lost...
+
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection)
+        .setConfigName("conf3");
+    req.process(solrClient);
+
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
+      return ReindexCollectionCmd.State.FINISHED == state;
+    });
+    // verify the target docs exist; request every row explicitly so that the per-document
+    // checks below cover all copied documents rather than only the default first page
+    QueryResponse rsp = solrClient.query(targetCollection,
+        params(CommonParams.Q, "*:*", CommonParams.ROWS, String.valueOf(NUM_DOCS)));
+    assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+    assertEquals("returned num docs", NUM_DOCS, rsp.getResults().size());
+    for (SolrDocument doc : rsp.getResults()) {
+      String id = (String)doc.getFieldValue("id");
+      assertEquals(id, doc.getFieldValue("string_s"));
+      assertFalse(doc.containsKey("sind")); // lost in translation ...
+    }
+  }
+
+  /**
+   * Re-indexes a subset of documents and fields into a target collection with a different
+   * layout (three named shards, replication factor 1, implicit router) and verifies both the
+   * copied content and the shape of the resulting collection.
+   */
+  @Test
+  public void testReshapeReindexing() throws Exception {
+    final String sourceCollection = "reshapeReindexing";
+    final String targetCollection = "reshapeReindexingTarget";
+    createCollection(sourceCollection, "conf1", 2, 2);
+    indexDocs(sourceCollection, NUM_DOCS,
+        i -> new SolrInputDocument(
+            "id", String.valueOf(i),
+            "string_s", String.valueOf(i),
+            "remove_s", String.valueOf(i)));
+
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection)
+        .setCollectionParam(ZkStateReader.NUM_SHARDS_PROP, 3)
+        .setCollectionParam(ZkStateReader.REPLICATION_FACTOR, 1)
+        .setCollectionParam("router.name", ImplicitDocRouter.NAME)
+        .setCollectionParam("shards", "foo,bar,baz")
+        .setCollectionParam("fl", "id,string_s")
+        .setCollectionParam("q", "id:10*");
+    req.process(solrClient);
+
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
+      return ReindexCollectionCmd.State.FINISHED == state;
+    });
+    // verify the target docs exist; request enough rows so that the field checks below
+    // see every copied document rather than only the default first page
+    QueryResponse rsp = solrClient.query(targetCollection,
+        params(CommonParams.Q, "*:*", CommonParams.ROWS, String.valueOf(NUM_DOCS)));
+    // 10 and 100-109
+    assertEquals("copied num docs", 11, rsp.getResults().getNumFound());
+    assertEquals("returned num docs", 11, rsp.getResults().size());
+    // verify the correct fields exist
+    for (SolrDocument doc : rsp.getResults()) {
+      assertNotNull(doc.getFieldValue("id"));
+      assertNotNull(doc.getFieldValue("string_s"));
+      assertNull(doc.getFieldValue("remove_s"));
+    }
+
+    // check the shape of the new collection
+    ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState();
+    // resolve the requested target name through the aliases to get the actual collection name
+    List<String> aliases = solrClient.getZkStateReader().getAliases().resolveAliases(targetCollection);
+    assertFalse(aliases.isEmpty());
+    String realTargetCollection = aliases.get(0);
+    DocCollection coll = clusterState.getCollection(realTargetCollection);
+    assertNotNull(coll);
+    assertEquals(3, coll.getSlices().size());
+    assertNotNull("foo", coll.getSlice("foo"));
+    assertNotNull("bar", coll.getSlice("bar"));
+    assertNotNull("baz", coll.getSlice("baz"));
+    assertEquals(Integer.valueOf(1), coll.getReplicationFactor());
+    assertEquals(ImplicitDocRouter.NAME, coll.getRouter().getName());
+  }
+
+  /**
+   * Exercises three failure scenarios: the target is an existing collection, the target is an
+   * existing alias, and a failure injected through {@link TestInjection}. After the injected
+   * failure the test verifies that no target / checkpoint collections are left behind and that
+   * the source collection is writable again without re-indexing state.
+   */
+  @Test
+  public void testFailure() throws Exception {
+    final String sourceCollection = "failReindexing";
+    final String targetCollection = "failReindexingTarget";
+    final String aliasTarget = "failAlias";
+    createCollection(sourceCollection, "conf1", 2, 2);
+    createCollection(targetCollection, "conf1", 1, 1);
+    CollectionAdminRequest.createAlias(aliasTarget, targetCollection).process(solrClient);
+    indexDocs(sourceCollection, NUM_DOCS,
+        i -> new SolrInputDocument(
+            "id", String.valueOf(i),
+            "string_s", String.valueOf(i)));
+
+    // case 1: the target is an already existing collection
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection);
+    CollectionAdminResponse rsp = req.process(solrClient);
+    assertNotNull(rsp.getResponse().get("error"));
+    assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
+
+    // case 2: the target is an already existing alias
+    req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(aliasTarget);
+    rsp = req.process(solrClient);
+    assertNotNull(rsp.getResponse().get("error"));
+    assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
+
+    // remove the conflicting alias and collection so that the next request can actually start
+    CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient);
+    CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient);
+
+    req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection);
+
+    // case 3: injected failure
+    // NOTE(review): the injection flag is not reset in this method; presumably a teardown
+    // outside this view resets TestInjection - confirm
+    TestInjection.reindexFailure = "true:100";
+    rsp = req.process(solrClient);
+    assertNotNull(rsp.getResponse().get("error"));
+    assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("waiting for daemon"));
+
+    // verify that the target and checkpoint collections don't exist
+    cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
+      assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
+      assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX));
+    });
+    // verify that the source collection is read-write and has no reindexing flags
+    CloudTestUtils.waitForState(cloudManager, "collection state is incorrect", sourceCollection,
+        ((liveNodes, collectionState) ->
+            !collectionState.isReadOnly() &&
+            collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null &&
+            getState(sourceCollection) == null));
+  }
+
+  /**
+   * Starts an asynchronous re-indexing that is held back by a test latch, aborts it with the
+   * "abort" command, and verifies the reported states ("aborting", then "aborted") as well as
+   * the final state of the source collection once the latch is released.
+   */
+  @Test
+  public void testAbort() throws Exception {
+    final String sourceCollection = "abortReindexing";
+    final String targetCollection = "abortReindexingTarget";
+    createCollection(sourceCollection, "conf1", 2, 1);
+
+    // presumably the re-indexing command waits on this latch until it is counted down
+    // (see "let the process continue" below) - confirm against TestInjection
+    TestInjection.reindexLatch = new CountDownLatch(1);
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection);
+    String asyncId = req.processAsync(solrClient);
+    // wait for the source collection to be put in readOnly mode
+    CloudTestUtils.waitForState(cloudManager, "source collection didn't become readOnly",
+        sourceCollection, (liveNodes, coll) -> coll.isReadOnly());
+
+    // send a separate, synchronous "abort" command for the same source collection
+    req = CollectionAdminRequest.reindexCollection(sourceCollection);
+    req.setCommand("abort");
+    CollectionAdminResponse rsp = req.process(solrClient);
+    Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertNotNull(rsp.toString(), status);
+    assertEquals(status.toString(), "aborting", status.get("state"));
+
+    // while the latch is still closed the source is expected to stay read-only, in state ABORTED
+    CloudTestUtils.waitForState(cloudManager, "incorrect collection state", sourceCollection,
+        ((liveNodes, collectionState) ->
+            collectionState.isReadOnly() &&
+            getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED));
+
+    // verify status
+    req.setCommand("status");
+    rsp = req.process(solrClient);
+    status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertNotNull(rsp.toString(), status);
+    assertEquals(status.toString(), "aborted", status.get("state"));
+    // let the process continue
+    TestInjection.reindexLatch.countDown();
+    CloudTestUtils.waitForState(cloudManager, "source collection is in wrong state",
+        sourceCollection, (liveNodes, docCollection) -> !docCollection.isReadOnly() && getState(sourceCollection) == null);
+    // verify the response
+    rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
+    status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertNotNull(rsp.toString(), status);
+    assertEquals(status.toString(), "aborted", status.get("state"));
+  }
+
+  /**
+   * Creates a collection with the given configset and layout (max shards per node set to -1)
+   * and waits for it to become active with the expected number of shards and replicas.
+   */
+  private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
+    final int expectedReplicas = numShards * numReplicas;
+    CollectionAdminRequest.Create createRequest =
+        CollectionAdminRequest.createCollection(name, config, numShards, numReplicas);
+    createRequest.setMaxShardsPerNode(-1);
+    createRequest.process(solrClient);
+
+    cluster.waitForActiveCollection(name, numShards, expectedReplicas);
+  }
+
+  /**
+   * Generates {@code numDocs} documents, adds them to the collection in a single request,
+   * commits, and verifies that a match-all query reports exactly {@code numDocs} hits.
+   * The verification therefore assumes that the collection was empty beforehand.
+   *
+   * @param collection collection to index into
+   * @param numDocs number of documents to generate; the generator is called with 0..numDocs-1
+   * @param generator creates the document for a given sequence number
+   */
+  private void indexDocs(String collection, int numDocs, Function<Integer, SolrInputDocument> generator) throws Exception {
+    List<SolrInputDocument> docs = new ArrayList<>();
+    for (int i = 0; i < numDocs; i++) {
+      docs.add(generator.apply(i));
+    }
+    solrClient.add(collection, docs);
+    solrClient.commit(collection);
+    // verify the docs exist; compare against the requested count, not the NUM_DOCS constant,
+    // so that the helper stays correct for callers indexing a different number of documents
+    QueryResponse rsp = solrClient.query(collection, params(CommonParams.Q, "*:*"));
+    assertEquals("num docs", numDocs, rsp.getResults().getNumFound());
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/SystemCollectionCompatTest.java b/solr/core/src/test/org/apache/solr/cloud/SystemCollectionCompatTest.java
new file mode 100644
index 0000000..d2e98cb
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/SystemCollectionCompatTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.RetryUtil;
+import org.apache.solr.logging.LogWatcher;
+import org.apache.solr.logging.LogWatcherConfig;
+import org.apache.solr.util.IdUtils;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that, after an Overseer restart, warnings are logged by the {@link Overseer} logger
+ * about the <code>.system</code> collection: one mentioning re-indexing and one mentioning the
+ * <code>timestamp</code> field whose schema definition this test deliberately makes incompatible.
+ */
+public class SystemCollectionCompatTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Starts a two-node cluster using the "cloud-managed" configset with a mutable managed schema. */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    System.setProperty("managed.schema.mutable", "true");
+    configureCluster(2)
+        .addConfig("conf1", configset("cloud-managed"))
+        .configure();
+    // the test inspects WARN-level log messages, so it cannot work with WARN disabled
+    if (! log.isWarnEnabled()) {
+      fail("Test requires that log-level is at-least WARN, but WARN is disabled");
+    }
+  }
+
+  private SolrCloudManager cloudManager;
+  private CloudSolrClient solrClient;
+
+  /**
+   * Creates the <code>.system</code> collection, indexes one dummy document, replaces the
+   * <code>timestamp</code> field definition with an incompatible one, and reloads the collection,
+   * waiting until the leader core reports a newer start time.
+   */
+  @Before
+  public void setupSystemCollection() throws Exception {
+    CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 2)
+        .process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL,  1, 2);
+    ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
+    cloudManager = zkController.getSolrCloudManager();
+    solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()),
+        Optional.empty()).build();
+    // send a dummy doc to the .system collection
+    SolrInputDocument doc = new SolrInputDocument(
+        "id", IdUtils.timeRandomId(),
+        CommonParams.TYPE, "dummy");
+    doc.addField("time_l", cloudManager.getTimeSource().getEpochTimeNs());
+    doc.addField("timestamp", new Date());
+    solrClient.add(CollectionAdminParams.SYSTEM_COLL, doc);
+    solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
+
+    Replica leader
+        = solrClient.getZkStateReader().getLeaderRetry(CollectionAdminParams.SYSTEM_COLL, "shard1", DEFAULT_TIMEOUT);
+    // remember the core start time before the reload so that the reload can be detected below
+    final AtomicReference<Long> coreStartTime = new AtomicReference<>(getCoreStatus(leader).getCoreStartTime().getTime());
+    // trigger compat report by changing the schema
+    SchemaRequest req = new SchemaRequest();
+    SchemaResponse rsp = req.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
+    Map<String, Object> field = getSchemaField("timestamp", rsp);
+    // fail with a clear message instead of a NullPointerException if the field is missing
+    assertNotNull("'timestamp' field not found in the schema: " + rsp, field);
+    // make some obviously incompatible changes
+    field.put("type", "string");
+    field.put("docValues", false);
+    SchemaRequest.ReplaceField replaceFieldRequest = new SchemaRequest.ReplaceField(field);
+    SchemaResponse.UpdateResponse replaceFieldResponse = replaceFieldRequest.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
+    assertEquals(replaceFieldResponse.toString(), 0, replaceFieldResponse.getStatus());
+    CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(CollectionAdminParams.SYSTEM_COLL);
+    CollectionAdminResponse response = reloadRequest.process(solrClient);
+    assertEquals(0, response.getStatus());
+    assertTrue(response.isSuccess());
+    // wait for the reload to complete
+    RetryUtil.retryUntil("Timed out waiting for core to reload", 30, 1000, TimeUnit.MILLISECONDS, () -> {
+      long restartTime = 0;
+      try {
+        restartTime = getCoreStatus(leader).getCoreStartTime().getTime();
+      } catch (Exception e) {
+        log.warn("Exception getting core start time: {}", e.getMessage());
+        return false;
+      }
+      return restartTime > coreStartTime.get();
+    });
+    cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL,  1, 2);
+
+  }
+
+  /** Deletes all collections and always closes the client created in {@link #setupSystemCollection()}. */
+  @After
+  public void doAfter() throws Exception {
+    try {
+      cluster.deleteAllCollections();
+    } finally {
+      // the client is null when the setup failed before creating it
+      if (solrClient != null) {
+        solrClient.close();
+        solrClient = null;
+      }
+    }
+  }
+
+  /**
+   * Looks up a field definition by name in a schema response.
+   *
+   * @return the field's attribute map, or null when the schema has no field with that name
+   */
+  private Map<String, Object> getSchemaField(String name, SchemaResponse schemaResponse) {
+    List<Map<String, Object>> fields = schemaResponse.getSchemaRepresentation().getFields();
+    for (Map<String, Object> field : fields) {
+      if (name.equals(field.get("name"))) {
+        return field;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Stops the node that currently hosts the Overseer, waits for a different node to become
+   * Overseer leader, and then polls the log history for the expected Overseer warnings.
+   */
+  @Test
+  public void testBackCompat() throws Exception {
+    CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
+    // use the cluster's own client here (named differently to avoid shadowing the solrClient field)
+    CloudSolrClient clusterClient = cluster.getSolrClient();
+    CollectionAdminResponse adminResponse = status.process(clusterClient);
+    NamedList<Object> response = adminResponse.getResponse();
+    String leader = (String) response.get("leader");
+    JettySolrRunner overseerNode = null;
+    int index = -1;
+    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+    for (int i = 0; i < jettySolrRunners.size(); i++) {
+      JettySolrRunner runner = jettySolrRunners.get(i);
+      if (runner.getNodeName().equals(leader)) {
+        overseerNode = runner;
+        index = i;
+        break;
+      }
+    }
+    assertNotNull(overseerNode);
+    LogWatcherConfig watcherCfg = new LogWatcherConfig(true, null, "WARN", 100);
+    LogWatcher watcher = LogWatcher.newRegisteredLogWatcher(watcherCfg, null);
+
+    watcher.reset();
+
+    // restart Overseer to trigger the back-compat check
+    cluster.stopJettySolrRunner(index);
+    TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+    // record success explicitly: re-reading the timeout after the loop could report a
+    // failure even though a new leader was found just before the deadline
+    boolean newLeaderElected = false;
+    while (!timeOut.hasTimedOut()) {
+      adminResponse = status.process(clusterClient);
+      response = adminResponse.getResponse();
+      String newLeader = (String) response.get("leader");
+      if (newLeader != null && !leader.equals(newLeader)) {
+        newLeaderElected = true;
+        break;
+      }
+      timeOut.sleep(200);
+    }
+    assertTrue("time out waiting for new Overseer leader", newLeaderElected);
+
+    TimeOut timeOut1 = new TimeOut(60, TimeUnit.SECONDS, cloudManager.getTimeSource());
+    boolean foundWarning = false;
+    boolean foundSchemaWarning = false;
+    while (!timeOut1.hasTimedOut()) {
+      timeOut1.sleep(1000);
+      SolrDocumentList history = watcher.getHistory(-1, null);
+      for (SolrDocument doc : history) {
+        // only messages logged by the Overseer class are relevant
+        if (!Overseer.class.getName().equals(doc.getFieldValue("logger"))) {
+          continue;
+        }
+        if (doc.getFieldValue("message").toString().contains("re-indexing")) {
+          foundWarning = true;
+        }
+        if (doc.getFieldValue("message").toString().contains("timestamp")) {
+          foundSchemaWarning = true;
+        }
+      }
+      if (foundWarning && foundSchemaWarning) {
+        break;
+      }
+    }
+    assertTrue("re-indexing warning not found", foundWarning);
+    assertTrue("timestamp field incompatibility warning not found", foundSchemaWarning);
+
+  }
+
+}
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index c5f68cc..07e112f 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -198,6 +198,7 @@ The attributes that can be modified are:
 
 See the <<create,CREATE action>> section above for details on these attributes.
 
+[[readonlymode]]
 ==== Read-only mode
 Setting the `readOnly` attribute to `true` puts the collection in read-only mode,
 in which any index update requests are rejected. Other collection-level actions (eg. adding /
@@ -218,6 +219,125 @@ NOTE: This may potentially take a long time if there are still major segment mer
 Removing the `readOnly` property or setting it to false enables the
 processing of updates and reloads the collection.
 
+[[reindexcollection]]
+== REINDEXCOLLECTION: Re-index a Collection
+
+`/admin/collections?action=REINDEXCOLLECTION&name=_name_`
+
+The REINDEXCOLLECTION command re-indexes a collection using existing data from the
+source collection.
+
+NOTE: Re-indexing is potentially a lossy operation - some of the existing indexed data that is not
+available as stored fields may be lost, so users should use this command
+with caution, evaluating the potential impact by using different source and target
+collection names first, and preserving the source collection until the evaluation is
+complete.
+
+The target collection must not exist (and may not be an alias). If the target
+collection name is the same as the source collection then first a unique sequential name
+will be generated for the target collection, and then after re-indexing is done an alias
+will be created that points from the source name to the actual sequentially-named target collection.
+
+When re-indexing is started the source collection is put in <<readonlymode,read-only mode>> to ensure that
+all source documents are properly processed.
+
+Using optional parameters, a different index schema, collection shape (number of shards and replicas),
+or routing parameters can be requested for the target collection.
+
+Re-indexing is executed as a streaming expression daemon, which runs on one of the
+source collection's replicas. It is usually a time-consuming operation so it's recommended to execute
+it as an asynchronous request in order to avoid request timeouts. Only one re-indexing operation may
+execute concurrently for a given source collection. Long-running, erroneous or crashed re-indexing
+operations may be terminated by using the `abort` option, which also removes partial results.
+
+=== REINDEXCOLLECTION Parameters
+
+`name`::
+Source collection name, may be an alias. This parameter is required.
+
+`cmd`::
+Optional command. Default command is `start`. Currently supported commands are:
+* `start` - default, starts processing if not already running,
+* `abort` - aborts an already running re-indexing (or clears a left-over status after a crash),
+and deletes partial results,
+* `status` - returns detailed status of a running re-indexing command.
+
+`target`::
+Target collection name, optional. If not specified a unique name will be generated and
+after all documents have been copied an alias will be created that points from the source
+collection name to the unique sequentially-named collection, effectively "hiding"
+the original source collection from regular update and search operations.
+
+`q`::
+Optional query to select documents for re-indexing. Default value is `\*:*`.
+
+`fl`::
+Optional list of fields to re-index. Default value is `*`.
+
+`rows`::
+Documents are transferred in batches. Depending on the average size of the documents, large
+batch sizes may cause memory issues. Default value is 100.
+
+`configName`::
+`collection.configName`::
+Optional name of the configset for the target collection. Default is the same as the
+source collection.
+
+There are a number of optional parameters that determine the target collection layout. If they
+are not specified in the request then their values are copied from the source collection.
+The following parameters are currently supported (described in detail in the <<create,CREATE collection>> section):
+`numShards`, `replicationFactor`, `nrtReplicas`, `tlogReplicas`, `pullReplicas`, `maxShardsPerNode`,
+`autoAddReplicas`, `shards`, `policy`, `createNodeSet`, `createNodeSet.shuffle`, `router.*`.
+
+`removeSource`::
+Optional boolean. If true then after the processing is successfully finished the source collection will
+be deleted.
+
+`async`::
+Optional request ID to track this action which will be <<Asynchronous Calls,processed asynchronously>>.
+
+When the re-indexing process has completed the target collection is marked using
+`property.rx: "finished"`, and the source collection state is updated to become read-write.
+On any errors the command will delete any temporary and target collections and also reset the
+state of the source collection's read-only flag.
+
+=== Examples using REINDEXCOLLECTION
+
+*Input*
+
+[source,text]
+----
+http://localhost:8983/solr/admin/collections?action=REINDEXCOLLECTION&name=newCollection&numShards=3&configName=conf2&q=id:aa*&fl=id,string_s
+----
+This request specifies a different schema for the target collection, copies only some of the fields, selects only the documents
+matching a query, and also potentially re-shapes the collection by explicitly specifying 3 shards. Since the target collection
+hasn't been specified in the parameters, a collection with a unique name, e.g. `.rx_newCollection_2`, will be created and on success
+an alias pointing from `newCollection` to `.rx_newCollection_2` will be created, effectively replacing the source collection
+for the purpose of indexing and searching. The source collection is assumed to be small so a synchronous request was made.
+
+*Output*
+
+[source,json]
+----
+{
+  "responseHeader":{
+    "status":0,
+    "QTime":10757},
+  "reindexStatus":{
+    "phase":"done",
+    "inputDocs":13416,
+    "processedDocs":376,
+    "actualSourceCollection":".rx_newCollection_1",
+    "state":"finished",
+    "actualTargetCollection":".rx_newCollection_2",
+    "checkpointCollection":".rx_ck_newCollection"
+  }
+}
+----
+As a result a new collection `.rx_newCollection_2` has been created, with selected documents re-indexed to 3 shards, and
+with an alias pointing from `newCollection` to this one. The status also shows that the source collection
+was already an alias to `.rx_newCollection_1`, which was likely a result of a previous re-indexing.
+
 [[reload]]
 == RELOAD: Reload a Collection
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 9d02ec2..ec56bfe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -51,7 +52,7 @@ public class DaemonStream extends TupleStream implements Expressible {
   private ArrayBlockingQueue<Tuple> queue;
   private int queueSize;
   private boolean eatTuples;
-  private long iterations;
+  private AtomicLong iterations = new AtomicLong();
   private long startTime;
   private long stopTime;
   private Exception exception;
@@ -240,7 +241,7 @@ public class DaemonStream extends TupleStream implements Expressible {
     tuple.put(ID, id);
     tuple.put("startTime", startTime);
     tuple.put("stopTime", stopTime);
-    tuple.put("iterations", iterations);
+    tuple.put("iterations", iterations.get());
     tuple.put("state", streamRunner.getState().toString());
     if(exception != null) {
       tuple.put("exception", exception.getMessage());
@@ -253,10 +254,6 @@ public class DaemonStream extends TupleStream implements Expressible {
     this.daemons = daemons;
   }
 
-  private synchronized void incrementIterations() {
-    ++iterations;
-  }
-
   private synchronized void setStartTime(long startTime) {
     this.startTime = startTime;
   }
@@ -332,7 +329,7 @@ public class DaemonStream extends TupleStream implements Expressible {
             log.error("Error in DaemonStream:" + id, e);
             ++errors;
             if (errors > 100) {
-              log.error("Too many consectutive errors. Stopping DaemonStream:" + id);
+              log.error("Too many consecutive errors. Stopping DaemonStream:" + id);
               break OUTER;
             }
           } catch (Throwable t) {
@@ -351,7 +348,7 @@ public class DaemonStream extends TupleStream implements Expressible {
             }
           }
         }
-        incrementIterations();
+        iterations.incrementAndGet();
 
         if (sleepMillis > 0) {
           try {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 1654689..ad1c6b7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -784,6 +784,90 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   }
 
   /**
+   * Returns a SolrRequest to reindex a collection
+   *
+   * @param collection name of the source collection to re-index
+   */
+  public static ReindexCollection reindexCollection(String collection) {
+    return new ReindexCollection(collection);
+  }
+
+  /**
+   * Request for the REINDEXCOLLECTION action. All options are optional; an option that was
+   * never set stays null and is handed to {@code setNonNull} when the parameters are built.
+   */
+  public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
+    String target;
+    String query;
+    String fields;
+    String configName;
+    Boolean removeSource;
+    String cmd;
+    Integer batchSize;
+    Map<String, Object> collectionParams = new HashMap<>();
+
+    private ReindexCollection(String collection) {
+      super(CollectionAction.REINDEXCOLLECTION, collection);
+    }
+
+    /** Sets the name of the target collection (null if it is the same as the source). */
+    public ReindexCollection setTarget(String target) {
+      this.target = target;
+      return this;
+    }
+
+    /** Sets the optional command to execute, e.g. "abort" or "status". */
+    public ReindexCollection setCommand(String command) {
+      this.cmd = command;
+      return this;
+    }
+
+    /** Sets the query that selects the documents to re-index; the default is '*:*'. */
+    public ReindexCollection setQuery(String query) {
+      this.query = query;
+      return this;
+    }
+
+    /** Sets the fields to re-index, using the syntax of {@link CommonParams#FL}; the default is '*'. */
+    public ReindexCollection setFields(String fields) {
+      this.fields = fields;
+      return this;
+    }
+
+    /** Sets whether the source collection is removed after success; the default is false. */
+    public ReindexCollection setRemoveSource(boolean removeSource) {
+      this.removeSource = removeSource;
+      return this;
+    }
+
+    /** Sets the number of documents copied per batch; the default is 100. */
+    public ReindexCollection setBatchSize(int batchSize) {
+      this.batchSize = batchSize;
+      return this;
+    }
+
+    /** Sets the config name for the target collection; the default is the source's config. */
+    public ReindexCollection setConfigName(String configName) {
+      this.configName = configName;
+      return this;
+    }
+
+    /** Sets one of the other supported collection CREATE parameters. */
+    public ReindexCollection setCollectionParam(String key, Object value) {
+      this.collectionParams.put(key, value);
+      return this;
+    }
+
+    /**
+     * Builds the request parameters: the base parameters first, then the re-indexing options,
+     * and finally the additional collection parameters, which are applied last.
+     */
+    @Override
+    public SolrParams getParams() {
+      final ModifiableSolrParams solrParams = (ModifiableSolrParams) super.getParams();
+      solrParams.setNonNull("target", target);
+      solrParams.setNonNull("cmd", cmd);
+      solrParams.setNonNull(ZkStateReader.CONFIGNAME_PROP, configName);
+      solrParams.setNonNull(CommonParams.Q, query);
+      solrParams.setNonNull(CommonParams.FL, fields);
+      solrParams.setNonNull("removeSource", removeSource);
+      solrParams.setNonNull(CommonParams.ROWS, batchSize);
+      collectionParams.forEach(solrParams::setNonNull);
+      return solrParams;
+    }
+  }
+
+  /**
    * Return a SolrRequest for low-level detailed status of the collection.
    */
   public static ColStatus collectionStatus(String collection) {
@@ -823,10 +907,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     @Override
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams)super.getParams();
-      params.setNonNull("segments", withSegments.toString());
-      params.setNonNull("fieldInfo", withFieldInfo.toString());
-      params.setNonNull("coreInfo", withCoreInfo.toString());
-      params.setNonNull("sizeInfo", withSizeInfo.toString());
+      params.setNonNull("segments", withSegments); // NOTE(review): passing the value directly presumably avoids an NPE from toString() on an unset flag — confirm
+      params.setNonNull("fieldInfo", withFieldInfo);
+      params.setNonNull("coreInfo", withCoreInfo);
+      params.setNonNull("sizeInfo", withSizeInfo);
       return params;
     }
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index 30778b8..c4dad33 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -100,6 +100,11 @@ public class CompositeIdRouter extends HashBasedRouter {
     return targetSlices;
   }
 
+  @Override
+  public String getName() {
+    return NAME; // NOTE(review): presumably the identifier this router is registered under — confirm
+  }
+
   public List<Range> partitionRangeByKey(String key, Range range) {
     List<Range> result = new ArrayList<>(3);
     Range keyRange = keyHashRange(key);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
index 111c74b..335c86d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
@@ -223,6 +223,7 @@ public abstract class DocRouter {
 
   public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection);
 
+  public abstract String getName(); // concrete routers return their own NAME constant (e.g. PlainIdRouter returns "plain")
 
   /** This method is consulted to determine what slices should be queried for a request when
    *  an explicit shards parameter was not used.
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
index 0b25fcb..7e51621 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
@@ -76,6 +76,11 @@ public class ImplicitDocRouter extends DocRouter {
   }
 
   @Override
+  public String getName() {
+    return NAME; // NOTE(review): presumably the identifier this router is registered under — confirm
+  }
+
+  @Override
   public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
 
     if (shardKey == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
index f1cea47..d63c5cd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
@@ -19,4 +19,9 @@ package org.apache.solr.common.cloud;
 
 public class PlainIdRouter extends HashBasedRouter {
   public static final String NAME = "plain";
+
+  @Override
+  public String getName() {
+    return NAME; // i.e. "plain"
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index de6b247..cfef82c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -123,7 +123,9 @@ public interface CollectionParams {
     NONE(false, LockLevel.NONE),
     // TODO: not implemented yet
     MERGESHARDS(true, LockLevel.SHARD),
-    COLSTATUS(true, LockLevel.NONE)
+    COLSTATUS(true, LockLevel.NONE),
+    // this command implements its own locking
+    REINDEXCOLLECTION(true, LockLevel.NONE)
     ;
     public final boolean isWrite;
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index faaddc8..3d8c6f9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -572,7 +573,7 @@ public class Utils {
     VersionedData data = null;
     try {
       data = distribStateManager.getData(path);
-    } catch (KeeperException.NoNodeException e) {
+    } catch (KeeperException.NoNodeException | NoSuchElementException e) {
       return Collections.emptyMap();
     }
     if (data == null || data.getData() == null || data.getData().length == 0) return Collections.emptyMap();