You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/03/12 18:35:33 UTC

[lucene-solr] 03/05: SOLR-11127: Improved error handling, bug fixes, more tests.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit fb37b7c730e01e2b3b246cb32f34c7001269196e
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Mar 12 18:02:43 2019 +0100

    SOLR-11127: Improved error handling, bug fixes, more tests.
---
 .../OverseerCollectionMessageHandler.java          |   2 +-
 .../api/collections/ReindexCollectionCmd.java      | 176 ++++++++++++++++-----
 .../java/org/apache/solr/core/CoreContainer.java   |   6 +
 .../solr/handler/admin/CollectionsHandler.java     |   7 +-
 .../java/org/apache/solr/util/TestInjection.java   |  35 ++++
 .../apache/solr/cloud/ReindexCollectionTest.java   | 150 ++++++++++++++++--
 .../solrj/request/CollectionAdminRequest.java      |  32 +++-
 .../solr/common/params/CollectionParams.java       |   3 +-
 8 files changed, 349 insertions(+), 62 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 26818c4..86b8570 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -241,7 +241,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         .put(DELETEREPLICA, new DeleteReplicaCmd(this))
         .put(ADDREPLICA, new AddReplicaCmd(this))
         .put(MOVEREPLICA, new MoveReplicaCmd(this))
-        .put(REINDEX_COLLECTION, new ReindexCollectionCmd(this))
+        .put(REINDEXCOLLECTION, new ReindexCollectionCmd(this))
         .put(UTILIZENODE, new UtilizeNodeCmd(this))
         .build()
     ;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index d676791..452e394 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
@@ -52,6 +53,7 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TestInjection;
 import org.apache.solr.util.TimeOut;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,12 +86,11 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String ABORT = "abort";
-  public static final String KEEP_SOURCE = "keepSource";
+  public static final String REMOVE_SOURCE = "removeSource";
   public static final String TARGET = "target";
-  public static final String TARGET_COL_PREFIX = ".reindex_";
-  public static final String CHK_COL_PREFIX = ".reindex_ck_";
-  public static final String REINDEXING_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindexing";
-  public static final String REINDEX_PHASE_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex_phase";
+  public static final String TARGET_COL_PREFIX = ".rx_";
+  public static final String CHK_COL_PREFIX = ".rx_ck_";
+  public static final String REINDEXING_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
 
   private static final List<String> COLLECTION_PARAMS = Arrays.asList(
       ZkStateReader.CONFIGNAME_PROP,
@@ -141,21 +142,39 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
   @Override
   public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
 
-    log.info("*** called: {}", message);
+    log.debug("*** called: {}", message);
 
     String collection = message.getStr(CommonParams.NAME);
+    // before resolving aliases
+    String originalCollection = collection;
+    Aliases aliases = ocmh.zkStateReader.getAliases();
+    if (collection != null) {
+      // resolve aliases - the source may be an alias
+      List<String> aliasList = aliases.resolveAliases(collection);
+      if (aliasList != null && !aliasList.isEmpty()) {
+        collection = aliasList.get(0);
+      }
+    }
+
     if (collection == null || clusterState.getCollectionOrNull(collection) == null) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection name must be specified and must exist");
     }
     String target = message.getStr(TARGET);
     if (target == null) {
       target = collection;
+    } else {
+      // resolve aliases
+      List<String> aliasList = aliases.resolveAliases(target);
+      if (aliasList != null && !aliasList.isEmpty()) {
+        target = aliasList.get(0);
+      }
     }
-    boolean sameTarget = target.equals(collection);
-    boolean keepSource = message.getBool(KEEP_SOURCE, true);
+    boolean sameTarget = target.equals(collection) || target.equals(originalCollection);
+    boolean removeSource = message.getBool(REMOVE_SOURCE, false);
     boolean abort = message.getBool(ABORT, false);
     DocCollection coll = clusterState.getCollection(collection);
     if (abort) {
+      log.info("Abort requested for collection " + collection + ", setting the state to ABORTED.");
       ZkNodeProps props = new ZkNodeProps(
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
           ZkStateReader.COLLECTION_PROP, collection,
@@ -181,6 +200,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     boolean aborted = false;
     int batchSize = message.getInt(CommonParams.ROWS, 100);
     String query = message.getStr(CommonParams.Q, "*:*");
+    String fl = message.getStr(CommonParams.FL, "*");
     Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
     Integer numNrt = message.getInt(ZkStateReader.NRT_REPLICAS, coll.getNumNrtReplicas());
     Integer numTlog = message.getInt(ZkStateReader.TLOG_REPLICAS, coll.getNumTlogReplicas());
@@ -193,12 +213,23 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     }
 
     String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
+    String targetCollection;
     int seq = tmpCollectionSeq.getAndIncrement();
-    String targetCollection = sameTarget ?
-        TARGET_COL_PREFIX + collection + "_" + seq : target;
-    String chkCollection = CHK_COL_PREFIX + collection + "_" + seq;
+    if (sameTarget) {
+      do {
+        targetCollection = TARGET_COL_PREFIX + originalCollection + "_" + seq;
+        if (!clusterState.hasCollection(targetCollection)) {
+          break;
+        }
+        seq = tmpCollectionSeq.getAndIncrement();
+      } while (clusterState.hasCollection(targetCollection));
+    } else {
+      targetCollection = target;
+    }
+    String chkCollection = CHK_COL_PREFIX + originalCollection + "_" + seq;
     String daemonUrl = null;
     Exception exc = null;
+    boolean createdTarget = false;
     try {
       // 0. set up target and checkpoint collections
       NamedList<Object> cmdResults = new NamedList<>();
@@ -214,7 +245,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
             CoreAdminParams.DELETE_METRICS_HISTORY, "true"
         );
         ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
-        // nocommit error checking
+        checkResults("deleting old checkpoint collection " + chkCollection, cmdResults, true);
       }
 
       if (maybeAbort(collection)) {
@@ -259,8 +290,10 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
       }
       // create the target collection
       cmd = new ZkNodeProps(propMap);
+      cmdResults = new NamedList<>();
       ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
-      // nocommit error checking
+      createdTarget = true;
+      checkResults("creating target collection " + targetCollection, cmdResults, true);
 
       // create the checkpoint collection - use RF=1 and 1 shard
       cmd = new ZkNodeProps(
@@ -271,8 +304,9 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
           CollectionAdminParams.COLL_CONF, "_default",
           CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
       );
+      cmdResults = new NamedList<>();
       ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
-      // nocommit error checking
+      checkResults("creating checkpoint collection " + chkCollection, cmdResults, true);
       // wait for a while until we see both collections
       TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, ocmh.timeSource);
       boolean created = false;
@@ -296,7 +330,14 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
           ZkStateReader.COLLECTION_PROP, collection,
           ZkStateReader.READ_ONLY, "true");
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
+
+      TestInjection.injectReindexLatch();
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
 
       // 2. copy the documents to target
       // Recipe taken from: http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
@@ -312,12 +353,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
                   "topic(" + chkCollection + "," +
                     collection + "," +
                     "q=\"" + query + "\"," +
-                    "fl=\"*\"," +
+                    "fl=\"" + fl + "\"," +
                     "id=\"topic_" + targetCollection + "\"," +
                     // some of the documents eg. in .system contain large blobs
                     "rows=\"" + batchSize + "\"," +
                     "initialCheckpoint=\"0\"))))");
-      log.info("- starting copying documents from " + collection + " to " + targetCollection);
+      log.debug("- starting copying documents from " + collection + " to " + targetCollection);
       SolrResponse rsp = null;
       try {
         rsp = ocmh.cloudManager.request(new QueryRequest(q));
@@ -337,14 +378,18 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         aborted = true;
         return;
       }
-      log.info("- finished copying from " + collection + " to " + targetCollection);
+      TestInjection.injectReindexFailure();
+      log.debug("- finished copying from " + collection + " to " + targetCollection);
 
       // 5. if (sameTarget) set up an alias to use targetCollection as the source name
       if (sameTarget) {
+        log.debug("- setting up alias from " + originalCollection + " to " + targetCollection);
         cmd = new ZkNodeProps(
-            CommonParams.NAME, collection,
+            CommonParams.NAME, originalCollection,
             "collections", targetCollection);
+        cmdResults = new NamedList<>();
         ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, results);
+        checkResults("setting up alias " + originalCollection + " -> " + targetCollection, cmdResults, true);
       }
 
       if (maybeAbort(collection)) {
@@ -352,42 +397,51 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         return;
       }
       // 6. delete the checkpoint collection
+      log.debug("- deleting " + chkCollection);
       cmd = new ZkNodeProps(
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
           CommonParams.NAME, chkCollection,
           CoreAdminParams.DELETE_METRICS_HISTORY, "true"
       );
+      cmdResults = new NamedList<>();
       ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
-
-      // nocommit error checking
+      checkResults("deleting checkpoint collection " + chkCollection, cmdResults, true);
 
       // 7. optionally delete the source collection
-      if (keepSource) {
-        // 8. set the FINISHED state and clear readOnly
-        props = new ZkNodeProps(
-            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
-            ZkStateReader.COLLECTION_PROP, collection,
-            REINDEXING_PROP, State.FINISHED.toLower(),
-            ZkStateReader.READ_ONLY, "");
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
-      } else {
+      if (removeSource) {
+        log.debug("- deleting source collection");
         cmd = new ZkNodeProps(
             Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
             CommonParams.NAME, collection,
             CoreAdminParams.DELETE_METRICS_HISTORY, "true"
         );
+        cmdResults = new NamedList<>();
         ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        checkResults("deleting source collection " + collection, cmdResults, true);
+      } else {
+        // 8. clear readOnly on source
+        props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+            ZkStateReader.COLLECTION_PROP, collection,
+            ZkStateReader.READ_ONLY, null);
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
       }
+      // 9. set FINISHED state on the target
+      props = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+          ZkStateReader.COLLECTION_PROP, targetCollection,
+          REINDEXING_PROP, State.FINISHED.toLower());
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
 
-      results.add(State.FINISHED.toLower(), collection);
+      results.add(State.FINISHED.toLower(), originalCollection);
     } catch (Exception e) {
-      log.warn("Error during reindexing of " + collection, e);
+      log.warn("Error during reindexing of " + originalCollection, e);
       exc = e;
       aborted = true;
       throw e;
     } finally {
       if (aborted) {
-        cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection);
+        cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
         results.add(State.ABORTED.toLower(), collection);
         if (exc != null) {
           results.add("error", exc.toString());
@@ -396,16 +450,33 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     }
   }
 
+  private void checkResults(String label, NamedList<Object> results, boolean failureIsFatal) throws Exception {
+    Object failure = results.get("failure");
+    if (failure == null) {
+      failure = results.get("error");
+    }
+    if (failure != null) {
+      String msg = "Error: " + label + ": " + Utils.toJSONString(results);
+      if (failureIsFatal) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
+      } else {
+        log.error(msg);
+      }
+    }
+  }
+
   private boolean maybeAbort(String collection) throws Exception {
     DocCollection coll = ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollectionOrNull(collection);
     if (coll == null) {
       // collection no longer present - abort
+      log.info("## Aborting - collection {} no longer present.", collection);
       return true;
     }
     State state = State.get(coll.getStr(REINDEXING_PROP, State.RUNNING.toLower()));
     if (state != State.ABORTED) {
       return false;
     }
+    log.info("## Aborting - collection {} state is {}", collection, state);
     return true;
   }
 
@@ -413,11 +484,11 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
   private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
     Map<String, Object> rs = (Map<String, Object>)rsp.getResponse().get("result-set");
     if (rs == null || rs.isEmpty()) {
-      log.debug("Missing daemon information in response: " + Utils.toJSONString(rsp));
+      log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
     }
     List<Object> list = (List<Object>)rs.get("docs");
     if (list == null) {
-      log.debug("Missing daemon information in response: " + Utils.toJSONString(rsp));
+      log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
       return null;
     }
     String replicaName = null;
@@ -429,7 +500,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
       }
       String[] parts = op.split("\\s+");
       if (parts.length != 4) {
-        log.debug("Invalid daemon location info, expected 4 tokens: " + op);
+        log.debug(" -- Invalid daemon location info, expected 4 tokens: " + op);
         return null;
       }
       // check if it's plausible
@@ -437,7 +508,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         replicaName = parts[3];
         break;
       } else {
-        log.debug("daemon location info likely invalid: " + op);
+        log.debug(" -- daemon location info likely invalid: " + op);
         return null;
       }
     }
@@ -493,12 +564,13 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         } catch (Exception e) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception copying the documents", e);
         }
-        ocmh.cloudManager.getTimeSource().sleep(5000);
+        ocmh.cloudManager.getTimeSource().sleep(2000);
       } while (isRunning && !maybeAbort(collection));
     }
   }
 
   private void killDaemon(String daemonName, String daemonUrl) throws Exception {
+    log.debug("-- killing daemon " + daemonName + " at " + daemonUrl);
     HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
     try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
         .withHttpClient(client)
@@ -516,7 +588,9 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     }
   }
 
-  private void cleanup(String collection, String targetCollection, String chkCollection, String daemonUrl, String daemonName) throws Exception {
+  private void cleanup(String collection, String targetCollection, String chkCollection,
+                       String daemonUrl, String daemonName, boolean createdTarget) throws Exception {
+    log.info("## Cleaning up after abort or error");
     // 1. kill the daemon
     // 2. cleanup target / chk collections IFF the source collection still exists and is not empty
     // 3. cleanup collection state
@@ -526,20 +600,36 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     }
     ClusterState clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
     NamedList<Object> cmdResults = new NamedList<>();
-    if (!collection.equals(targetCollection) && clusterState.hasCollection(targetCollection)) {
+    if (createdTarget && !collection.equals(targetCollection) && clusterState.hasCollection(targetCollection)) {
+      log.debug(" -- removing " + targetCollection);
       ZkNodeProps cmd = new ZkNodeProps(
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
           CommonParams.NAME, targetCollection,
           CoreAdminParams.DELETE_METRICS_HISTORY, "true"
       );
       ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
-      // nocommit error checking
+      checkResults("CLEANUP: deleting target collection " + targetCollection, cmdResults, false);
+
+    }
+    // remove chk collection
+    if (clusterState.hasCollection(chkCollection)) {
+      log.debug(" -- removing " + chkCollection);
+      ZkNodeProps cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+          CommonParams.NAME, chkCollection,
+          CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+      );
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+      checkResults("CLEANUP: deleting checkpoint collection " + chkCollection, cmdResults, false);
     }
+    log.debug(" -- turning readOnly mode off for " + collection);
     ZkNodeProps props = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
         ZkStateReader.COLLECTION_PROP, collection,
-        REINDEXING_PROP, State.ABORTED.toLower(),
-        ZkStateReader.READ_ONLY, "");
+        // remove the rx flag, we already aborted
+        REINDEXING_PROP, null,
+        ZkStateReader.READ_ONLY, null);
     ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8c5e227..58c9f69 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -747,10 +747,16 @@ public class CoreContainer {
       containerHandlers.put(AutoScalingHandler.HANDLER_PATH, autoScalingHandler);
       autoScalingHandler.initializeMetrics(metricManager, SolrInfoBean.Group.node.toString(), metricTag, AutoScalingHandler.HANDLER_PATH);
     }
+    // verify .system compatibility
+    systemCollCompatCheck();
     // This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
     status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
   }
 
+  private void systemCollCompatCheck() {
+
+  }
+
   // MetricsHistoryHandler supports both cloud and standalone configs
   private void createMetricsHistoryHandler() {
     PluginInfo plugin = cfg.getMetricsConfig().getHistoryHandler();
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 6b53c00..106eece 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -542,11 +542,11 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
 
     RELOAD_OP(RELOAD, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
 
-    REINDEX_COLLECTION_OP(REINDEX_COLLECTION, (req, rsp, h) -> {
+    REINDEXCOLLECTION_OP(REINDEXCOLLECTION, (req, rsp, h) -> {
       Map<String, Object> m = copy(req.getParams().required(), null, NAME);
       copy(req.getParams(), m,
           ReindexCollectionCmd.ABORT,
-          ReindexCollectionCmd.KEEP_SOURCE,
+          ReindexCollectionCmd.REMOVE_SOURCE,
           ReindexCollectionCmd.TARGET,
           ZkStateReader.CONFIGNAME_PROP,
           NUM_SLICES,
@@ -561,7 +561,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           AUTO_ADD_REPLICAS,
           "shards",
           CommonParams.ROWS,
-          CommonParams.Q);
+          CommonParams.Q,
+          CommonParams.FL);
       copyPropertiesWithPrefix(req.getParams(), m, "router.");
       return m;
     }),
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index cf7681e..7a49ba4 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -126,6 +126,10 @@ public class TestInjection {
 
   public volatile static CountDownLatch splitLatch = null;
 
+  public volatile static CountDownLatch reindexLatch = null;
+
+  public volatile static String reindexFailure = null;
+
   public volatile static String failIndexFingerprintRequests = null;
 
   public volatile static String wrongIndexFingerprint = null;
@@ -156,6 +160,8 @@ public class TestInjection {
     splitFailureBeforeReplicaCreation = null;
     splitFailureAfterReplicaCreation = null;
     splitLatch = null;
+    reindexLatch = null;
+    reindexFailure = null;
     prepRecoveryOpPauseForever = null;
     countPrepRecoveryOpPauseForever = new AtomicInteger(0);
     failIndexFingerprintRequests = null;
@@ -423,6 +429,35 @@ public class TestInjection {
     return true;
   }
 
+  public static boolean injectReindexFailure() {
+    if (reindexFailure != null)  {
+      Random rand = random();
+      if (null == rand) return true;
+
+      Pair<Boolean,Integer> pair = parseValue(reindexFailure);
+      boolean enabled = pair.first();
+      int chanceIn100 = pair.second();
+      if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
+        log.info("Test injection failure");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Test injection failure");
+      }
+    }
+    return true;
+  }
+
+
+  public static boolean injectReindexLatch() {
+    if (reindexLatch != null) {
+      try {
+        log.info("Waiting in ReindexCollectionCmd for up to 60s");
+        return reindexLatch.await(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return true;
+  }
+
   private static Pair<Boolean,Integer> parseValue(final String raw) {
     if (raw == null) return new Pair<>(false, 0);
     Matcher m = ENABLED_PERCENT.matcher(raw);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
index 2a9cfd4..218476c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -21,21 +21,27 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
 import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -44,6 +50,7 @@ import org.junit.Test;
 /**
  *
  */
+@LogLevel("org.apache.solr.cloud.api.collections.ReindexCollectionCmd=DEBUG")
 public class ReindexCollectionTest extends SolrCloudTestCase {
 
   @BeforeClass
@@ -74,6 +81,8 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
     cluster.deleteAllCollections(); // deletes aliases too
 
     solrClient.close();
+
+    TestInjection.reset();
   }
 
   private static final int NUM_DOCS = 200; // at least two batches, default batchSize=100
@@ -93,7 +102,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
         .setTarget(targetCollection);
     req.process(solrClient);
 
-    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
       ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
@@ -114,7 +123,24 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
         .setTarget(targetCollection);
     req.process(solrClient);
 
-    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+    String realTargetCollection = null;
+    TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+    String prefix = ReindexCollectionCmd.TARGET_COL_PREFIX + targetCollection;
+    while (!timeOut.hasTimedOut()) {
+      timeOut.sleep(500);
+      for (String name : cloudManager.getClusterStateProvider().getClusterState().getCollectionsMap().keySet()) {
+        if (name.startsWith(prefix)) {
+          realTargetCollection = name;
+          break;
+        }
+      }
+      if (realTargetCollection != null) {
+        break;
+      }
+    }
+    assertNotNull("target collection not present after 30s", realTargetCollection);
+
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> {
       ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
@@ -142,7 +168,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
         .setConfigName("conf3");
     req.process(solrClient);
 
-    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
       ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
@@ -159,26 +185,38 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
   @Test
   public void testReshapeReindexing() throws Exception {
     final String sourceCollection = "reshapeReindexing";
-    final String targetCollection = sourceCollection;
+    final String targetCollection = "reshapeReindexingTarget";
     createCollection(sourceCollection, "conf1", 2, 2);
     indexDocs(sourceCollection, NUM_DOCS,
-        i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+        i -> new SolrInputDocument(
+            "id", String.valueOf(i),
+            "string_s", String.valueOf(i),
+            "remove_s", String.valueOf(i)));
 
     CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
         .setTarget(targetCollection)
         .setCollectionParam(ZkStateReader.NUM_SHARDS_PROP, 3)
         .setCollectionParam(ZkStateReader.REPLICATION_FACTOR, 1)
         .setCollectionParam("router.name", ImplicitDocRouter.NAME)
-        .setCollectionParam("shards", "foo,bar,baz");
+        .setCollectionParam("shards", "foo,bar,baz")
+        .setCollectionParam("fl", "id,string_s")
+        .setCollectionParam("q", "id:10*");
     req.process(solrClient);
 
-    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
       ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
       return ReindexCollectionCmd.State.FINISHED == state;
     });
     // verify the target docs exist
     QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
-    assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+    // 10 and 100-109
+    assertEquals("copied num docs", 11, rsp.getResults().getNumFound());
+    // verify the correct fields exist
+    for (SolrDocument doc : rsp.getResults()) {
+      assertNotNull(doc.getFieldValue("id"));
+      assertNotNull(doc.getFieldValue("string_s"));
+      assertNull(doc.getFieldValue("remove_s"));
+    }
 
     // check the shape of the new collection
     ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState();
@@ -195,6 +233,100 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
     assertEquals(ImplicitDocRouter.NAME, coll.getRouter().getName());
   }
 
+  @Test
+  public void testFailure() throws Exception { // rejected requests (existing target / alias) and cleanup after an injected failure
+    final String sourceCollection = "failReindexing";
+    final String targetCollection = "failReindexingTarget";
+    final String aliasTarget = "failAlias";
+    createCollection(sourceCollection, "conf1", 2, 2);
+    createCollection(targetCollection, "conf1", 1, 1); // pre-create the target so that case 1 below must be rejected
+    CollectionAdminRequest.createAlias(aliasTarget, targetCollection).process(solrClient); // alias used by case 2
+    indexDocs(sourceCollection, NUM_DOCS,
+        i -> new SolrInputDocument(
+            "id", String.valueOf(i),
+            "string_s", String.valueOf(i)));
+
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection); // case 1: the target names an existing collection
+    try {
+      req.process(solrClient);
+      fail("succeeded but expected reindexing to fail due to the target collection already present"); // AssertionError is not an Exception, so the catch below does not swallow it
+    } catch (Exception e) {
+      assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
+      BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
+      assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
+    }
+    req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(aliasTarget); // case 2: the target names an existing alias
+    try {
+      req.process(solrClient);
+      fail("succeeded but expected reindexing to fail due to the target collection already present"); // NOTE(review): same message as case 1 although the target here is an alias
+    } catch (Exception e) {
+      assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
+      BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
+      assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
+    }
+
+    CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient);
+    CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient); // free the target name for case 3
+
+    req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection); // case 3: valid request that is made to fail via TestInjection
+
+    TestInjection.reindexFailure = "true:100"; // NOTE(review): presumably "enabled:percent", i.e. always fail - confirm in TestInjection; the field is not reset in this method
+    try {
+      req.process(solrClient);
+      fail("succeeded but expected reindexing to fail due to a test-injected failure");
+    } catch (Exception e) {
+      assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
+      BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
+      assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
+    }
+    // verify that no collection with the temporary target or checkpoint name prefix is left behind
+    cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
+      assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
+      assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX));
+    });
+    // verify that the source collection is read-write and has no reindexing flags
+    CloudTestUtils.waitForState(cloudManager, "collection state is incorrect", sourceCollection,
+        ((liveNodes, collectionState) ->
+            !collectionState.isReadOnly() &&
+            collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null));
+  }
+
+  @Test
+  public void testAbort() throws Exception { // aborts an async reindexing request while it is in flight and checks the source collection state
+    final String sourceCollection = "abortReindexing";
+    final String targetCollection = "abortReindexingTarget";
+    createCollection(sourceCollection, "conf1", 2, 1);
+
+    TestInjection.reindexLatch = new CountDownLatch(1); // NOTE(review): presumably holds the command until counted down below - confirm in TestInjection; the field is not reset in this method
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection);
+    String asyncId = req.processAsync(solrClient); // async submission, so the test thread can send the abort request
+    // wait for the source collection to be put in readOnly mode
+    CloudTestUtils.waitForState(cloudManager, "source collection didn't become readOnly",
+        sourceCollection, (liveNodes, coll) -> coll.isReadOnly());
+
+    req = CollectionAdminRequest.reindexCollection(sourceCollection); // second request for the same source, no target set
+    req.setAbort(true);
+    req.process(solrClient);
+    CloudTestUtils.waitForState(cloudManager, "incorrect collection state", sourceCollection, // expected: still readOnly and flagged as aborted
+        ((liveNodes, collectionState) ->
+            collectionState.isReadOnly() &&
+            ReindexCollectionCmd.State.ABORTED.toLower().equals(collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP))));
+    // let the process continue
+    TestInjection.reindexLatch.countDown();
+    CloudTestUtils.waitForState(cloudManager, "source collection is in wrong state", // expected: read-write again and no reindexing flag
+        sourceCollection, (liveNodes, docCollection) -> {
+          System.err.println("-- coll " + docCollection); // NOTE(review): debug output printed on every predicate evaluation
+          return !docCollection.isReadOnly() && docCollection.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null;
+        });
+    // fetch the status of the original async request
+    CollectionAdminRequest.RequestStatusResponse rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
+    rsp.getRequestStatus(); // NOTE(review): the returned status is discarded, nothing is asserted here - TODO add an expectation
+  }
+
   private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
     CollectionAdminRequest.createCollection(name, config, numShards, numReplicas)
         .setMaxShardsPerNode(-1)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index ddfe90e..d0ae477 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -793,40 +793,60 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
     String target;
     String query;
+    String fields;
     String configName;
-    Boolean keepSource;
+    Boolean removeSource;
+    Boolean abort;
     Integer batchSize;
     Map<String, Object> collectionParams = new HashMap<>();
 
     private ReindexCollection(String collection) {
-      super(CollectionAction.REINDEX_COLLECTION, collection);
+      super(CollectionAction.REINDEXCOLLECTION, collection);
     }
 
+    /** Target collection name (null if the same); sent as the "target" request parameter. */
     public ReindexCollection setTarget(String target) {
       this.target = target;
       return this;
     }
 
+    /** Set to true to abort already running requests; sent as the "abort" request parameter. */
+    public ReindexCollection setAbort(boolean abort) {
+      this.abort = abort;
+      return this;
+    }
+
+    /** Query matching the documents to reindex (default is '*:*'); sent as {@link CommonParams#Q}. */
     public ReindexCollection setQuery(String query) {
       this.query = query;
       return this;
     }
 
-    public ReindexCollection setKeepSource(boolean keepSource) {
-      this.keepSource = keepSource;
+    /** Fields to reindex, sent as {@link CommonParams#FL} and using the same syntax; default is '*'. */
+    public ReindexCollection setFields(String fields) {
+      this.fields = fields;
+      return this;
+    }
+
+    /** Remove source collection after success (sent as the "removeSource" request parameter). Default is false. */
+    public ReindexCollection setRemoveSource(boolean removeSource) {
+      this.removeSource = removeSource;
       return this;
     }
 
+    /** Copy documents in batches of this size (sent as {@link CommonParams#ROWS}). Default is 100. */
     public ReindexCollection setBatchSize(int batchSize) {
       this.batchSize = batchSize;
       return this;
     }
 
+    /** Config name for the target collection (sent as {@link ZkStateReader#CONFIGNAME_PROP}). Default is the same as source. */
     public ReindexCollection setConfigName(String configName) {
       this.configName = configName;
       return this;
     }
 
+    /** Set other supported collection CREATE parameters. */
     public ReindexCollection setCollectionParam(String key, Object value) {
       this.collectionParams.put(key, value);
       return this;
@@ -836,9 +856,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
       params.setNonNull("target", target);
+      params.setNonNull("abort", abort);
       params.setNonNull(ZkStateReader.CONFIGNAME_PROP, configName);
       params.setNonNull(CommonParams.Q, query);
-      params.setNonNull("keepSource", keepSource);
+      params.setNonNull(CommonParams.FL, fields);
+      params.setNonNull("removeSource", removeSource);
       params.setNonNull(CommonParams.ROWS, batchSize);
       collectionParams.forEach((k, v) -> params.setNonNull(k, v));
       return params;
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index df97c35..1485e34 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -123,7 +123,8 @@ public interface CollectionParams {
     // TODO: not implemented yet
     MERGESHARDS(true, LockLevel.SHARD),
     COLSTATUS(true, LockLevel.NONE),
-    REINDEX_COLLECTION(true, LockLevel.COLLECTION)
+    // this command implements its own locking
+    REINDEXCOLLECTION(true, LockLevel.NONE)
     ;
     public final boolean isWrite;