Posted to commits@lucene.apache.org by ab...@apache.org on 2019/02/27 18:26:28 UTC

[lucene-solr] branch jira/solr-11127-2 created (now 16a7ea8)

This is an automated email from the ASF dual-hosted git repository.

ab pushed a change to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


      at 16a7ea8  SOLR-11127: Complete implementation + test.

This branch includes the following new commits:

     new 35e3683  SOLR-13271: Implement read-only collections.
     new efb8e61  SOLR-11127: Initial version (depends on SOLR-13271)
     new 16a7ea8  SOLR-11127: Complete implementation + test.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.



[lucene-solr] 01/03: SOLR-13271: Implement read-only collections.

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 35e36838ccfb65f251008f2a70acb830d00c5585
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Feb 26 14:16:17 2019 +0100

    SOLR-13271: Implement read-only collections.
---
 .../OverseerCollectionMessageHandler.java          |   6 +
 .../solr/cloud/overseer/CollectionMutator.java     |  11 ++
 .../solr/handler/admin/CollectionsHandler.java     |   3 +-
 .../processor/DistributedUpdateProcessor.java      |  49 ++++++-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java | 143 +++++++++++++++++++++
 .../solrj/request/CollectionAdminRequest.java      |   2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |   1 +
 .../apache/solr/common/params/UpdateParams.java    |   3 +
 8 files changed, 212 insertions(+), 6 deletions(-)
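
For context: with this patch, MODIFYCOLLECTION accepts any "property."-prefixed attribute (CollectionMutator stores it, or removes it when the value is null), and property.readOnly=true flips the new read-only mode, i.e. /admin/collections?action=MODIFYCOLLECTION&collection=<name>&property.readOnly=true over HTTP. A minimal SolrJ sketch of toggling the flag, mirroring the new test further down (the collection name and ZK address are placeholders):

import java.util.Collections;
import java.util.Optional;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.ZkStateReader;

public class ReadOnlyToggleSketch {
  public static void main(String[] args) throws Exception {
    try (CloudSolrClient client = new CloudSolrClient.Builder(
        Collections.singletonList("localhost:9983"), Optional.empty()).build()) {
      // Enable read-only mode; the overseer additionally reloads the
      // collection (see the OverseerCollectionMessageHandler hunk below).
      CollectionAdminRequest.modifyCollection("myCollection",
          Collections.singletonMap(
              CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP, "true"))
          .process(client);

      // An empty value removes the property again (SOLR-12507 semantics).
      CollectionAdminRequest.modifyCollection("myCollection",
          Collections.singletonMap(
              CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP, ""))
          .process(client);
    }
  }
}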

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index db117a3..03ec81c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -94,6 +94,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.cloud.LockTree;
@@ -702,6 +703,11 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
 
     if (!areChangesVisible)
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
+
+    // if switching TO read-only mode reload the collection
+    if (message.getBool(CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP, false)) {
+      reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
+    }
   }
 
   void cleanupCollection(String collectionName, NamedList results) throws Exception {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 88e18e2..bb549f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -119,6 +119,17 @@ public class CollectionMutator {
         }
       }
     }
+    // other aux properties are also modifiable
+    for (String prop : message.keySet()) {
+      if (prop.startsWith(CollectionAdminRequest.PROPERTY_PREFIX)) {
+        hasAnyOps = true;
+        if (message.get(prop) == null) {
+          m.remove(prop);
+        } else {
+          m.put(prop, message.get(prop));
+        }
+      }
+    }
 
     if (!hasAnyOps) {
       return ZkStateWriter.NO_OP;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 725d2bd..3e026af 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -935,6 +935,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
     }),
     MODIFYCOLLECTION_OP(MODIFYCOLLECTION, (req, rsp, h) -> {
       Map<String, Object> m = copy(req.getParams(), null, CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES);
+      copyPropertiesWithPrefix(req.getParams(), m, COLL_PROP_PREFIX);
       if (m.isEmpty())  {
         throw new SolrException(ErrorCode.BAD_REQUEST,
             formatString("no supported values provided {0}", CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
@@ -942,7 +943,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       copy(req.getParams().required(), m, COLLECTION_PROP);
       addMapObject(m, RULE);
       addMapObject(m, SNITCH);
-      for (String prop : CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
+      for (String prop : m.keySet()) {
         if ("".equals(m.get(prop)))  {
           // set to an empty string is equivalent to removing the property, see SOLR-12507
           m.put(prop, null);
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 2a76ab9..393e48c 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -40,6 +40,7 @@ import org.apache.lucene.util.CharsRefBuilder;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrRequest.METHOD;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.GenericSolrRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.SimpleSolrResponse;
@@ -80,6 +81,8 @@ import org.apache.solr.schema.SchemaField;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.MergeIndexesCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
 import org.apache.solr.update.SolrCmdDistributor;
 import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.apache.solr.update.SolrCmdDistributor.Node;
@@ -182,6 +185,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private Set<String> skippedCoreNodeNames;
   private boolean isIndexChanged = false;
 
+  private boolean readOnly = false;
+
   /**
    * Number of times requests forwarded to some other shard's leader can be retried
    */
@@ -232,7 +237,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // this should always be used - see filterParams
     DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
       (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, CommonParams.VERSION_FIELD,
-          UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+          UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS,
+          UpdateParams.READ_ONLY_IGNORE);
 
     CoreContainer cc = req.getCore().getCoreContainer();
 
@@ -247,6 +253,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
       replicaType = cloudDesc.getReplicaType();
+      DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);
+      if (coll != null) {
+        // check readOnly property in coll state, unless overridden by params
+        if (!req.getParams().getBool(UpdateParams.READ_ONLY_IGNORE, false)) {
+          readOnly = coll.getBool(CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP, false);
+        }
+      }
     } else {
       collection = null;
       replicaType = Replica.Type.NRT;
@@ -669,6 +682,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     assert TestInjection.injectFailUpdateRequests();
 
+    if (readOnly) {
+      throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+    }
+
     updateCommand = cmd;
 
     if (zkEnabled) {
@@ -1416,7 +1433,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
     
     assert TestInjection.injectFailUpdateRequests();
-    
+
+    if (readOnly) {
+      throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+    }
+
     updateCommand = cmd;
 
     if (!cmd.isDeleteById()) {
@@ -1925,7 +1946,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
     
     assert TestInjection.injectFailUpdateRequests();
-    
+
+    if (readOnly) {
+      throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+    }
+
     updateCommand = cmd;
     List<Node> nodes = null;
     Replica leaderReplica = null;
@@ -2035,7 +2060,23 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       }
     }
   }
-  
+
+  @Override
+  public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
+    if (readOnly) {
+      throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+    }
+    super.processMergeIndexes(cmd);
+  }
+
+  @Override
+  public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+    if (readOnly) {
+      throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+    }
+    super.processRollback(cmd);
+  }
+
   @Override
   public void finish() throws IOException {
     assert ! finished : "lifecycle sanity check";
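
A minimal sketch of the corresponding escape hatch, mirroring the override test further down (the document id and collection name are placeholders):

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.UpdateParams;

public class ReadOnlyOverrideSketch {
  // Updates to a read-only collection fail with FORBIDDEN, unless the request
  // carries readOnlyIgnore=true (now whitelisted for distributed forwarding
  // in the constructor hunk above).
  static void forceAdd(SolrClient client, String collection) throws Exception {
    UpdateRequest ureq = new UpdateRequest();
    ureq.add(new SolrInputDocument("id", "doc-1"));
    ureq.setParam(UpdateParams.READ_ONLY_IGNORE, "true");
    ureq.process(client, collection);
  }
}
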
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index b685aa8..52ba89e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -25,10 +25,12 @@ import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
 import static org.apache.solr.common.params.CollectionAdminParams.DEFAULTS;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -38,22 +40,29 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.CoreStatus;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.V2Response;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterProperties;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.RetryUtil;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TimeOut;
@@ -63,9 +72,12 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @LuceneTestCase.Slow
 public class CollectionsAPISolrJTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Before
   public void beforeTest() throws Exception {
@@ -590,6 +602,137 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     fail("Timed out waiting for cluster property value");
   }
 
+  private static final int NUM_DOCS = 10;
+
+  @Test
+  public void testReadOnlyCollection() throws Exception {
+    final String collectionName = "readOnlyTest";
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
+        .process(solrClient);
+
+    solrClient.setDefaultCollection(collectionName);
+
+    cluster.waitForActiveCollection(collectionName, 2, 4);
+
+    // verify that indexing works
+    List<SolrInputDocument> docs = new ArrayList<>();
+    for (int i = 0; i < NUM_DOCS; i++) {
+      docs.add(new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+    }
+    solrClient.add(docs);
+    solrClient.commit();
+    // verify the docs exist
+    QueryResponse rsp = solrClient.query(params(CommonParams.Q, "*:*"));
+    assertEquals("initial num docs", NUM_DOCS, rsp.getResults().getNumFound());
+
+    // index more but don't commit
+    docs.clear();
+    for (int i = NUM_DOCS; i < NUM_DOCS * 2; i++) {
+      docs.add(new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+    }
+    solrClient.add(docs);
+
+    Replica leader
+        = solrClient.getZkStateReader().getLeaderRetry(collectionName, "shard1", DEFAULT_TIMEOUT);
+
+    long coreStartTime = getCoreStatus(leader).getCoreStartTime().getTime();
+
+    // Check for value change
+    CollectionAdminRequest.modifyCollection(collectionName,
+        Collections.singletonMap(CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP, "true"))
+        .process(solrClient);
+
+    DocCollection coll = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    assertNotNull(coll.toString(), coll.getProperties().get(CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP));
+    assertEquals(coll.toString(), coll.getProperties().get(CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP).toString(), "true");
+
+    // wait for the expected collection reload
+    RetryUtil.retryUntil("Timed out waiting for core to reload", 30, 1000, TimeUnit.MILLISECONDS, () -> {
+      long restartTime = 0;
+      try {
+        restartTime = getCoreStatus(leader).getCoreStartTime().getTime();
+      } catch (Exception e) {
+        log.warn("Exception getting core start time: {}", e.getMessage());
+        return false;
+      }
+      return restartTime > coreStartTime;
+    });
+
+    // check for docs - reloading should have committed the new docs
+    // this also verifies that searching works in read-only mode
+    rsp = solrClient.query(params(CommonParams.Q, "*:*"));
+    assertEquals("num docs after turning on read-only", NUM_DOCS * 2, rsp.getResults().getNumFound());
+
+    // try sending updates
+    try {
+      solrClient.add(new SolrInputDocument("id", "shouldFail"));
+      fail("add() should fail in read-only mode");
+    } catch (Exception e) {
+      // expected - ignore
+    }
+    try {
+      solrClient.deleteById("shouldFail");
+      fail("deleteById() should fail in read-only mode");
+    } catch (Exception e) {
+      // expected - ignore
+    }
+    try {
+      solrClient.deleteByQuery("id:shouldFail");
+      fail("deleteByQuery() should fail in read-only mode");
+    } catch (Exception e) {
+      // expected - ignore
+    }
+    try {
+      solrClient.commit();
+      fail("commit() should fail in read-only mode");
+    } catch (Exception e) {
+      // expected - ignore
+    }
+    try {
+      solrClient.optimize();
+      fail("optimize() should fail in read-only mode");
+    } catch (Exception e) {
+      // expected - ignore
+    }
+    try {
+      solrClient.rollback();
+      fail("rollback() should fail in read-only mode");
+    } catch (Exception e) {
+      // expected - ignore
+    }
+
+    // check that the override works
+    log.info("=== readOnlyIgnore override test");
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.add(new SolrInputDocument("id", "shouldWork"));
+    ureq.setParam(UpdateParams.READ_ONLY_IGNORE, "true");
+    NamedList<Object> res = solrClient.request(ureq, collectionName);
+    ureq = new UpdateRequest();
+    ureq.deleteById("shouldWork");
+    ureq.setParam(UpdateParams.READ_ONLY_IGNORE, "true");
+    res = solrClient.request(ureq, collectionName);
+
+    // Check for removing value
+    // setting to empty string is equivalent to removing the property, see SOLR-12507
+    CollectionAdminRequest.modifyCollection(collectionName,
+        Collections.singletonMap(CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP, ""))
+        .process(cluster.getSolrClient());
+    coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName);
+    assertNull(coll.toString(), coll.getProperties().get(CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP));
+
+    // check that updates are working now
+    docs.clear();
+    for (int i = NUM_DOCS * 2; i < NUM_DOCS * 3; i++) {
+      docs.add(new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+    }
+    solrClient.add(docs);
+    solrClient.commit();
+    rsp = solrClient.query(params(CommonParams.Q, "*:*"));
+    assertEquals("num docs after turning off read-only", NUM_DOCS * 3, rsp.getResults().getNumFound());
+  }
+
 
   @Test
   public void testOverseerStatus() throws IOException, SolrServerException {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 6f7af617..ea42590 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -91,7 +91,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
 
   protected final CollectionAction action;
 
-  private static String PROPERTY_PREFIX = "property.";
+  public static String PROPERTY_PREFIX = "property.";
 
   public CollectionAdminRequest(CollectionAction action) {
     this("/admin/collections", action);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7a3f7d2..2069457 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -103,6 +103,7 @@ public class ZkStateReader implements Closeable {
   public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
   public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds";
   public static final String STATE_TIMESTAMP_PROP = "stateTimestamp";
+  public static final String READ_ONLY_PROP = "readOnly";
   public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String LIVE_NODES_ZKNODE = "/live_nodes";
   public static final String ALIASES = "/aliases.json";
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/UpdateParams.java b/solr/solrj/src/java/org/apache/solr/common/params/UpdateParams.java
index c4633bd..867924e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/UpdateParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/UpdateParams.java
@@ -68,4 +68,7 @@ public interface UpdateParams
 
   /** Return versions of updates? */
   public static final String VERSIONS = "versions";
+
+  /** Ignore the "readOnly" collection status. */
+  public static final String READ_ONLY_IGNORE = "readOnlyIgnore";
 }


[lucene-solr] 02/03: SOLR-11127: Initial version (depends on SOLR-13271)

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit efb8e61bcba7f0da4d77443365527fb18c9387b0
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Feb 26 16:59:45 2019 +0100

    SOLR-11127: Initial version (depends on SOLR-13271)
---
 .../cloud/api/collections/DeleteCollectionCmd.java |   2 +-
 .../OverseerCollectionMessageHandler.java          |  35 +--
 .../api/collections/ReindexCollectionCmd.java      | 309 +++++++++++++++++++++
 .../solr/handler/admin/CollectionsHandler.java     |  10 +
 .../solrj/request/CollectionAdminRequest.java      |  14 +
 .../solr/common/params/CollectionParams.java       |   3 +-
 6 files changed, 338 insertions(+), 35 deletions(-)
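
Client-side, the new action is exposed through the CollectionAdminRequest helper added below. A minimal sketch (the collection name is a placeholder; at this point in the branch the server-side command is still partly skeletal, see the nocommit markers):

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

public class ReindexSketch {
  // Reindex a collection in place (target defaults to the source name).
  // ReindexCollection extends AsyncCollectionSpecificAdminRequest, so
  // processAsync() is also available for long-running reindexing.
  static void reindex(SolrClient client) throws Exception {
    CollectionAdminRequest.reindexCollection("myCollection").process(client);
  }
}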

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index e5f6f2d..7177f03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -181,7 +181,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     }
   }
 
-  private String referencedByAlias(String collection, Aliases aliases) {
+  public static String referencedByAlias(String collection, Aliases aliases) {
     Objects.requireNonNull(aliases);
     return aliases.getCollectionAliasListMap().entrySet().stream()
         .filter(e -> e.getValue().contains(collection))
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 03ec81c..4a458a2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -31,39 +31,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
 import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
 import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.UTILIZENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.util.Utils.makeMap;
@@ -274,6 +242,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         .put(DELETEREPLICA, new DeleteReplicaCmd(this))
         .put(ADDREPLICA, new AddReplicaCmd(this))
         .put(MOVEREPLICA, new MoveReplicaCmd(this))
+        .put(REINDEX_COLLECTION, new ReindexCollectionCmd(this))
         .put(UTILIZENODE, new UtilizeNodeCmd(this))
         .build()
     ;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
new file mode 100644
index 0000000..fa65ecb
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reindex a collection, usually in order to change the index schema.
+ * <p>WARNING: Reindexing is a potentially lossy operation - some indexed data that is not available as
+ * stored fields may be irretrievably lost, so users should use this command with caution, evaluating
+ * the potential impact by using different source and target collection names first, and preserving
+ * the source collection until the evaluation is complete.</p>
+ * <p>Reindexing follows these steps:</p>
+ * <ol>
+ *    <li>create a temporary collection using the most recent schema of the source collection
+ *    (or the one specified in the parameters, which must already exist).</li>
+ *    <li>copy the source documents to the temporary collection, reconstructing them from their stored
+ *    fields and reindexing them using the specified schema. NOTE: some data
+ *    loss may occur if the original stored field data is not available!</li>
+ *    <li>if the target collection name is not specified
+ *    then the same name as the source is assumed and at this step the source collection is permanently removed.</li>
+ *    <li>create the target collection from scratch with the specified name (or the same as source if not
+ *    specified), but using the new specified schema. NOTE: if the target name was not specified or is the same
+ *    as the source collection then the original collection has been deleted in the previous step and it's
+ *    not possible to roll back the changes if the process is interrupted. The (possibly incomplete) data
+ *    is still available in the temporary collection.</li>
+ *    <li>copy the documents from the temporary collection to the target collection, using the specified schema.</li>
+ *    <li>delete temporary collection(s) and optionally delete the source collection if it still exists.</li>
+ * </ol>
+ */
+public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String ABORT = "abort";
+  public static final String KEEP_SOURCE = "keepSource";
+  public static final String TARGET = "target";
+  public static final String TMP_COL_PREFIX = ".reindex_";
+  public static final String CHK_COL_PREFIX = ".reindex_ck_";
+  public static final String REINDEX_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex";
+  public static final String REINDEX_PHASE_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex_phase";
+  public static final String READONLY_PROP = CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP;
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public enum State {
+    IDLE,
+    RUNNING,
+    ABORTED,
+    FINISHED;
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+
+    public static State get(String p) {
+      if (p == null) {
+        return null;
+      }
+      p = p.toLowerCase(Locale.ROOT);
+      if (p.startsWith(CollectionAdminRequest.PROPERTY_PREFIX)) {
+        p = p.substring(CollectionAdminRequest.PROPERTY_PREFIX.length());
+      }
+      return states.get(p);
+    }
+    static Map<String, State> states = Collections.unmodifiableMap(
+        Stream.of(State.values()).collect(Collectors.toMap(State::toLower, Function.identity())));
+  }
+
+  public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+
+    log.info("*** called: {}", message);
+
+    String collection = message.getStr(CommonParams.NAME);
+    if (collection == null || clusterState.getCollectionOrNull(collection) == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection name must be specified and must exist");
+    }
+    String target = message.getStr(TARGET);
+    if (target == null) {
+      target = collection;
+    }
+    boolean keepSource = message.getBool(KEEP_SOURCE, false);
+    if (keepSource && target.equals(collection)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can't specify keepSource=true when target is the same as source");
+    }
+    boolean abort = message.getBool(ABORT, false);
+    DocCollection coll = clusterState.getCollection(collection);
+    if (abort) {
+      ZkNodeProps props = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+          ZkStateReader.COLLECTION_PROP, collection,
+          REINDEX_PROP, State.ABORTED.toLower());
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      results.add(State.ABORTED.toLower(), collection);
+      // if needed the cleanup will be performed by the running instance of the command
+      return;
+    }
+    // check it's not already running
+    State state = State.get(coll.getStr(REINDEX_PROP, State.IDLE.toLower()));
+    if (state == State.RUNNING) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection);
+    }
+    // set the running flag
+    ZkNodeProps props = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+        ZkStateReader.COLLECTION_PROP, collection,
+        REINDEX_PROP, State.RUNNING.toLower());
+    ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+
+    boolean aborted = false;
+    Integer rf = coll.getReplicationFactor();
+    Integer numNrt = coll.getNumNrtReplicas();
+    Integer numTlog = coll.getNumTlogReplicas();
+    Integer numPull = coll.getNumPullReplicas();
+    int numShards = coll.getActiveSlices().size();
+
+    String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
+    String tmpCollection = TMP_COL_PREFIX + collection;
+    String chkCollection = CHK_COL_PREFIX + collection;
+
+    try {
+      // 0. set up temp and checkpoint collections - delete first if necessary
+      NamedList<Object> cmdResults = new NamedList<>();
+      ZkNodeProps cmd;
+      if (clusterState.getCollectionOrNull(tmpCollection) != null) {
+        // delete the tmp collection
+        cmd = new ZkNodeProps(
+            CommonParams.NAME, tmpCollection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        // nocommit error checking
+      }
+      if (clusterState.getCollectionOrNull(chkCollection) != null) {
+        // delete the checkpoint collection
+        cmd = new ZkNodeProps(
+            CommonParams.NAME, chkCollection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        // nocommit error checking
+      }
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      // create the tmp collection - use RF=1
+      cmd = new ZkNodeProps(
+          CommonParams.NAME, tmpCollection,
+          ZkStateReader.NUM_SHARDS_PROP, String.valueOf(numShards),
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          CollectionAdminParams.COLL_CONF, configName,
+          CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
+      );
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
+      // nocommit error checking
+
+      // create the checkpoint collection - use RF=1 and 1 shard
+      cmd = new ZkNodeProps(
+          CommonParams.NAME, chkCollection,
+          ZkStateReader.NUM_SHARDS_PROP, "1",
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          CollectionAdminParams.COLL_CONF, configName,
+          CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
+      );
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
+      // nocommit error checking
+      // wait for a while until we see both collections
+      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, ocmh.timeSource);
+      boolean created = false;
+      while (!waitUntil.hasTimedOut()) {
+        waitUntil.sleep(100);
+        // this also refreshes our local var clusterState
+        clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+        created = clusterState.hasCollection(tmpCollection) && clusterState.hasCollection(chkCollection);
+        if (created) break;
+      }
+      if (!created) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create temporary collection(s)");
+      }
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      // 1. put the collection in read-only mode
+      cmd = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+          ZkStateReader.COLLECTION_PROP, collection,
+          READONLY_PROP, "true");
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+
+      // 2. copy the documents to tmp
+      // Recipe taken from: http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
+      ModifiableSolrParams q = new ModifiableSolrParams();
+      q.set(CommonParams.QT, "/stream");
+      q.set("expr",
+          "daemon(id=\"" + tmpCollection + "\"," +
+              "terminate=\"true\"," +
+              "commit(" + tmpCollection + "," +
+              "update(" + tmpCollection + "," +
+              "batchSize=100," +
+              "topic(" + chkCollection + "," +
+              collection + "," +
+              "q=\"*:*\"," +
+              "fl=\"*\"," +
+              "id=\"topic_" + tmpCollection + "\"," +
+              // some of the documents eg. in .system contain large blobs
+              "rows=\"100\"," +
+              "initialCheckpoint=\"0\"))))");
+      SolrResponse rsp = ocmh.cloudManager.request(new QueryRequest(q));
+
+      // wait for the daemon to finish
+
+      // 5. set up an alias to use the tmp collection as the target name
+
+      // 6. optionally delete the source collection
+
+      // 7. delete the checkpoint collection
+
+      // nocommit error checking
+    } catch (Exception e) {
+      aborted = true;
+    } finally {
+      if (aborted) {
+        // nocommit - cleanup
+
+        // 1. kill the daemons
+        // 2. cleanup tmp / chk collections IFF the source collection still exists and is not empty
+        // 3. cleanup collection state
+        results.add(State.ABORTED.toLower(), collection);
+      }
+    }
+  }
+
+  private boolean maybeAbort(String collection) throws Exception {
+    DocCollection coll = ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollectionOrNull(collection);
+    if (coll == null) {
+      // collection no longer present - abort
+      return true;
+    }
+    State state = State.get(coll.getStr(REINDEX_PROP, State.RUNNING.toLower()));
+    if (state != State.ABORTED) {
+      return false;
+    }
+    return true;
+  }
+
+  private String getDaemonUrl(SolrResponse rsp) {
+    return null;
+  }
+
+  private void cleanup(String collection, String daemonUrl) throws Exception {
+
+    if (daemonUrl != null) {
+      // kill the daemon
+    }
+    ClusterState clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+
+  }
+}
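
For readability, the daemon expression that the command submits to the /stream handler (step 2 above) expands to roughly the following, with <tmp>, <chk> and <source> standing in for the temporary, checkpoint and source collection names:

daemon(id="<tmp>",
       terminate="true",
       commit(<tmp>,
         update(<tmp>,
           batchSize=100,
           topic(<chk>, <source>,
             q="*:*",
             fl="*",
             id="topic_<tmp>",
             rows="100",
             initialCheckpoint="0"))))

topic() incrementally streams all documents matching q from the source, persisting its checkpoints in <chk>; update() pushes each batch into <tmp>; commit() makes the batches visible; and daemon(..., terminate="true") keeps iterating until the topic is exhausted.
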
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 3e026af..7b7b1e5 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -52,6 +52,7 @@ import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.ZkController.NotInClusterStateException;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkShardTerms;
+import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.Rule;
@@ -524,6 +525,15 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
 
     RELOAD_OP(RELOAD, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
 
+    REINDEX_COLLECTION_OP(REINDEX_COLLECTION, (req, rsp, h) -> {
+      Map<String, Object> m = copy(req.getParams().required(), null, NAME);
+      copy(req.getParams(), m,
+          ReindexCollectionCmd.ABORT,
+          ReindexCollectionCmd.KEEP_SOURCE,
+          ReindexCollectionCmd.TARGET);
+      return m;
+    }),
+
     SYNCSHARD_OP(SYNCSHARD, (req, rsp, h) -> {
       String collection = req.getParams().required().get("collection");
       String shard = req.getParams().required().get("shard");
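
The same operation expressed as a raw Collections API call, showing the extra parameters REINDEX_COLLECTION_OP copies above (abort, keepSource, target). A sketch using GenericSolrRequest; the collection names are placeholders:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.common.params.ModifiableSolrParams;

public class ReindexParamsSketch {
  // Reindex "source" into a separate collection "target"; passing
  // abort=true instead would flag a running reindex for cleanup.
  static void reindexInto(SolrClient client, String source, String target) throws Exception {
    ModifiableSolrParams p = new ModifiableSolrParams();
    p.set("action", "REINDEX_COLLECTION");
    p.set("name", source);
    p.set("target", target);
    new GenericSolrRequest(SolrRequest.METHOD.POST, "/admin/collections", p).process(client);
  }
}
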
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index ea42590..a326abe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -782,6 +782,20 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   }
 
   /**
+   * Returns a SolrRequest to reindex a collection
+   */
+  public static ReindexCollection reindexCollection(String collection) {
+    return new ReindexCollection(collection);
+  }
+
+  public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
+
+    private ReindexCollection(String collection) {
+      super(CollectionAction.REINDEX_COLLECTION, collection);
+    }
+  }
+
+  /**
    * Returns a SolrRequest to delete a collection
    */
   public static Delete deleteCollection(String collection) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index dee2f5f..6176263 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -121,7 +121,8 @@ public interface CollectionParams {
     MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
     NONE(false, LockLevel.NONE),
     // TODO: not implemented yet
-    MERGESHARDS(true, LockLevel.SHARD)
+    MERGESHARDS(true, LockLevel.SHARD),
+    REINDEX_COLLECTION(true, LockLevel.COLLECTION)
     ;
     public final boolean isWrite;
 


[lucene-solr] 03/03: SOLR-11127: Complete implementation + test.

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 16a7ea836c17480193d4c1965a938b45ef3df413
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Feb 27 19:26:02 2019 +0100

    SOLR-11127: Complete implementation + test.
---
 .../api/collections/ReindexCollectionCmd.java      | 345 +++++++++++++++++----
 .../solr/handler/admin/CollectionsHandler.java     |  17 +-
 .../apache/solr/cloud/ReindexCollectionTest.java   | 101 ++++++
 .../solrj/request/CollectionAdminRequest.java      |  46 +++
 .../solr/common/cloud/CompositeIdRouter.java       |   5 +
 .../org/apache/solr/common/cloud/DocRouter.java    |   1 +
 .../solr/common/cloud/ImplicitDocRouter.java       |   5 +
 .../apache/solr/common/cloud/PlainIdRouter.java    |   5 +
 8 files changed, 457 insertions(+), 68 deletions(-)
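
Note on the solrj router files in the diffstat above: their hunks are not included below, but judging from the new usage in the command (propMap.put("router.name", router.getName())), the one-line DocRouter change plus five lines in each concrete router most likely add a name accessor along these lines (an inference, not the verbatim patch):

// Presumed addition to DocRouter:
public abstract String getName();

// ...and in each concrete router, e.g. CompositeIdRouter:
@Override
public String getName() {
  return NAME; // "compositeId"
}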

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index fa65ecb..1d90f16 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -19,20 +19,27 @@ package org.apache.solr.cloud.api.collections;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionAdminParams;
@@ -77,14 +84,16 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
   public static final String ABORT = "abort";
   public static final String KEEP_SOURCE = "keepSource";
   public static final String TARGET = "target";
-  public static final String TMP_COL_PREFIX = ".reindex_";
+  public static final String TARGET_COL_PREFIX = ".reindex_";
   public static final String CHK_COL_PREFIX = ".reindex_ck_";
-  public static final String REINDEX_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex";
+  public static final String REINDEXING_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindexing";
   public static final String REINDEX_PHASE_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex_phase";
   public static final String READONLY_PROP = CollectionAdminRequest.PROPERTY_PREFIX + ZkStateReader.READ_ONLY_PROP;
 
   private final OverseerCollectionMessageHandler ocmh;
 
+  private static AtomicInteger tmpCollectionSeq = new AtomicInteger();
+
   public enum State {
     IDLE,
     RUNNING,
@@ -126,61 +135,65 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
     if (target == null) {
       target = collection;
     }
-    boolean keepSource = message.getBool(KEEP_SOURCE, false);
-    if (keepSource && target.equals(collection)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can't specify keepSource=true when target is the same as source");
-    }
+    boolean sameTarget = target.equals(collection);
+    boolean keepSource = message.getBool(KEEP_SOURCE, true);
     boolean abort = message.getBool(ABORT, false);
     DocCollection coll = clusterState.getCollection(collection);
     if (abort) {
       ZkNodeProps props = new ZkNodeProps(
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
           ZkStateReader.COLLECTION_PROP, collection,
-          REINDEX_PROP, State.ABORTED.toLower());
+          REINDEXING_PROP, State.ABORTED.toLower());
       ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
       results.add(State.ABORTED.toLower(), collection);
       // if needed the cleanup will be performed by the running instance of the command
       return;
     }
     // check it's not already running
-    State state = State.get(coll.getStr(REINDEX_PROP, State.IDLE.toLower()));
+    State state = State.get(coll.getStr(REINDEXING_PROP, State.IDLE.toLower()));
     if (state == State.RUNNING) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection);
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection +
+          ". If you are sure this is not the case you can issue &abort=true to clean up this state.");
     }
     // set the running flag
     ZkNodeProps props = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
         ZkStateReader.COLLECTION_PROP, collection,
-        REINDEX_PROP, State.RUNNING.toLower());
+        REINDEXING_PROP, State.RUNNING.toLower());
     ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
 
     boolean aborted = false;
-    Integer rf = coll.getReplicationFactor();
-    Integer numNrt = coll.getNumNrtReplicas();
-    Integer numTlog = coll.getNumTlogReplicas();
-    Integer numPull = coll.getNumPullReplicas();
-    int numShards = coll.getActiveSlices().size();
+    int batchSize = message.getInt(CommonParams.ROWS, 100);
+    String query = message.getStr(CommonParams.Q, "*:*");
+    Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
+    Integer numNrt = message.getInt(ZkStateReader.NRT_REPLICAS, coll.getNumNrtReplicas());
+    Integer numTlog = message.getInt(ZkStateReader.TLOG_REPLICAS, coll.getNumTlogReplicas());
+    Integer numPull = message.getInt(ZkStateReader.PULL_REPLICAS, coll.getNumPullReplicas());
+    int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, coll.getActiveSlices().size());
+    int maxShardsPerNode = message.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, coll.getMaxShardsPerNode());
+    DocRouter router = coll.getRouter();
+    if (router == null) {
+      router = DocRouter.DEFAULT;
+    }
 
     String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
-    String tmpCollection = TMP_COL_PREFIX + collection;
-    String chkCollection = CHK_COL_PREFIX + collection;
+    int seq = tmpCollectionSeq.getAndIncrement();
+    String targetCollection = sameTarget ?
+        TARGET_COL_PREFIX + collection + "_" + seq : target;
+    String chkCollection = CHK_COL_PREFIX + collection + "_" + seq;
+    String daemonUrl = null;
 
     try {
-      // 0. set up temp and checkpoint collections - delete first if necessary
+      // 0. set up target and checkpoint collections
       NamedList<Object> cmdResults = new NamedList<>();
       ZkNodeProps cmd;
-      if (clusterState.getCollectionOrNull(tmpCollection) != null) {
-        // delete the tmp collection
-        cmd = new ZkNodeProps(
-            CommonParams.NAME, tmpCollection,
-            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
-        );
-        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
-        // nocommit error checking
+      if (clusterState.hasCollection(targetCollection)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Target collection " + targetCollection + " already exists! Delete it first.");
       }
-      if (clusterState.getCollectionOrNull(chkCollection) != null) {
+      if (clusterState.hasCollection(chkCollection)) {
         // delete the checkpoint collection
         cmd = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
             CommonParams.NAME, chkCollection,
             CoreAdminParams.DELETE_METRICS_HISTORY, "true"
         );
@@ -193,23 +206,50 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         return;
       }
 
-      // create the tmp collection - use RF=1
-      cmd = new ZkNodeProps(
-          CommonParams.NAME, tmpCollection,
-          ZkStateReader.NUM_SHARDS_PROP, String.valueOf(numShards),
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          CollectionAdminParams.COLL_CONF, configName,
-          CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
-      );
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
+      propMap.put(CommonParams.NAME, targetCollection);
+      propMap.put(ZkStateReader.NUM_SHARDS_PROP, numShards);
+      propMap.put(CollectionAdminParams.COLL_CONF, configName);
+      // init first from the same router
+      propMap.put("router.name", router.getName());
+      for (String key : coll.keySet()) {
+        if (key.startsWith("router.")) {
+          propMap.put(key, coll.get(key));
+        }
+      }
+      // then apply overrides if present
+      for (String key : message.keySet()) {
+        if (key.startsWith("router.")) {
+          propMap.put(key, message.getStr(key));
+        }
+      }
+      propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
+      propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
+      if (rf != null) {
+        propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
+      }
+      if (numNrt != null) {
+        propMap.put(ZkStateReader.NRT_REPLICAS, numNrt);
+      }
+      if (numTlog != null) {
+        propMap.put(ZkStateReader.TLOG_REPLICAS, numTlog);
+      }
+      if (numPull != null) {
+        propMap.put(ZkStateReader.PULL_REPLICAS, numPull);
+      }
+      // create the target collection
+      cmd = new ZkNodeProps(propMap);
       ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
       // nocommit error checking
 
       // create the checkpoint collection - use RF=1 and 1 shard
       cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
           CommonParams.NAME, chkCollection,
           ZkStateReader.NUM_SHARDS_PROP, "1",
           ZkStateReader.REPLICATION_FACTOR, "1",
-          CollectionAdminParams.COLL_CONF, configName,
+          CollectionAdminParams.COLL_CONF, "_default",
           CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
       );
       ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
@@ -221,7 +261,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         waitUntil.sleep(100);
         // this also refreshes our local var clusterState
         clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
-        created = clusterState.hasCollection(tmpCollection) && clusterState.hasCollection(chkCollection);
+        created = clusterState.hasCollection(targetCollection) && clusterState.hasCollection(chkCollection);
         if (created) break;
       }
       if (!created) {
@@ -232,50 +272,100 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         return;
       }
 
-      // 1. put the collection in read-only mode
-      cmd = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+      // 1. put the source collection in read-only mode
+      props = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
           ZkStateReader.COLLECTION_PROP, collection,
           READONLY_PROP, "true");
       ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
 
-      // 2. copy the documents to tmp
+      // 2. copy the documents to target
       // Recipe taken from: http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
       ModifiableSolrParams q = new ModifiableSolrParams();
       q.set(CommonParams.QT, "/stream");
+      q.set("collection", collection);
       q.set("expr",
-          "daemon(id=\"" + tmpCollection + "\"," +
+          "daemon(id=\"" + targetCollection + "\"," +
               "terminate=\"true\"," +
-              "commit(" + tmpCollection + "," +
-              "update(" + tmpCollection + "," +
-              "batchSize=100," +
-              "topic(" + chkCollection + "," +
-              collection + "," +
-              "q=\"*:*\"," +
-              "fl=\"*\"," +
-              "id=\"topic_" + tmpCollection + "\"," +
-              // some of the documents eg. in .system contain large blobs
-              "rows=\"100\"," +
-              "initialCheckpoint=\"0\"))))");
-      SolrResponse rsp = ocmh.cloudManager.request(new QueryRequest(q));
+              "commit(" + targetCollection + "," +
+                "update(" + targetCollection + "," +
+                  "batchSize=" + batchSize + "," +
+                  "topic(" + chkCollection + "," +
+                    collection + "," +
+                    "q=\"" + query + "\"," +
+                    "fl=\"*\"," +
+                    "id=\"topic_" + targetCollection + "\"," +
+                    // some of the documents, e.g. in .system, contain large blobs
+                    "rows=\"" + batchSize + "\"," +
+                    "initialCheckpoint=\"0\"))))");
+      log.info("- starting copying documents from " + collection + " to " + targetCollection);
+      SolrResponse rsp = null;
+      try {
+        rsp = ocmh.cloudManager.request(new QueryRequest(q));
+      } catch (Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
+            collection + " to " + targetCollection, e);
+      }
+      daemonUrl = getDaemonUrl(rsp, coll);
+      if (daemonUrl == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
+            collection + " to " + targetCollection + ": " + Utils.toJSONString(rsp));
+      }
 
       // wait for the daemon to finish
+      waitForDaemon(targetCollection, daemonUrl, collection);
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+      log.info("- finished copying from " + collection + " to " + targetCollection);
 
-      // 5. set up an alias to use the tmp collection as the target name
-
-      // 6. optionally delete the source collection
+      // 5. if (sameTarget) set up an alias to use targetCollection as the source name
+      if (sameTarget) {
+        cmd = new ZkNodeProps(
+            CommonParams.NAME, collection,
+            "collections", targetCollection);
+        ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, results);
+      }
 
-      // 7. delete the checkpoint collection
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+      // 6. delete the checkpoint collection
+      cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+          CommonParams.NAME, chkCollection,
+          CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+      );
+      ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
 
       // nocommit error checking
+
+      // 7. delete the source collection, unless keepSource is set:
+      if (keepSource) {
+        // 8. the source is kept - set the FINISHED state and clear readOnly
+        props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+            ZkStateReader.COLLECTION_PROP, collection,
+            REINDEXING_PROP, State.FINISHED.toLower(),
+            READONLY_PROP, "");
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      } else {
+        cmd = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+            CommonParams.NAME, collection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+      }
+
+      results.add(State.FINISHED.toLower(), collection);
     } catch (Exception e) {
       aborted = true;
     } finally {
       if (aborted) {
-        // nocommit - cleanup
-
-        // 1. kill the daemons
-        // 2. cleanup tmp / chk collections IFF the source collection still exists and is not empty
-        // 3. cleanup collection state
+        cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection); // daemon id == target name
         results.add(State.ABORTED.toLower(), collection);
       }
     }
@@ -287,23 +377,144 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
       // collection no longer present - abort
       return true;
     }
-    State state = State.get(coll.getStr(REINDEX_PROP, State.RUNNING.toLower()));
+    State state = State.get(coll.getStr(REINDEXING_PROP, State.RUNNING.toLower()));
     if (state != State.ABORTED) {
       return false;
     }
     return true;
   }
 
-  private String getDaemonUrl(SolrResponse rsp) {
+  // XXX see #waitForDaemon() for why we need this
+  private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
+    Map<String, Object> rs = (Map<String, Object>)rsp.getResponse().get("result-set");
+    if (rs == null || rs.isEmpty()) {
+      log.debug("Missing daemon information in response: " + Utils.toJSONString(rsp));
+      return null;
+    }
+    List<Object> list = (List<Object>)rs.get("docs");
+    if (list == null) {
+      log.debug("Missing daemon information in response: " + Utils.toJSONString(rsp));
+      return null;
+    }
+    String replicaName = null;
+    for (Object o : list) {
+      Map<String, Object> map = (Map<String, Object>)o;
+      String op = (String)map.get("DaemonOp");
+      if (op == null) {
+        continue;
+      }
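+      // DaemonOp is expected to look like "Daemon:<id> started on <coreName>",
+      // i.e. four whitespace-separated tokens (see the matching comment in killDaemon)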
+      String[] parts = op.split("\\s+");
+      if (parts.length != 4) {
+        log.debug("Invalid daemon location info, expected 4 tokens: " + op);
+        return null;
+      }
+      // check if it's plausible
+      if (parts[3].contains("shard") && parts[3].contains("replica")) {
+        replicaName = parts[3];
+        break;
+      } else {
+        log.debug("daemon location info likely invalid: " + op);
+        return null;
+      }
+    }
+    if (replicaName == null) {
+      return null;
+    }
+    // build a baseUrl of the replica
+    for (Replica r : coll.getReplicas()) {
+      if (replicaName.equals(r.getCoreName())) {
+        return r.getBaseUrl() + "/" + r.getCoreName();
+      }
+    }
     return null;
   }
 
-  private void cleanup(String collection, String daemonUrl) throws Exception {
+  // XXX currently this is complicated due to a bug in the way the daemon 'list'
+  // XXX operation is implemented - see SOLR-13245. We need to query the actual
+  // XXX SolrCore where the daemon is running
+  private void waitForDaemon(String daemonName, String daemonUrl, String collection) throws Exception {
+    HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
+        .withHttpClient(client)
+        .withBaseSolrUrl(daemonUrl).build()) {
+      ModifiableSolrParams q = new ModifiableSolrParams();
+      q.set(CommonParams.QT, "/stream");
+      q.set("action", "list");
+      q.set(CommonParams.DISTRIB, false);
+      QueryRequest req = new QueryRequest(q);
+      boolean isRunning;
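+      // poll the daemon 'list' action every 5s until our daemon id is no longer
+      // reported (i.e. it finished) or the reindexing job has been aborted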
+      do {
+        isRunning = false;
+        try {
+          NamedList<Object> rsp = solrClient.request(req);
+          Map<String, Object> rs = (Map<String, Object>)rsp.get("result-set");
+          if (rs == null || rs.isEmpty()) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing result-set: " + Utils.toJSONString(rsp));
+          }
+          List<Object> list = (List<Object>)rs.get("docs");
+          if (list == null) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Missing docs in result-set: " + Utils.toJSONString(rsp));
+          }
+          if (list.isEmpty()) { // finished?
+            break;
+          }
+          for (Object o : list) {
+            Map<String, Object> map = (Map<String, Object>)o;
+            String id = (String)map.get("id");
+            if (daemonName.equals(id)) {
+              isRunning = true;
+              break;
+            }
+          }
+        } catch (Exception e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception copying the documents", e);
+        }
+        ocmh.cloudManager.getTimeSource().sleep(5000);
+      } while (isRunning && !maybeAbort(collection));
+    }
+  }
+
+  private void killDaemon(String daemonName, String daemonUrl) throws Exception {
+    HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
+        .withHttpClient(client)
+        .withBaseSolrUrl(daemonUrl).build()) {
+      ModifiableSolrParams q = new ModifiableSolrParams();
+      q.set(CommonParams.QT, "/stream");
+      q.set("action", "kill");
+      q.set(CommonParams.ID, daemonName);
+      q.set(CommonParams.DISTRIB, false);
+      QueryRequest req = new QueryRequest(q);
+      NamedList<Object> rsp = solrClient.request(req);
+      // /result-set/docs/[0]/DaemonOp : Daemon:id killed on coreName
+
+      // nocommit error checking
+    }
+  }
+
+  private void cleanup(String collection, String targetCollection, String chkCollection, String daemonUrl, String daemonName) throws Exception {
+    // 1. kill the daemon
+    // 2. cleanup target / chk collections IFF the source collection still exists and is not empty
+    // 3. cleanup collection state
 
     if (daemonUrl != null) {
-      // kill the daemon
+      killDaemon(daemonName, daemonUrl);
     }
     ClusterState clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
-
+    NamedList<Object> cmdResults = new NamedList<>();
+    if (!collection.equals(targetCollection)) {
+      ZkNodeProps cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+          CommonParams.NAME, targetCollection,
+          CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+      );
+      ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+      // nocommit error checking
+    }
+    ZkNodeProps props = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+        ZkStateReader.COLLECTION_PROP, collection,
+        REINDEXING_PROP, State.ABORTED.toLower(),
+        READONLY_PROP, "");
+    ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
   }
 }
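
For reference, a minimal sketch of the streaming expression that the code
above assembles, using illustrative names (source "books", target "books2",
checkpoint collection "chk_books") and batchSize=100; the real values are
derived from the request parameters:

    daemon(id="books2",
           terminate="true",
           commit(books2,
             update(books2,
               batchSize=100,
               topic(chk_books,
                 books,
                 q="*:*",
                 fl="*",
                 id="topic_books2",
                 rows="100",
                 initialCheckpoint="0"))))
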
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 7b7b1e5..6a7ea05 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -76,6 +76,7 @@ import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -530,7 +531,21 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       copy(req.getParams(), m,
           ReindexCollectionCmd.ABORT,
           ReindexCollectionCmd.KEEP_SOURCE,
-          ReindexCollectionCmd.TARGET);
+          ReindexCollectionCmd.TARGET,
+          ZkStateReader.CONFIGNAME_PROP,
+          NUM_SLICES,
+          NRT_REPLICAS,
+          PULL_REPLICAS,
+          TLOG_REPLICAS,
+          REPLICATION_FACTOR,
+          MAX_SHARDS_PER_NODE,
+          POLICY,
+          CREATE_NODE_SET,
+          CREATE_NODE_SET_SHUFFLE,
+          AUTO_ADD_REPLICAS,
+          CommonParams.ROWS,
+          CommonParams.Q);
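+      // forward router.* overrides verbatim so the target collection can be
+      // created with a different router configuration than the source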
+      copyPropertiesWithPrefix(req.getParams(), m, "router.");
       return m;
     }),
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
new file mode 100644
index 0000000..ae9d5e0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration test for the REINDEXCOLLECTION command (SOLR-11127).
+ */
+public class ReindexCollectionTest extends SolrCloudTestCase {
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(2)
+        .addConfig("conf1", configset("cloud-dynamic"))
+        .addConfig("conf2", configset("cloud-minimal"))
+        .configure();
+  }
+
+  private CloudSolrClient solrClient;
+  private SolrCloudManager cloudManager;
+
+  @Before
+  public void doBefore() throws Exception {
+    solrClient = getCloudSolrClient(cluster);
+    cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+  }
+
+  @After
+  public void doAfter() throws Exception {
+    cluster.deleteAllCollections(); // deletes aliases too
+
+    solrClient.close();
+  }
+
+  private static final int NUM_DOCS = 200;
+
+  @Test
+  public void testBasicReindexing() throws Exception {
+    final String sourceCollection = "basicReindexing";
+
+    CollectionAdminRequest.createCollection(sourceCollection, "conf1", 2, 2)
+        .setMaxShardsPerNode(-1)
+        .process(solrClient);
+
+    cluster.waitForActiveCollection(sourceCollection, 2, 4);
+
+    // verify that indexing works
+    List<SolrInputDocument> docs = new ArrayList<>();
+    for (int i = 0; i < NUM_DOCS; i++) {
+      docs.add(new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+    }
+    solrClient.add(sourceCollection, docs);
+    solrClient.commit(sourceCollection);
+    // verify the docs exist
+    QueryResponse rsp = solrClient.query(sourceCollection, params(CommonParams.Q, "*:*"));
+    assertEquals("initial num docs", NUM_DOCS, rsp.getResults().getNumFound());
+
+    final String targetCollection = "basicReindexingTarget";
+
+    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+        .setTarget(targetCollection);
+    req.process(solrClient);
+
+    CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+      ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+      return ReindexCollectionCmd.State.FINISHED == state;
+    });
+    // verify the target docs exist
+    rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+    assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+  }
+}
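
A minimal SolrJ usage sketch of the new request type, mirroring the test
above (names are illustrative; "client" is assumed to be a CloudSolrClient
pointed at the cluster):

    CollectionAdminRequest.ReindexCollection reindex =
        CollectionAdminRequest.reindexCollection("books")
            .setTarget("books2")       // omit to reindex in place under an alias
            .setQuery("inStock:true")  // optional subset of documents to copy
            .setBatchSize(200);
    reindex.process(client);
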
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index a326abe..a024edfb9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -789,10 +789,56 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
   }
 
   public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
+    String target;
+    String query;
+    String configName;
+    Boolean keepSource;
+    Integer batchSize;
 
     private ReindexCollection(String collection) {
       super(CollectionAction.REINDEX_COLLECTION, collection);
     }
+
+    public ReindexCollection setTarget(String target) {
+      this.target = target;
+      return this;
+    }
+
+    public ReindexCollection setQuery(String query) {
+      this.query = query;
+      return this;
+    }
+
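+    // Assumed setter: configName is read in getParams() below, but the hunk as
+    // posted provides no way to assign it.
+    public ReindexCollection setConfigName(String configName) {
+      this.configName = configName;
+      return this;
+    }
+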
+    public ReindexCollection setKeepSource(boolean keepSource) {
+      this.keepSource = keepSource;
+      return this;
+    }
+
+    public ReindexCollection setBatchSize(int batchSize) {
+      this.batchSize = batchSize;
+      return this;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+      if (target != null) {
+        params.set("target", target);
+      }
+      if (configName != null) {
+        params.set(ZkStateReader.CONFIGNAME_PROP, configName);
+      }
+      if (query != null) {
+        params.set(CommonParams.Q, query);
+      }
+      if (keepSource != null) {
+        params.set("keepSource", keepSource);
+      }
+      if (batchSize != null) {
+        params.set(CommonParams.ROWS, batchSize);
+      }
+      return params;
+    }
   }
 
   /**
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index 30778b8..c4dad33 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -100,6 +100,11 @@ public class CompositeIdRouter extends HashBasedRouter {
     return targetSlices;
   }
 
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
   public List<Range> partitionRangeByKey(String key, Range range) {
     List<Range> result = new ArrayList<>(3);
     Range keyRange = keyHashRange(key);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
index 111c74b..335c86d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
@@ -223,6 +223,7 @@ public abstract class DocRouter {
 
   public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection);
 
+  /** Returns the registered name of this router implementation, e.g. "compositeId". */
+  public abstract String getName();
 
   /** This method is consulted to determine what slices should be queried for a request when
    *  an explicit shards parameter was not used.
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
index 0b25fcb..7e51621 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
@@ -76,6 +76,11 @@ public class ImplicitDocRouter extends DocRouter {
   }
 
   @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
   public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
 
     if (shardKey == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
index f1cea47..d63c5cd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
@@ -19,4 +19,9 @@ package org.apache.solr.common.cloud;
 
 public class PlainIdRouter extends HashBasedRouter {
   public static final String NAME = "plain";
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
 }
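
The getName() implementations above exist so that ReindexCollectionCmd can
carry the source collection's router over to the target collection; a minimal
sketch of that use, mirroring the propMap code earlier in this commit ("coll"
is the source DocCollection from cluster state):

    DocRouter router = coll.getRouter();
    propMap.put("router.name", router.getName()); // "compositeId", "implicit" or "plain"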