You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/03/13 21:25:38 UTC

[lucene-solr] branch jira/solr-11127-2 updated: SOLR-11127: Improved status reporting and error handling. Documentation.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-11127-2 by this push:
     new 5ad99eb  SOLR-11127: Improved status reporting and error handling. Documentation.
5ad99eb is described below

commit 5ad99eb131df3bf7f272b3bda355d0a349950f81
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Mar 13 22:25:10 2019 +0100

    SOLR-11127: Improved status reporting and error handling. Documentation.
---
 .../api/collections/ReindexCollectionCmd.java      | 226 ++++++++++++++++-----
 .../solr/handler/admin/CollectionsHandler.java     |   6 +-
 .../apache/solr/cloud/ReindexCollectionTest.java   | 115 +++++++----
 solr/solr-ref-guide/src/collections-api.adoc       | 120 +++++++++++
 .../solr/client/solrj/io/stream/DaemonStream.java  |  13 +-
 .../solrj/request/CollectionAdminRequest.java      |  10 +-
 .../java/org/apache/solr/common/util/Utils.java    |   3 +-
 7 files changed, 381 insertions(+), 112 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index f1b31d7..553c4bf 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -24,18 +24,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Aliases;
@@ -55,42 +61,46 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TestInjection;
 import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Reindex a collection, usually in order to change the index schema.
- * <p>WARNING: Reindexing is a potentially lossy operation - some indexed data that is not available as
+ * <p>WARNING: Reindexing is potentially a lossy operation - some indexed data that is not available as
  * stored fields may be irretrievably lost, so users should use this command with caution, evaluating
  * the potential impact by using different source and target collection names first, and preserving
  * the source collection until the evaluation is complete.</p>
  * <p>Reindexing follows these steps:</p>
  * <ol>
- *    <li>create a temporary collection using the most recent schema of the source collection
- *    (or the one specified in the parameters, which must already exist).</li>
- *    <li>copy the source documents to the temporary collection, reconstructing them from their stored
- *    fields and reindexing them using the specified schema. NOTE: some data
+ *    <li>create a temporary collection using the most recent schema of the source collection
+ *    (or the one specified in the parameters, which must already exist), and the shape of the original
+ *    collection, unless overridden by parameters.</li>
+ *    <li>copy the source documents to the temporary collection, using their stored fields and
+ *    reindexing them using the specified schema. NOTE: some data
  *    loss may occur if the original stored field data is not available!</li>
- *    <li>if the target collection name is not specified
- *    then the same name as the source is assumed and at this step the source collection is permanently removed.</li>
  *    <li>create the target collection from scratch with the specified name (or the same as source if not
- *    specified), but using the new specified schema. NOTE: if the target name was not specified or is the same
- *    as the source collection then the original collection has been deleted in the previous step and it's
- *    not possible to roll-back the changes if the process is interrupted. The (possibly incomplete) data
- *    is still available in the temporary collection.</li>
- *    <li>copy the documents from the temporary collection to the target collection, using the specified schema.</li>
- *    <li>delete temporary collection(s) and optionally delete the source collection if it still exists.</li>
+ *    specified) and the specified parameters. NOTE: if the target name was not specified or is the same
+ *    as the source collection then a unique sequential collection name will be used.</li>
+ *    <li>copy the documents from the source collection to the target collection.</li>
+ *    <li>if the source and target collection names are the same then set up an alias pointing from the source collection name to the actual
+ *    (sequentially named) target collection</li>
+ *    <li>optionally delete the source collection.</li>
  * </ol>
  */
 public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final String ABORT = "abort";
+  public static final String COMMAND = "cmd";
+  public static final String REINDEX_STATUS = "reindexStatus";
   public static final String REMOVE_SOURCE = "removeSource";
   public static final String TARGET = "target";
   public static final String TARGET_COL_PREFIX = ".rx_";
   public static final String CHK_COL_PREFIX = ".rx_ck_";
-  public static final String REINDEXING_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
+  public static final String REINDEXING_STATE = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
+
+  public static final String STATE = "state";
+  public static final String PHASE = "phase";
 
   private static final List<String> COLLECTION_PARAMS = Arrays.asList(
       ZkStateReader.CONFIGNAME_PROP,
@@ -121,20 +131,40 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
       return toString().toLowerCase(Locale.ROOT);
     }
 
-    public static State get(String p) {
+    public static State get(Object p) {
       if (p == null) {
         return null;
       }
-      p = p.toLowerCase(Locale.ROOT);
-      if (p.startsWith(CollectionAdminRequest.PROPERTY_PREFIX)) {
-        p = p.substring(CollectionAdminRequest.PROPERTY_PREFIX.length());
-      }
+      p = String.valueOf(p).toLowerCase(Locale.ROOT);
       return states.get(p);
     }
     static Map<String, State> states = Collections.unmodifiableMap(
         Stream.of(State.values()).collect(Collectors.toMap(State::toLower, Function.identity())));
   }
 
+  public enum Cmd {
+    START,
+    ABORT,
+    STATUS;
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+
+    public static Cmd get(String p) {
+      if (p == null) {
+        return null;
+      }
+      p = p.toLowerCase(Locale.ROOT);
+      return cmds.get(p);
+    }
+    static Map<String, Cmd> cmds = Collections.unmodifiableMap(
+        Stream.of(Cmd.values()).collect(Collectors.toMap(Cmd::toLower, Function.identity())));
+  }
+
+  private SolrClientCache solrClientCache;
+  private String zkHost;
+
   public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
   }
@@ -156,8 +186,8 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
       }
     }
 
-    if (collection == null || clusterState.getCollectionOrNull(collection) == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection name must be specified and must exist");
+    if (collection == null || !clusterState.hasCollection(collection)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified and must exist");
     }
     String target = message.getStr(TARGET);
     if (target == null) {
@@ -171,32 +201,40 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     }
     boolean sameTarget = target.equals(collection) || target.equals(originalCollection);
     boolean removeSource = message.getBool(REMOVE_SOURCE, false);
-    boolean abort = message.getBool(ABORT, false);
-    DocCollection coll = clusterState.getCollection(collection);
-    if (abort) {
-      log.info("Abort requested for collection " + collection + ", setting the state to ABORTED.");
-      ZkNodeProps props = new ZkNodeProps(
-          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
-          ZkStateReader.COLLECTION_PROP, collection,
-          REINDEXING_PROP, State.ABORTED.toLower());
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
-      results.add(State.ABORTED.toLower(), collection);
+    Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
+    if (command == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
+    }
+    Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+    if (!reindexingState.containsKey(STATE)) {
+      reindexingState.put(STATE, State.IDLE.toLower());
+    }
+    State state = State.get(reindexingState.get(STATE));
+    if (command == Cmd.ABORT) {
+      log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
+      // check that it's running
+      if (state != State.RUNNING) {
+        log.debug("Abort requested for collection {} but command is not running: {}", collection, state);
+        return;
+      }
+      setReindexingState(collection, State.ABORTED, null);
+      reindexingState.put(STATE, "aborting");
+      results.add(REINDEX_STATUS, reindexingState);
       // if needed the cleanup will be performed by the running instance of the command
       return;
+    } else if (command == Cmd.STATUS) {
+      results.add(REINDEX_STATUS, reindexingState);
+      return;
     }
+    // command == Cmd.START
+
     // check it's not already running
-    State state = State.get(coll.getStr(REINDEXING_PROP, State.IDLE.toLower()));
     if (state == State.RUNNING) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection +
-          ". If you are sure this is not the case you can issue &abort=true to clean up this state.");
+          ". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
     }
-    // set the running flag
-    ZkNodeProps props = new ZkNodeProps(
-        Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
-        ZkStateReader.COLLECTION_PROP, collection,
-        REINDEXING_PROP, State.RUNNING.toLower());
-    ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
 
+    DocCollection coll = clusterState.getCollection(collection);
     boolean aborted = false;
     int batchSize = message.getInt(CommonParams.ROWS, 100);
     String query = message.getStr(CommonParams.Q, "*:*");
@@ -226,11 +264,22 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     } else {
       targetCollection = target;
     }
-    String chkCollection = CHK_COL_PREFIX + originalCollection + "_" + seq;
+    String chkCollection = CHK_COL_PREFIX + originalCollection;
     String daemonUrl = null;
     Exception exc = null;
     boolean createdTarget = false;
     try {
+      solrClientCache = new SolrClientCache(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
+      zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress();
+      // set the running flag
+      reindexingState.clear();
+      reindexingState.put("actualSourceCollection", collection);
+      reindexingState.put("actualTargetCollection", targetCollection);
+      reindexingState.put("checkpointCollection", chkCollection);
+      reindexingState.put("inputDocs", getNumberOfDocs(collection));
+      reindexingState.put(PHASE, "creating target and checkpoint collections");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
       // 0. set up target and checkpoint collections
       NamedList<Object> cmdResults = new NamedList<>();
       ZkNodeProps cmd;
@@ -276,6 +325,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
 
       propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
       propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
+      propMap.put(DocCollection.STATE_FORMAT, message.getInt(DocCollection.STATE_FORMAT, coll.getStateFormat()));
       if (rf != null) {
         propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
       }
@@ -301,6 +351,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
           CommonParams.NAME, chkCollection,
           ZkStateReader.NUM_SHARDS_PROP, "1",
           ZkStateReader.REPLICATION_FACTOR, "1",
+          DocCollection.STATE_FORMAT, "2",
           CollectionAdminParams.COLL_CONF, "_default",
           CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
       );
@@ -371,9 +422,13 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
             collection + " to " + targetCollection + ": " + Utils.toJSONString(rsp));
       }
+      reindexingState.put("daemonUrl", daemonUrl);
+      reindexingState.put("daemonName", targetCollection);
+      reindexingState.put(PHASE, "copying documents");
+      setReindexingState(collection, State.RUNNING, reindexingState);
 
       // wait for the daemon to finish
-      waitForDaemon(targetCollection, daemonUrl, collection);
+      waitForDaemon(targetCollection, daemonUrl, collection, targetCollection, reindexingState);
       if (maybeAbort(collection)) {
         aborted = true;
         return;
@@ -391,8 +446,15 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         cmdResults = new NamedList<>();
         ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, results);
         checkResults("setting up alias " + originalCollection + " -> " + targetCollection, cmdResults, true);
+        reindexingState.put("alias", originalCollection + " -> " + targetCollection);
       }
 
+      reindexingState.remove("daemonUrl");
+      reindexingState.remove("daemonName");
+      reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+      reindexingState.put(PHASE, "copying done, finalizing");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
       if (maybeAbort(collection)) {
         aborted = true;
         return;
@@ -421,33 +483,83 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         checkResults("deleting source collection " + collection, cmdResults, true);
       } else {
         // 8. clear readOnly on source
-        props = new ZkNodeProps(
+        ZkNodeProps props = new ZkNodeProps(
             Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
             ZkStateReader.COLLECTION_PROP, collection,
             ZkStateReader.READ_ONLY, null);
         ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
       }
-      // 9. set FINISHED state on the target
-      props = new ZkNodeProps(
+      // 9. set FINISHED state on the target and clear the state on the source
+      ZkNodeProps props = new ZkNodeProps(
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
           ZkStateReader.COLLECTION_PROP, targetCollection,
-          REINDEXING_PROP, State.FINISHED.toLower());
+          REINDEXING_STATE, State.FINISHED.toLower());
       ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
 
-      results.add(State.FINISHED.toLower(), originalCollection);
+      reindexingState.put(STATE, State.FINISHED.toLower());
+      reindexingState.put(PHASE, "done");
+      removeReindexingState(collection);
     } catch (Exception e) {
       log.warn("Error during reindexing of " + originalCollection, e);
       exc = e;
       aborted = true;
-      throw e;
     } finally {
+      solrClientCache.close();
       if (aborted) {
         cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
-        results.add(State.ABORTED.toLower(), collection);
         if (exc != null) {
           results.add("error", exc.toString());
         }
+        reindexingState.put(STATE, State.ABORTED.toLower());
       }
+      results.add(REINDEX_STATUS, reindexingState);
+    }
+  }
+
+  private static final String REINDEXING_STATE_PATH = "/.reindexing";
+
+  private Map<String, Object> setReindexingState(String collection, State state, Map<String, Object> props) throws Exception {
+    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+    DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
+    Map<String, Object> copyProps = new HashMap<>();
+    if (props == null) { // retrieve existing props, if any
+      props = Utils.getJson(stateManager, path);
+    }
+    copyProps.putAll(props);
+    copyProps.put("state", state.toLower());
+    if (stateManager.hasData(path)) {
+      stateManager.setData(path, Utils.toJSON(copyProps), -1);
+    } else {
+      stateManager.makePath(path, Utils.toJSON(copyProps), CreateMode.PERSISTENT, false);
+    }
+    return copyProps;
+  }
+
+  private void removeReindexingState(String collection) throws Exception {
+    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+    DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
+    if (stateManager.hasData(path)) {
+      stateManager.removeData(path, -1);
+    }
+  }
+
+  @VisibleForTesting
+  public static Map<String, Object> getReindexingState(DistribStateManager stateManager, String collection) throws Exception {
+    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+    // make it modifiable
+    return new TreeMap<>(Utils.getJson(stateManager, path));
+  }
+
+  private long getNumberOfDocs(String collection) {
+    CloudSolrClient solrClient = solrClientCache.getCloudSolrClient(zkHost);
+    try {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add(CommonParams.Q, "*:*");
+      params.add(CommonParams.ROWS, "0");
+      QueryResponse rsp = solrClient.query(collection, params);
+      return rsp.getResults().getNumFound();
+    } catch (Exception e) {
+      return 0L;
     }
   }
 
@@ -473,7 +585,8 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
       log.info("## Aborting - collection {} no longer present.", collection);
       return true;
     }
-    State state = State.get(coll.getStr(REINDEXING_PROP, State.RUNNING.toLower()));
+    Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+    State state = State.get(reindexingState.getOrDefault(STATE, State.RUNNING.toLower()));
     if (state != State.ABORTED) {
       return false;
     }
@@ -528,7 +641,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
   // XXX currently this is complicated to due a bug in the way the daemon 'list'
   // XXX operation is implemented - see SOLR-13245. We need to query the actual
   // XXX SolrCore where the daemon is running
-  private void waitForDaemon(String daemonName, String daemonUrl, String collection) throws Exception {
+  private void waitForDaemon(String daemonName, String daemonUrl, String sourceCollection, String targetCollection, Map<String, Object> reindexingState) throws Exception {
     HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
     try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
         .withHttpClient(client)
@@ -539,8 +652,10 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
       q.set(CommonParams.DISTRIB, false);
       QueryRequest req = new QueryRequest(q);
       boolean isRunning;
+      int statusCheck = 0;
       do {
         isRunning = false;
+        statusCheck++;
         try {
           NamedList<Object> rsp = solrClient.request(req);
           Map<String, Object> rs = (Map<String, Object>)rsp.get("result-set");
@@ -568,8 +683,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception waiting for daemon " +
               daemonName + " at " + daemonUrl, e);
         }
+        if (statusCheck % 5 == 0) {
+          reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+          setReindexingState(sourceCollection, State.RUNNING, reindexingState);
+        }
         ocmh.cloudManager.getTimeSource().sleep(2000);
-      } while (isRunning && !maybeAbort(collection));
+      } while (isRunning && !maybeAbort(sourceCollection));
     }
   }
 
@@ -698,9 +817,8 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     ZkNodeProps props = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
         ZkStateReader.COLLECTION_PROP, collection,
-        // remove the rx flag, we already aborted
-        REINDEXING_PROP, null,
         ZkStateReader.READ_ONLY, null);
     ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+    removeReindexingState(collection);
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 106eece..0772def 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -545,7 +545,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
     REINDEXCOLLECTION_OP(REINDEXCOLLECTION, (req, rsp, h) -> {
       Map<String, Object> m = copy(req.getParams().required(), null, NAME);
       copy(req.getParams(), m,
-          ReindexCollectionCmd.ABORT,
+          ReindexCollectionCmd.COMMAND,
           ReindexCollectionCmd.REMOVE_SOURCE,
           ReindexCollectionCmd.TARGET,
           ZkStateReader.CONFIGNAME_PROP,
@@ -560,9 +560,13 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           CREATE_NODE_SET_SHUFFLE,
           AUTO_ADD_REPLICAS,
           "shards",
+          STATE_FORMAT,
           CommonParams.ROWS,
           CommonParams.Q,
           CommonParams.FL);
+      if (req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP) != null) {
+        m.put(ZkStateReader.CONFIGNAME_PROP, req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP));
+      }
       copyPropertiesWithPrefix(req.getParams(), m, "router.");
       return m;
     }),
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
index 218476c..a9f9a22 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -20,19 +20,20 @@ package org.apache.solr.cloud;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
 import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -67,15 +68,41 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
 
   private CloudSolrClient solrClient;
   private SolrCloudManager cloudManager;
+  private DistribStateManager stateManager;
 
   @Before
   public void doBefore() throws Exception {
     ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
     cloudManager = zkController.getSolrCloudManager();
+    stateManager = cloudManager.getDistribStateManager();
     solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()),
         Optional.empty()).build();
   }
 
+  private ReindexCollectionCmd.State getState(String collection) {
+    try {
+      return ReindexCollectionCmd.State.get(ReindexCollectionCmd
+          .getReindexingState(stateManager, collection)
+          .get(ReindexCollectionCmd.STATE));
+    } catch (Exception e) {
+      fail("Unexpected exception checking state of " + collection + ": " + e);
+      return null;
+    }
+  }
+
+  private void waitForState(String collection, ReindexCollectionCmd.State expected) throws Exception {
+    TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+    ReindexCollectionCmd.State current = null;
+    while (!timeOut.hasTimedOut()) {
+      current = getState(collection);
+      if (expected == current) {
+        return;
+      }
+      timeOut.sleep(500);
+    }
+    throw new Exception("timeout waiting for state, current=" + current + ", expected=" + expected);
+  }
+
   @After
   public void doAfter() throws Exception {
     cluster.deleteAllCollections(); // deletes aliases too
@@ -100,15 +127,19 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
 
     CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
         .setTarget(targetCollection);
-    req.process(solrClient);
+    CollectionAdminResponse rsp = req.process(solrClient);
+    assertNotNull(rsp.toString(), rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS));
+    Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("inputDocs")).longValue());
+    assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("processedDocs")).longValue());
 
     CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
-      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
     // verify the target docs exist
-    QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
-    assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+    QueryResponse queryResponse = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+    assertEquals("copied num docs", NUM_DOCS, queryResponse.getResults().getNumFound());
   }
 
   public void testSameTargetReindexing() throws Exception {
@@ -141,7 +172,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
     assertNotNull("target collection not present after 30s", realTargetCollection);
 
     CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> {
-      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
     // verify the target docs exist
@@ -169,7 +200,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
     req.process(solrClient);
 
     CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
-      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
     // verify the target docs exist
@@ -204,7 +235,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
     req.process(solrClient);
 
     CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
-      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
     // verify the target docs exist
@@ -248,24 +279,15 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
 
     CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
         .setTarget(targetCollection);
-    try {
-      req.process(solrClient);
-      fail("succeeded but expected reindexing to fail due to the target collection already present");
-    } catch (Exception e) {
-      assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
-      BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
-      assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
-    }
+    CollectionAdminResponse rsp = req.process(solrClient);
+    assertNotNull(rsp.getResponse().get("error"));
+    assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
+
     req = CollectionAdminRequest.reindexCollection(sourceCollection)
         .setTarget(aliasTarget);
-    try {
-      req.process(solrClient);
-      fail("succeeded but expected reindexing to fail due to the target collection already present");
-    } catch (Exception e) {
-      assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
-      BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
-      assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
-    }
+    rsp = req.process(solrClient);
+    assertNotNull(rsp.getResponse().get("error"));
+    assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
 
     CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient);
     CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient);
@@ -274,14 +296,10 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
         .setTarget(targetCollection);
 
     TestInjection.reindexFailure = "true:100";
-    try {
-      req.process(solrClient);
-      fail("succeeded but expected reindexing to fail due to a test-injected failure");
-    } catch (Exception e) {
-      assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
-      BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
-      assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
-    }
+    rsp = req.process(solrClient);
+    assertNotNull(rsp.getResponse().get("error"));
+    assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("waiting for daemon"));
+
     // verify that the target and checkpoint collections don't exist
     cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
       assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
@@ -291,7 +309,8 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
     CloudTestUtils.waitForState(cloudManager, "collection state is incorrect", sourceCollection,
         ((liveNodes, collectionState) ->
             !collectionState.isReadOnly() &&
-            collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null));
+            collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null &&
+            getState(sourceCollection) == null));
   }
 
   @Test
@@ -309,22 +328,32 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
         sourceCollection, (liveNodes, coll) -> coll.isReadOnly());
 
     req = CollectionAdminRequest.reindexCollection(sourceCollection);
-    req.setAbort(true);
-    req.process(solrClient);
+    req.setCommand("abort");
+    CollectionAdminResponse rsp = req.process(solrClient);
+    Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertNotNull(rsp.toString(), status);
+    assertEquals(status.toString(), "aborting", status.get("state"));
+
     CloudTestUtils.waitForState(cloudManager, "incorrect collection state", sourceCollection,
         ((liveNodes, collectionState) ->
             collectionState.isReadOnly() &&
-            ReindexCollectionCmd.State.ABORTED.toLower().equals(collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP))));
+            getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED));
+
+    // verify status
+    req.setCommand("status");
+    rsp = req.process(solrClient);
+    status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertNotNull(rsp.toString(), status);
+    assertEquals(status.toString(), "aborted", status.get("state"));
     // let the process continue
     TestInjection.reindexLatch.countDown();
     CloudTestUtils.waitForState(cloudManager, "source collection is in wrong state",
-        sourceCollection, (liveNodes, docCollection) -> {
-          System.err.println("-- coll " + docCollection);
-          return !docCollection.isReadOnly() && docCollection.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null;
-        });
+        sourceCollection, (liveNodes, docCollection) -> !docCollection.isReadOnly() && getState(sourceCollection) == null);
     // verify the response
-    CollectionAdminRequest.RequestStatusResponse rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
-    rsp.getRequestStatus();
+    rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
+    status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+    assertNotNull(rsp.toString(), status);
+    assertEquals(status.toString(), "aborted", status.get("state"));
   }
 
   private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index a66ddfa..48bef7e 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -198,6 +198,7 @@ The attributes that can be modified are:
 
 See the <<create,CREATE action>> section above for details on these attributes.
 
+[[readonlymode]]
 ==== Read-only mode
 Setting the `readOnly` attribute to `true` puts the collection in read-only mode,
 in which any index update requests are rejected. Other collection-level actions (eg. adding /
@@ -218,6 +219,125 @@ NOTE: This may potentially take a long time if there are still major segment mer
 Removing the `readOnly` property or setting it to false enables the
 processing of updates and reloads the collection.
 
+[[reindexcollection]]
+== REINDEXCOLLECTION: Re-index a Collection
+
+`/admin/collections?action=REINDEXCOLLECTION&name=_name_`
+
+The REINDEXCOLLECTION command re-indexes a collection using existing data from the
+source collection.
+
+NOTE: Re-indexing is potentially a lossy operation: some of the existing indexed data that is not
+available as stored fields may be lost, so users should use this command
+with caution, evaluating the potential impact by using different source and target
+collection names first, and preserving the source collection until the evaluation is
+complete.
+
+The target collection must not exist (and must not be an alias). If the target
+collection name is the same as the source collection then first a unique sequential name
+will be generated for the target collection, and then after re-indexing is done an alias
+will be created that points from the source name to the actual sequentially-named target collection.
+
+When re-indexing is started the source collection is put in <<readonlymode,read-only mode>> to ensure that
+all source documents are properly processed.
+
+Using optional parameters a different index schema, collection shape (number of shards and replicas)
+or routing parameters can be requested for the target collection.
+
+Re-indexing is executed as a streaming expression daemon, which runs on one of the
+source collection's replicas. It is usually a time-consuming operation, so it's recommended to execute
+it as an asynchronous request in order to avoid request timeouts. Only one re-indexing operation may
+execute concurrently for a given source collection. Long-running, erroneous or crashed re-indexing
+operations may be terminated by using the `abort` option, which also removes partial results.
+
+=== REINDEXCOLLECTION Parameters
+
+`name`::
+Source collection name, may be an alias. This parameter is required.
+
+`cmd`::
+Optional command. Default command is `start`. Currently supported commands are:
+* `start` - default, starts processing if not already running,
+* `abort` - aborts an already running re-indexing (or clears a left-over status after a crash),
+and deletes partial results,
+* `status` - returns detailed status of a running re-indexing command.
+
+`target`::
+Target collection name, optional. If not specified a unique name will be generated and
+after all documents have been copied an alias will be created that points from the source
+collection name to the unique sequentially-named collection, effectively "hiding"
+the original source collection from regular update and search operations.
+
+`q`::
+Optional query to select documents for re-indexing. Default value is `\*:*`.
+
+`fl`::
+Optional list of fields to re-index. Default value is `*`.
+
+`rows`::
+Documents are transferred in batches. Depending on the average document size, large
+batch sizes may cause memory issues. Default value is 100.
+
+`configName`::
+`collection.configName`::
+Optional name of the configset for the target collection. Default is the same as the
+source collection.
+
+There are a number of optional parameters that determine the target collection layout. If they
+are not specified in the request, then their values are copied from the source collection.
+The following parameters are currently supported (described in detail in the <<create,CREATE collection>> section):
+`numShards`, `replicationFactor`, `nrtReplicas`, `tlogReplicas`, `pullReplicas`, `maxShardsPerNode`,
+`autoAddReplicas`, `shards`, `policy`, `createNodeSet`, `createNodeSet.shuffle`, `router.*`.
+
+`removeSource`::
+Optional boolean. If true then after the processing is successfully finished the source collection will
+be deleted.
+
+`async`::
+Optional request ID to track this action which will be <<Asynchronous Calls,processed asynchronously>>.
+
+When the re-indexing process has completed, the target collection is marked using
+`property.rx: "finished"`, and the source collection state is updated to become read-write.
+If any error occurs, the command deletes any temporary and target collections and also resets the
+state of the source collection's read-only flag.
+
+=== Examples using REINDEXCOLLECTION
+
+*Input*
+
+[source,text]
+----
+http://localhost:8983/solr/admin/collections?action=REINDEXCOLLECTION&name=newCollection&numShards=3&configName=conf2&q=id:aa*&fl=id,string_s
+----
+This request specifies a different configset (and thus potentially a different schema) for the target collection, copies only some of the fields, selects only the documents
+matching a query, and also potentially re-shapes the collection by explicitly specifying 3 shards. Since the target collection
+hasn't been specified in the parameters, a collection with a unique name (e.g. `.rx_newCollection_2`) will be created and, on success,
+an alias pointing from `newCollection -> .rx_newCollection_2` will be created, effectively replacing the source collection
+for the purpose of indexing and searching. The source collection is assumed to be small, so a synchronous request was made.
+
+*Output*
+
+[source,json]
+----
+{
+  "responseHeader":{
+    "status":0,
+    "QTime":10757},
+  "reindexStatus":{
+    "phase":"done",
+    "inputDocs":13416,
+    "processedDocs":376,
+    "actualSourceCollection":".rx_newCollection_1",
+    "state":"finished",
+    "actualTargetCollection":".rx_newCollection_2",
+    "checkpointCollection":".rx_ck_newCollection"
+  }
+}
+----
+As a result a new collection `.rx_newCollection_2` has been created, with selected documents re-indexed to 3 shards, and
+with an alias pointing from `newCollection` to this one. The status also shows that the source collection
+was already an alias to `.rx_newCollection_1`, which was likely a result of a previous re-indexing.
+
 [[reload]]
 == RELOAD: Reload a Collection
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 9d02ec2..ec56bfe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -51,7 +52,7 @@ public class DaemonStream extends TupleStream implements Expressible {
   private ArrayBlockingQueue<Tuple> queue;
   private int queueSize;
   private boolean eatTuples;
-  private long iterations;
+  private AtomicLong iterations = new AtomicLong();
   private long startTime;
   private long stopTime;
   private Exception exception;
@@ -240,7 +241,7 @@ public class DaemonStream extends TupleStream implements Expressible {
     tuple.put(ID, id);
     tuple.put("startTime", startTime);
     tuple.put("stopTime", stopTime);
-    tuple.put("iterations", iterations);
+    tuple.put("iterations", iterations.get());
     tuple.put("state", streamRunner.getState().toString());
     if(exception != null) {
       tuple.put("exception", exception.getMessage());
@@ -253,10 +254,6 @@ public class DaemonStream extends TupleStream implements Expressible {
     this.daemons = daemons;
   }
 
-  private synchronized void incrementIterations() {
-    ++iterations;
-  }
-
   private synchronized void setStartTime(long startTime) {
     this.startTime = startTime;
   }
@@ -332,7 +329,7 @@ public class DaemonStream extends TupleStream implements Expressible {
             log.error("Error in DaemonStream:" + id, e);
             ++errors;
             if (errors > 100) {
-              log.error("Too many consectutive errors. Stopping DaemonStream:" + id);
+              log.error("Too many consecutive errors. Stopping DaemonStream:" + id);
               break OUTER;
             }
           } catch (Throwable t) {
@@ -351,7 +348,7 @@ public class DaemonStream extends TupleStream implements Expressible {
             }
           }
         }
-        incrementIterations();
+        iterations.incrementAndGet();
 
         if (sleepMillis > 0) {
           try {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index d0ae477..799f7cf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -796,7 +796,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     String fields;
     String configName;
     Boolean removeSource;
-    Boolean abort;
+    String cmd;
     Integer batchSize;
     Map<String, Object> collectionParams = new HashMap<>();
 
@@ -810,9 +810,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       return this;
     }
 
-    /** Set to true to abort already running requests. */
-    public ReindexCollection setAbort(boolean abort) {
-      this.abort = abort;
+    /** Set optional command (eg. abort, status). */
+    public ReindexCollection setCommand(String command) {
+      this.cmd = command;
       return this;
     }
 
@@ -856,7 +856,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
       params.setNonNull("target", target);
-      params.setNonNull("abort", abort);
+      params.setNonNull("cmd", cmd);
       params.setNonNull(ZkStateReader.CONFIGNAME_PROP, configName);
       params.setNonNull(CommonParams.Q, query);
       params.setNonNull(CommonParams.FL, fields);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index d079052..50e7c0c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -571,7 +572,7 @@ public class Utils {
     VersionedData data = null;
     try {
       data = distribStateManager.getData(path);
-    } catch (KeeperException.NoNodeException e) {
+    } catch (KeeperException.NoNodeException | NoSuchElementException e) {
       return Collections.emptyMap();
     }
     if (data == null || data.getData() == null || data.getData().length == 0) return Collections.emptyMap();