You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/08/01 14:31:07 UTC

[1/2] lucene-solr:master: SOLR-12509: Improve SplitShardCmd performance and reliability.

Repository: lucene-solr
Updated Branches:
  refs/heads/master c6e0c2875 -> 1133bf98a


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java b/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java
index ae743da..79eccd9 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java
@@ -34,9 +34,11 @@ import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.PlainIdRouter;
 import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -45,13 +47,15 @@ import org.slf4j.LoggerFactory;
 
 public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
+
   File indexDir1 = null, indexDir2 = null, indexDir3 = null;
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    System.setProperty("enable.update.log", "false"); // schema12 doesn't support _version_
-    initCore("solrconfig.xml", "schema12.xml");
+   // System.setProperty("enable.update.log", "false"); // schema12 doesn't support _version_
+    System.setProperty("solr.directoryFactory", "solr.NRTCachingDirectoryFactory");
+    System.setProperty("solr.tests.lockType", DirectoryFactory.LOCK_TYPE_SIMPLE);
+    initCore("solrconfig.xml", "schema15.xml");
   }
 
   @Override
@@ -67,6 +71,15 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
 
   @Test
   public void testSplitByPaths() throws Exception {
+    doTestSplitByPaths(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitByPathsLink() throws Exception {
+    doTestSplitByPaths(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doTestSplitByPaths(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     LocalSolrQueryRequest request = null;
     try {
       // add two docs
@@ -81,10 +94,10 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
       List<DocRouter.Range> ranges = getRanges(id1, id2);
 
       request = lrf.makeRequest("q", "dummy");
-
-      SplitIndexCommand command = new SplitIndexCommand(request,
-          Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter(), null, null);
-      new SolrIndexSplitter(command).split();
+      SolrQueryResponse rsp = new SolrQueryResponse();
+      SplitIndexCommand command = new SplitIndexCommand(request, rsp,
+          Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter(), null, null, splitMethod);
+      doSplit(command);
 
       Directory directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
           DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
@@ -106,9 +119,23 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
       if (request != null) request.close(); // decrefs the searcher
     }
   }
+
+  private void doSplit(SplitIndexCommand command) throws Exception {
+    NamedList<Object> results = new NamedList<>();
+    new SolrIndexSplitter(command).split(results);
+    command.rsp.addResponse(results);
+  }
   
   // SOLR-5144
   public void testSplitDeletes() throws Exception {
+    doTestSplitDeletes(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  public void testSplitDeletesLink() throws Exception {
+    doTestSplitDeletes(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doTestSplitDeletes(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     LocalSolrQueryRequest request = null;
     try {
       // add two docs
@@ -126,10 +153,11 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
       List<DocRouter.Range> ranges = getRanges(id1, id2);
 
       request = lrf.makeRequest("q", "dummy");
+      SolrQueryResponse rsp = new SolrQueryResponse();
 
-      SplitIndexCommand command = new SplitIndexCommand(request,
-          Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter(), null, null);
-      new SolrIndexSplitter(command).split();
+      SplitIndexCommand command = new SplitIndexCommand(request, rsp,
+          Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath()), null, ranges, new PlainIdRouter(), null, null, splitMethod);
+      doSplit(command);
 
       Directory directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
           DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
@@ -152,11 +180,25 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
 
   @Test
   public void testSplitByCores() throws Exception {
-    // add two docs
+    doTestSplitByCores(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitByCoresLink() throws Exception {
+    doTestSplitByCores(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doTestSplitByCores(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
+    // add three docs and 1 delete
     String id1 = "dorothy";
     assertU(adoc("id", id1));
     String id2 = "kansas";
     assertU(adoc("id", id2));
+    String id3 = "wizard";
+    assertU(adoc("id", id3));
+    assertU(commit());
+    assertJQ(req("q", "*:*"), "/response/numFound==3");
+    assertU(delI("wizard"));
     assertU(commit());
     assertJQ(req("q", "*:*"), "/response/numFound==2");
     List<DocRouter.Range> ranges = getRanges(id1, id2);
@@ -165,16 +207,17 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
     try {
 
       core1 = h.getCoreContainer().create("split1",
-          ImmutableMap.of("dataDir", indexDir1.getAbsolutePath(), "configSet", "minimal"));
+          ImmutableMap.of("dataDir", indexDir1.getAbsolutePath(), "configSet", "cloud-minimal"));
       core2 = h.getCoreContainer().create("split2",
-          ImmutableMap.of("dataDir", indexDir2.getAbsolutePath(), "configSet", "minimal"));
+          ImmutableMap.of("dataDir", indexDir2.getAbsolutePath(), "configSet", "cloud-minimal"));
 
       LocalSolrQueryRequest request = null;
       try {
         request = lrf.makeRequest("q", "dummy");
-
-        SplitIndexCommand command = new SplitIndexCommand(request, null, Lists.newArrayList(core1, core2), ranges, new PlainIdRouter(), null, null);
-        new SolrIndexSplitter(command).split();
+        SolrQueryResponse rsp = new SolrQueryResponse();
+        SplitIndexCommand command = new SplitIndexCommand(request, rsp, null, Lists.newArrayList(core1, core2), ranges,
+            new PlainIdRouter(), null, null, splitMethod);
+        doSplit(command);
       } finally {
         if (request != null) request.close();
       }
@@ -196,6 +239,15 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
 
   @Test
   public void testSplitAlternately() throws Exception {
+    doTestSplitAlternately(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitAlternatelyLink() throws Exception {
+    doTestSplitAlternately(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doTestSplitAlternately(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     LocalSolrQueryRequest request = null;
     Directory directory = null;
     try {
@@ -208,10 +260,11 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
       assertU(commit());
 
       request = lrf.makeRequest("q", "dummy");
-
-      SplitIndexCommand command = new SplitIndexCommand(request,
-          Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath(), indexDir3.getAbsolutePath()), null, null, new PlainIdRouter(), null, null);
-      new SolrIndexSplitter(command).split();
+      SolrQueryResponse rsp = new SolrQueryResponse();
+      SplitIndexCommand command = new SplitIndexCommand(request, rsp,
+          Lists.newArrayList(indexDir1.getAbsolutePath(), indexDir2.getAbsolutePath(), indexDir3.getAbsolutePath()),
+          null, null, new PlainIdRouter(), null, null, splitMethod);
+      doSplit(command);
 
       directory = h.getCore().getDirectoryFactory().get(indexDir1.getAbsolutePath(),
           DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
@@ -242,7 +295,16 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
   }
 
   @Test
-  public void testSplitByRouteKey() throws Exception  {
+  public void testSplitByRouteKey() throws Exception {
+    doTestSplitByRouteKey(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitByRouteKeyLink() throws Exception  {
+    doTestSplitByRouteKey(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doTestSplitByRouteKey(SolrIndexSplitter.SplitMethod splitMethod) throws Exception  {
     File indexDir = createTempDir().toFile();
 
     CompositeIdRouter r1 = new CompositeIdRouter();
@@ -274,9 +336,11 @@ public class SolrIndexSplitterTest extends SolrTestCaseJ4 {
     Directory directory = null;
     try {
       request = lrf.makeRequest("q", "dummy");
-      SplitIndexCommand command = new SplitIndexCommand(request,
-          Lists.newArrayList(indexDir.getAbsolutePath()), null, Lists.newArrayList(splitKeyRange), new CompositeIdRouter(), null, splitKey);
-      new SolrIndexSplitter(command).split();
+      SolrQueryResponse rsp = new SolrQueryResponse();
+      SplitIndexCommand command = new SplitIndexCommand(request, rsp,
+          Lists.newArrayList(indexDir.getAbsolutePath()), null, Lists.newArrayList(splitKeyRange),
+          new CompositeIdRouter(), null, splitKey, splitMethod);
+      doSplit(command);
       directory = h.getCore().getDirectoryFactory().get(indexDir.getAbsolutePath(),
           DirectoryFactory.DirContext.DEFAULT, h.getCore().getSolrConfig().indexConfig.lockType);
       DirectoryReader reader = DirectoryReader.open(directory);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/solr-ref-guide/src/collections-api.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index b94509d..80ce941 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -259,9 +259,11 @@ This command allows for seamless splitting and requires no downtime. A shard bei
 
 The split is performed by dividing the original shard's hash range into two equal partitions and dividing up the documents in the original shard according to the new sub-ranges. Two parameters discussed below, `ranges` and `split.key` provide further control over how the split occurs.
 
-The newly created shards will have as many replicas as the parent shard.
+The newly created shards will have as many replicas as the parent shard, of the same replica types.
 
-You must ensure that the node running the leader of the parent shard has enough free disk space i.e., more than twice the index size, for the split to succeed. The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replicas but only when an Autoscaling policy is configured. Refer to <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details.
+When using `splitMethod=rewrite` (default) you must ensure that the node running the leader of the parent shard has enough free disk space i.e., more than twice the index size, for the split to succeed. The API uses the Autoscaling framework to find nodes that can satisfy the disk requirements for the new replicas but only when an Autoscaling policy is configured. Refer to <<solrcloud-autoscaling-policy-preferences.adoc#solrcloud-autoscaling-policy-preferences,Autoscaling Policy and Preferences>> section for more details.
+
+Also, the first replicas of resulting sub-shards will always be placed on the shard leader node, which may cause Autoscaling policy violations that need to be resolved either automatically (when appropriate triggers are in use) or manually.
 
 Shard splitting can be a long running process. In order to avoid timeouts, you should run this as an <<Asynchronous Calls,asynchronous call>>.
 
@@ -285,12 +287,26 @@ This parameter can be used to split a shard using a route key such that all docu
 +
 For example, suppose `split.key=A!` hashes to the range `12-15` and belongs to shard 'shard1' with range `0-20`. Splitting by this route key would yield three sub-shards with ranges `0-11`, `12-15` and `16-20`. Note that the sub-shard with the hash range of the route key may also contain documents for other route keys whose hash ranges overlap.
 
+`splitMethod`::
+Currently two methods of shard splitting are supported:
+* `splitMethod=rewrite` (default) after selecting documents to retain in each partition this method creates sub-indexes from
+scratch, which is a lengthy CPU- and I/O-intensive process but results in optimally-sized sub-indexes that don't contain
+any data from documents not belonging to each partition.
+* `splitMethod=link` uses file system-level hard links for creating copies of the original index files and then only modifies the
+file that contains the list of deleted documents in each partition. This method is many times quicker and lighter on resources than the
+`rewrite` method but the resulting sub-indexes are still as large as the original index because they still contain data from documents not
+belonging to the partition. This slows down the replication process and consumes more disk space on replica nodes (the multiple hard-linked
+copies don't occupy additional disk space on the leader node, unless hard-linking is not supported).
+
 `property._name_=_value_`::
 Set core property _name_ to _value_. See the section <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details on supported properties and values.
 
 `waitForFinalState`::
 If `true`, the request will complete only when all affected replicas become active. The default is `false`, which means that the API will return the status of the single action, which may be before the new replica is online and active.
 
+`timing`::
+If `true` then each stage of processing will be timed and a `timing` section will be included in response.
+
 `async`::
 Request ID to track this action which will be <<Asynchronous Calls,processed asynchronously>>
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 8d36296..50cd65d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1144,6 +1144,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected String ranges;
     protected String splitKey;
     protected String shard;
+    protected String splitMethod;
 
     private Properties properties;
 
@@ -1155,6 +1156,15 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public SplitShard setRanges(String ranges) { this.ranges = ranges; return this; }
     public String getRanges() { return ranges; }
 
+    public SplitShard setSplitMethod(String splitMethod) {
+      this.splitMethod = splitMethod;
+      return this;
+    }
+
+    public String getSplitMethod() {
+      return splitMethod;
+    }
+
     public SplitShard setSplitKey(String splitKey) {
       this.splitKey = splitKey;
       return this;
@@ -1191,6 +1201,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       params.set(CoreAdminParams.SHARD, shard);
       params.set("split.key", this.splitKey);
       params.set(CoreAdminParams.RANGES, ranges);
+      params.set(CommonAdminParams.SPLIT_METHOD, splitMethod);
 
       if(properties != null) {
         addProperties(params, properties);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
index c39b4a8..c12ee32 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonAdminParams.java
@@ -25,6 +25,8 @@ public interface CommonAdminParams
   String WAIT_FOR_FINAL_STATE = "waitForFinalState";
   /** Allow in-place move of replicas that use shared filesystems. */
   String IN_PLACE_MOVE = "inPlaceMove";
+  /** Method to use for shard splitting. */
+  String SPLIT_METHOD = "splitMethod";
   /** Timeout for replicas to become active. */
   String TIMEOUT = "timeout";
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index 58c39a3..9103450 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -84,7 +84,7 @@ public abstract class CoreAdminParams
 
   /** The hash ranges to be used to split a shard or an index */
   public final static String RANGES = "ranges";
-  
+
   public static final String ROLES = "roles";
 
   public static final String REQUESTID = "requestid";


[2/2] lucene-solr:master: SOLR-12509: Improve SplitShardCmd performance and reliability.

Posted by ab...@apache.org.
SOLR-12509: Improve SplitShardCmd performance and reliability.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1133bf98
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1133bf98
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1133bf98

Branch: refs/heads/master
Commit: 1133bf98a5fd075173efecfb75a51493fceb62b3
Parents: c6e0c28
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed Aug 1 14:39:37 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed Aug 1 16:30:59 2018 +0200

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +
 .../cloud/api/collections/SplitShardCmd.java    | 144 ++++--
 .../solr/cloud/overseer/ReplicaMutator.java     |  47 +-
 .../solr/handler/admin/CollectionsHandler.java  |   6 +-
 .../org/apache/solr/handler/admin/SplitOp.java  |  24 +-
 .../solr/update/DirectUpdateHandler2.java       |   5 +-
 .../apache/solr/update/SolrIndexSplitter.java   | 464 +++++++++++++++++--
 .../apache/solr/update/SplitIndexCommand.java   |  22 +-
 .../solr/cloud/ChaosMonkeyShardSplitTest.java   |   2 +-
 .../cloud/api/collections/ShardSplitTest.java   |  73 ++-
 .../solr/update/SolrIndexSplitterTest.java      | 112 ++++-
 solr/solr-ref-guide/src/collections-api.adoc    |  20 +-
 .../solrj/request/CollectionAdminRequest.java   |  11 +
 .../solr/common/params/CommonAdminParams.java   |   2 +
 .../solr/common/params/CoreAdminParams.java     |   2 +-
 15 files changed, 773 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 37dd5a7..49fc7fe 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -205,6 +205,10 @@ Optimizations
 * SOLR-12305: When a replica is applying updates, some kind of updates can skip buffering for faster recovery.
   (Cao Manh Dat)
 
+* SOLR-12509: Improve SplitShardCmd performance and reliability. A new method of splitting has been
+  introduced (splitMethod=link) which uses hard-linking of index files when possible, resulting in
+  significant speedups and reduced CPU / IO load on shard leader. (ab)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index b5408f8..00488a3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -51,11 +52,14 @@ import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.RTimerTree;
 import org.apache.solr.util.TestInjection;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -87,13 +91,23 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
   public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
     boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+    String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
+    SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
+    if (splitMethod == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown value '" + CommonAdminParams.SPLIT_METHOD +
+          ": " + methodStr);
+    }
+    boolean withTiming = message.getBool(CommonParams.TIMING, false);
+
     String collectionName = message.getStr(CoreAdminParams.COLLECTION);
 
-    log.info("Split shard invoked");
+    log.debug("Split shard invoked: {}", message);
     ZkStateReader zkStateReader = ocmh.zkStateReader;
     zkStateReader.forceUpdateCollection(collectionName);
     AtomicReference<String> slice = new AtomicReference<>();
     slice.set(message.getStr(ZkStateReader.SHARD_ID_PROP));
+    Set<String> offlineSlices = new HashSet<>();
+    RTimerTree timings = new RTimerTree();
 
     String splitKey = message.getStr("split.key");
     DocCollection collection = clusterState.getCollection(collectionName);
@@ -101,6 +115,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     PolicyHelper.SessionWrapper sessionWrapper = null;
 
     Slice parentSlice = getParentSlice(clusterState, collectionName, slice, splitKey);
+    if (parentSlice.getState() != Slice.State.ACTIVE) {
+      throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Parent slice is not active: " + parentSlice.getState());
+    }
 
     // find the leader for the shard
     Replica parentShardLeader = null;
@@ -111,7 +128,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted.");
     }
 
+    RTimerTree t = timings.sub("checkDiskSpace");
     checkDiskSpace(collectionName, slice.get(), parentShardLeader);
+    t.stop();
 
     // let's record the ephemeralOwner of the parent leader node
     Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
@@ -142,20 +161,22 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     });
     int repFactor = numNrt.get() + numTlog.get() + numPull.get();
 
-    // type of the first subreplica will be the same as leader
-    boolean firstNrtReplica = parentShardLeader.getType() == Replica.Type.NRT;
-    // verify that we indeed have the right number of correct replica types
-    if ((firstNrtReplica && numNrt.get() < 1) || (!firstNrtReplica && numTlog.get() < 1)) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "aborting split - inconsistent replica types in collection " + collectionName +
-          ": nrt=" + numNrt.get() + ", tlog=" + numTlog.get() + ", pull=" + numPull.get() + ", shard leader type is " +
-      parentShardLeader.getType());
-    }
-
-    List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
+    boolean success = false;
+    try {
+      // type of the first subreplica will be the same as leader
+      boolean firstNrtReplica = parentShardLeader.getType() == Replica.Type.NRT;
+      // verify that we indeed have the right number of correct replica types
+      if ((firstNrtReplica && numNrt.get() < 1) || (!firstNrtReplica && numTlog.get() < 1)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "aborting split - inconsistent replica types in collection " + collectionName +
+            ": nrt=" + numNrt.get() + ", tlog=" + numTlog.get() + ", pull=" + numPull.get() + ", shard leader type is " +
+            parentShardLeader.getType());
+      }
 
-    String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
+      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
 
-    try {
+      t = timings.sub("fillRanges");
+      String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
+      t.stop();
 
       boolean oldShardsDeleted = false;
       for (String subSlice : subSlices) {
@@ -196,12 +217,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       Map<String, String> requestMap = new HashMap<>();
       String nodeName = parentShardLeader.getNodeName();
 
+      t = timings.sub("createSubSlicesAndLeadersInState");
       for (int i = 0; i < subRanges.size(); i++) {
         String subSlice = subSlices.get(i);
         String subShardName = subShardNames.get(i);
         DocRouter.Range subRange = subRanges.get(i);
 
-        log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
+        log.debug("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
 
         Map<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
@@ -210,7 +232,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
         propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
         propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
-        propMap.put("shard_parent_node", parentShardLeader.getNodeName());
+        propMap.put("shard_parent_node", nodeName);
         propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
         DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
         inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
@@ -221,7 +243,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         // refresh cluster state
         clusterState = zkStateReader.getClusterState();
 
-        log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
+        log.debug("Adding first replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
             + " on " + nodeName);
         propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -248,9 +270,11 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
 
+      t.stop();
+      t = timings.sub("waitForSubSliceLeadersAlive");
       for (String subShardName : subShardNames) {
         // wait for parent leader to acknowledge the sub-shard core
-        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+        log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
         String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
         CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
         cmd.setCoreName(subShardName);
@@ -266,8 +290,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
           asyncId, requestMap);
+      t.stop();
 
-      log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
+      log.debug("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
           + " on: " + parentShardLeader);
 
       log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
@@ -275,6 +300,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+      params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
       params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
       for (int i = 0; i < subShardNames.size(); i++) {
         String subShardName = subShardNames.get(i);
@@ -282,18 +308,22 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
       params.set(CoreAdminParams.RANGES, rangesStr);
 
+      t = timings.sub("splitParentCore");
+
       ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
           requestMap);
+      t.stop();
 
-      log.info("Index on shard: " + nodeName + " split into two successfully");
+      log.debug("Index on shard: " + nodeName + " split into two successfully");
 
+      t = timings.sub("applyBufferedUpdates");
       // apply buffered updates on sub-shards
       for (int i = 0; i < subShardNames.size(); i++) {
         String subShardName = subShardNames.get(i);
 
-        log.info("Applying buffered updates on : " + subShardName);
+        log.debug("Applying buffered updates on : " + subShardName);
 
         params = new ModifiableSolrParams();
         params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
@@ -304,8 +334,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
           " to apply buffered updates", asyncId, requestMap);
+      t.stop();
 
-      log.info("Successfully applied buffered updates on : " + subShardNames);
+      log.debug("Successfully applied buffered updates on : " + subShardNames);
 
       // Replica creation for the new Slices
       // replica placement is controlled by the autoscaling policy framework
@@ -329,6 +360,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         numTlog.decrementAndGet();
       }
 
+      t = timings.sub("identifyNodesForReplicas");
       List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
           clusterState,
           new ArrayList<>(clusterState.getLiveNodes()),
@@ -336,13 +368,15 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
           new ZkNodeProps(collection.getProperties()),
           subSlices, numNrt.get(), numTlog.get(), numPull.get());
       sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
+      t.stop();
 
+      t = timings.sub("createReplicaPlaceholders");
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String sliceName = replicaPosition.shard;
         String subShardNodeName = replicaPosition.node;
         String solrCoreName = Assign.buildSolrCoreName(collectionName, sliceName, replicaPosition.type, replicaPosition.index);
 
-        log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
+        log.debug("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
             + collectionName + " on " + subShardNodeName);
 
         // we first create all replicas in DOWN state without actually creating their cores in order to
@@ -384,7 +418,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
         replicas.add(propMap);
       }
-
+      t.stop();
       assert TestInjection.injectSplitFailureBeforeReplicaCreation();
 
       long ephemeralOwner = leaderZnodeStat.getEphemeralOwner();
@@ -414,12 +448,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
 
       // we must set the slice state into recovery before actually creating the replica cores
-      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
+      // this ensures that the logic inside ReplicaMutator to update sub-shard state to 'active'
       // always gets a chance to execute. See SOLR-7673
 
       if (repFactor == 1) {
         // switch sub shard states to 'active'
-        log.info("Replication factor is 1 so switching shard states");
+        log.debug("Replication factor is 1 so switching shard states");
         DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
         Map<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
@@ -431,7 +465,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         ZkNodeProps m = new ZkNodeProps(propMap);
         inQueue.offer(Utils.toJSON(m));
       } else {
-        log.info("Requesting shard state be set to 'recovery'");
+        log.debug("Requesting shard state be set to 'recovery'");
         DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
         Map<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
@@ -443,6 +477,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         inQueue.offer(Utils.toJSON(m));
       }
 
+      t = timings.sub("createCoresForReplicas");
       // now actually create replica cores on sub shard nodes
       for (Map<String, Object> replica : replicas) {
         ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
@@ -451,20 +486,28 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       assert TestInjection.injectSplitFailureAfterReplicaCreation();
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
+      t.stop();
 
       log.info("Successfully created all replica shards for all sub-slices " + subSlices);
 
+      t = timings.sub("finalCommit");
       ocmh.commit(results, slice.get(), parentShardLeader);
-
+      t.stop();
+      if (withTiming) {
+        results.add(CommonParams.TIMING, timings.asNamedList());
+      }
+      success = true;
       return true;
     } catch (SolrException e) {
-      cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices);
       throw e;
     } catch (Exception e) {
       log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
     } finally {
       if (sessionWrapper != null) sessionWrapper.release();
+      if (!success) {
+        cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices, offlineSlices);
+      }
     }
   }
 
@@ -505,13 +548,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     }
   }
 
-  private void cleanupAfterFailure(ZkStateReader zkStateReader, String collectionName, String parentShard, List<String> subSlices) {
-    log.debug("- cleanup after failed split of " + collectionName + "/" + parentShard);
+  private void cleanupAfterFailure(ZkStateReader zkStateReader, String collectionName, String parentShard,
+                                   List<String> subSlices, Set<String> offlineSlices) {
+    log.info("Cleaning up after a failed split of " + collectionName + "/" + parentShard);
     // get the latest state
     try {
       zkStateReader.forceUpdateCollection(collectionName);
     } catch (KeeperException | InterruptedException e) {
-      log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (force update collection)", e);
+      log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (force update collection)", e);
       return;
     }
     ClusterState clusterState = zkStateReader.getClusterState();
@@ -524,7 +568,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     // set already created sub shards states to CONSTRUCTION - this prevents them
     // from entering into RECOVERY or ACTIVE (SOLR-9455)
     DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-    Map<String, Object> propMap = new HashMap<>();
+    final Map<String, Object> propMap = new HashMap<>();
+    boolean sendUpdateState = false;
     propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
     propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
     for (Slice s : coll.getSlices()) {
@@ -532,20 +577,29 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         continue;
       }
       propMap.put(s.getName(), Slice.State.CONSTRUCTION.toString());
+      sendUpdateState = true;
     }
 
     // if parent is inactive activate it again
     Slice parentSlice = coll.getSlice(parentShard);
     if (parentSlice.getState() == Slice.State.INACTIVE) {
+      sendUpdateState = true;
       propMap.put(parentShard, Slice.State.ACTIVE.toString());
     }
+    // plus any other previously deactivated slices
+    for (String sliceName : offlineSlices) {
+      propMap.put(sliceName, Slice.State.ACTIVE.toString());
+      sendUpdateState = true;
+    }
 
-    try {
-      ZkNodeProps m = new ZkNodeProps(propMap);
-      inQueue.offer(Utils.toJSON(m));
-    } catch (Exception e) {
-      // don't give up yet - just log the error, we may still be able to clean up
-      log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
+    if (sendUpdateState) {
+      try {
+        ZkNodeProps m = new ZkNodeProps(propMap);
+        inQueue.offer(Utils.toJSON(m));
+      } catch (Exception e) {
+        // don't give up yet - just log the error, we may still be able to clean up
+        log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
+      }
     }
 
     // delete existing subShards
@@ -554,16 +608,16 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       if (s == null) {
         continue;
       }
-      log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
-      propMap = new HashMap<>();
-      propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
-      propMap.put(COLLECTION_PROP, collectionName);
-      propMap.put(SHARD_ID_PROP, subSlice);
-      ZkNodeProps m = new ZkNodeProps(propMap);
+      log.debug("- sub-shard: {} exists therefore requesting its deletion", subSlice);
+      HashMap<String, Object> props = new HashMap<>();
+      props.put(Overseer.QUEUE_OPERATION, "deleteshard");
+      props.put(COLLECTION_PROP, collectionName);
+      props.put(SHARD_ID_PROP, subSlice);
+      ZkNodeProps m = new ZkNodeProps(props);
       try {
         ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
       } catch (Exception e) {
-        log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
+        log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f897072..34843c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -25,6 +25,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
@@ -52,12 +53,12 @@ import static org.apache.solr.common.params.CommonParams.NAME;
 public class ReplicaMutator {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  protected final SolrCloudManager dataProvider;
+  protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
 
-  public ReplicaMutator(SolrCloudManager dataProvider) {
-    this.dataProvider = dataProvider;
-    this.stateManager = dataProvider.getDistribStateManager();
+  public ReplicaMutator(SolrCloudManager cloudManager) {
+    this.cloudManager = cloudManager;
+    this.stateManager = cloudManager.getDistribStateManager();
   }
 
   protected Replica setProperty(Replica replica, String key, String value) {
@@ -96,11 +97,11 @@ public class ReplicaMutator {
   }
 
   public ZkWriteCommand addReplicaProperty(ClusterState clusterState, ZkNodeProps message) {
-    if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) {
+    if (!checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.REPLICA_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
           "Overseer ADDREPLICAPROP requires " +
               ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
@@ -200,7 +201,7 @@ public class ReplicaMutator {
   }
 
   public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
-    if (Overseer.isLegacy(dataProvider.getClusterStateProvider())) {
+    if (Overseer.isLegacy(cloudManager.getClusterStateProvider())) {
       return updateState(clusterState, message);
     } else {
       return updateStateNew(clusterState, message);
@@ -224,7 +225,7 @@ public class ReplicaMutator {
       ClusterStateMutator.getShardNames(numShards, shardNames);
       Map<String, Object> createMsg = Utils.makeMap(NAME, cName);
       createMsg.putAll(message.getProperties());
-      writeCommand = new ClusterStateMutator(dataProvider).createCollection(prevState, new ZkNodeProps(createMsg));
+      writeCommand = new ClusterStateMutator(cloudManager).createCollection(prevState, new ZkNodeProps(createMsg));
       DocCollection collection = writeCommand.collection;
       newState = ClusterStateMutator.newState(prevState, cName, collection);
     }
@@ -451,30 +452,34 @@ public class ReplicaMutator {
               }
             }
 
+            Map<String, Object> propMap = new HashMap<>();
+            propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+            propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
             if (isLeaderSame) {
               log.info("Sub-shard leader node is still the same one at {} with ZK session id {}. Preparing to switch shard states.", shardParentNode, shardParentZkSession);
-              Map<String, Object> propMap = new HashMap<>();
-              propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
               propMap.put(parentSliceName, Slice.State.INACTIVE.toString());
               propMap.put(sliceName, Slice.State.ACTIVE.toString());
+              long now = cloudManager.getTimeSource().getEpochTimeNs();
               for (Slice subShardSlice : subShardSlices) {
                 propMap.put(subShardSlice.getName(), Slice.State.ACTIVE.toString());
+                String lastTimeStr = subShardSlice.getStr(ZkStateReader.STATE_TIMESTAMP_PROP);
+                if (lastTimeStr != null) {
+                  long start = Long.parseLong(lastTimeStr);
+                  log.info("TIMINGS: Sub-shard " + subShardSlice.getName() + " recovered in " +
+                      TimeUnit.MILLISECONDS.convert(now - start, TimeUnit.NANOSECONDS) + " ms");
+                } else {
+                  log.info("TIMINGS Sub-shard " + subShardSlice.getName() + " not available: " + subShardSlice);
+                }
               }
-              propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
-              ZkNodeProps m = new ZkNodeProps(propMap);
-              return new SliceMutator(dataProvider).updateShardState(prevState, m).collection;
             } else  {
               // we must mark the shard split as failed by switching sub-shards to recovery_failed state
-              Map<String, Object> propMap = new HashMap<>();
-              propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
               propMap.put(sliceName, Slice.State.RECOVERY_FAILED.toString());
               for (Slice subShardSlice : subShardSlices) {
                 propMap.put(subShardSlice.getName(), Slice.State.RECOVERY_FAILED.toString());
               }
-              propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
-              ZkNodeProps m = new ZkNodeProps(propMap);
-              return new SliceMutator(dataProvider).updateShardState(prevState, m).collection;
             }
+            ZkNodeProps m = new ZkNodeProps(propMap);
+            return new SliceMutator(cloudManager).updateShardState(prevState, m).collection;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 8d7cdbf..3a46b2b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -144,8 +144,10 @@ import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTIO
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
+import static org.apache.solr.common.params.CommonAdminParams.SPLIT_METHOD;
 import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
 import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.TIMING;
 import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
 import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
 import static org.apache.solr.common.params.CoreAdminParams.DELETE_DATA_DIR;
@@ -662,7 +664,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           SHARD_ID_PROP,
           "split.key",
           CoreAdminParams.RANGES,
-          WAIT_FOR_FINAL_STATE);
+          WAIT_FOR_FINAL_STATE,
+          TIMING,
+          SPLIT_METHOD);
       return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
     }),
     DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
index 9dda6d4..31382c3 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
@@ -30,11 +30,13 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CommonAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.SolrIndexSplitter;
 import org.apache.solr.update.SplitIndexCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,9 +80,14 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     }
 
     log.info("Invoked split action for core: " + cname);
-    SolrCore core = it.handler.coreContainer.getCore(cname);
-    SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
+    String methodStr = params.get(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
+    SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
+    if (splitMethod == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unsupported value of '" + CommonAdminParams.SPLIT_METHOD + "': " + methodStr);
+    }
+    SolrCore parentCore = it.handler.coreContainer.getCore(cname);
     List<SolrCore> newCores = null;
+    SolrQueryRequest req = null;
 
     try {
       // TODO: allow use of rangesStr in the future
@@ -91,9 +98,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       String routeFieldName = null;
       if (it.handler.coreContainer.isZooKeeperAware()) {
         ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
-        String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
+        String collectionName = parentCore.getCoreDescriptor().getCloudDescriptor().getCollectionName();
         DocCollection collection = clusterState.getCollection(collectionName);
-        String sliceName = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
+        String sliceName = parentCore.getCoreDescriptor().getCloudDescriptor().getShardId();
         Slice slice = collection.getSlice(sliceName);
         router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
         if (ranges == null) {
@@ -101,7 +108,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
           ranges = currentRange != null ? router.partitionRange(partitions, currentRange) : null;
         }
         Object routerObj = collection.get(DOC_ROUTER); // for back-compat with Solr 4.4
-        if (routerObj != null && routerObj instanceof Map) {
+        if (routerObj instanceof Map) {
           Map routerProps = (Map) routerObj;
           routeFieldName = (String) routerProps.get("field");
         }
@@ -131,9 +138,10 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
         paths = Arrays.asList(pathsArr);
       }
 
+      req = new LocalSolrQueryRequest(parentCore, params);
 
-      SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
-      core.getUpdateHandler().split(cmd);
+      SplitIndexCommand cmd = new SplitIndexCommand(req, it.rsp, paths, newCores, ranges, router, routeFieldName, splitKey, splitMethod);
+      parentCore.getUpdateHandler().split(cmd);
 
       if (it.handler.coreContainer.isZooKeeperAware()) {
         for (SolrCore newcore : newCores) {
@@ -150,7 +158,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       throw e;
     } finally {
       if (req != null) req.close();
-      if (core != null) core.close();
+      if (parentCore != null) parentCore.close();
       if (newCores != null) {
         for (SolrCore newCore : newCores) {
           newCore.close();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index bcc97eb..e64ee8a 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -47,6 +47,7 @@ import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.metrics.SolrMetricManager;
@@ -902,8 +903,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     commit(new CommitUpdateCommand(cmd.req, false));
     SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
     splitCommands.mark();
+    NamedList<Object> results = new NamedList<>();
     try {
-      splitter.split();
+      splitter.split(results);
+      cmd.rsp.addResponse(results);
     } catch (IOException e) {
       numErrors.increment();
       numErrorsCumulative.mark();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index aadbe74..75234fa 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -18,31 +18,59 @@ package org.apache.solr.update;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.FilterCodecReader;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.PostingsEnum;
 import org.apache.lucene.index.SlowCodecReaderWrapper;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.ConstantScoreScorer;
+import org.apache.lucene.search.ConstantScoreWeight;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.HardlinkCopyDirectoryWrapper;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRefBuilder;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.IOUtils;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.HashBasedRouter;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.IndexFetcher;
+import org.apache.solr.handler.SnapShooter;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.BitsFilteredPostingsEnum;
 import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RTimerTree;
 import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,26 +78,47 @@ import org.slf4j.LoggerFactory;
 public class SolrIndexSplitter {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private static final String INDEX_PREFIX = "index.";
+
+  public enum SplitMethod {
+    REWRITE,
+    LINK;
+
+    public static SplitMethod get(String p) {
+      if (p != null) {
+        try {
+          return SplitMethod.valueOf(p.toUpperCase(Locale.ROOT));
+        } catch (Exception ex) {
+          return null;
+        }
+      }
+      return null;
+    }
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+  }
+
   SolrIndexSearcher searcher;
   SchemaField field;
   List<DocRouter.Range> ranges;
   DocRouter.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
   List<String> paths;
   List<SolrCore> cores;
-  DocRouter router;
   HashBasedRouter hashRouter;
   int numPieces;
-  int currPartition = 0;
   String routeFieldName;
   String splitKey;
+  SplitMethod splitMethod;
+  RTimerTree timings = new RTimerTree();
 
   public SolrIndexSplitter(SplitIndexCommand cmd) {
     searcher = cmd.getReq().getSearcher();
     ranges = cmd.ranges;
     paths = cmd.paths;
     cores = cmd.cores;
-    router = cmd.router;
-    hashRouter = router instanceof HashBasedRouter ? (HashBasedRouter)router : null;
+    hashRouter = cmd.router instanceof HashBasedRouter ? (HashBasedRouter)cmd.router : null;
 
     if (ranges == null) {
       numPieces =  paths != null ? paths.size() : cores.size();
@@ -86,83 +135,413 @@ public class SolrIndexSplitter {
     if (cmd.splitKey != null) {
       splitKey = getRouteKey(cmd.splitKey);
     }
+    if (cores == null) {
+      this.splitMethod = SplitMethod.REWRITE;
+    } else {
+      this.splitMethod = cmd.splitMethod;
+    }
   }
 
-  public void split() throws IOException {
+  public void split(NamedList<Object> results) throws IOException {
+    SolrCore parentCore = searcher.getCore();
+    Directory parentDirectory = searcher.getRawReader().directory();
+    Lock parentDirectoryLock = null;
+    UpdateLog ulog = parentCore.getUpdateHandler().getUpdateLog();
+    if (ulog == null && splitMethod == SplitMethod.LINK) {
+      log.warn("No updateLog in parent core, switching to use potentially slower 'splitMethod=rewrite'");
+      splitMethod = SplitMethod.REWRITE;
+    }
+    if (splitMethod == SplitMethod.LINK) {
+      RTimerTree t = timings.sub("closeParentIW");
+      try {
+        // start buffering updates
+        ulog.bufferUpdates();
+        parentCore.getSolrCoreState().closeIndexWriter(parentCore, false);
+        // make sure we can lock the directory for our exclusive use
+        parentDirectoryLock = parentDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
+        log.info("Splitting in 'link' mode: closed parent IndexWriter...");
+        t.stop();
+      } catch (Exception e) {
+        if (parentDirectoryLock != null) {
+          IOUtils.closeWhileHandlingException(parentDirectoryLock);
+        }
+        try {
+          parentCore.getSolrCoreState().openIndexWriter(parentCore);
+          ulog.applyBufferedUpdates();
+        } catch (Exception e1) {
+          log.error("Error reopening IndexWriter after failed close", e1);
+          log.error("Original error closing IndexWriter:", e);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reopening IndexWriter after failed close", e1);
+        }
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing current IndexWriter, aborting offline split...", e);
+      }
+    }
+    boolean success = false;
+    try {
+      RTimerTree t = timings.sub("doSplit");
+      doSplit();
+      t.stop();
+      success = true;
+    } catch (Exception e) {
+      results.add("failed", e.toString());
+      throw e;
+    } finally {
+      if (splitMethod == SplitMethod.LINK) {
+        IOUtils.closeWhileHandlingException(parentDirectoryLock);
+        RTimerTree t = timings.sub("reopenParentIW");
+        parentCore.getSolrCoreState().openIndexWriter(parentCore);
+        t.stop();
+        t = timings.sub("parentApplyBufferedUpdates");
+        ulog.applyBufferedUpdates();
+        t.stop();
+        log.info("Splitting in 'offline' mode " + (success? "finished" : "FAILED") +
+            ": re-opened parent IndexWriter.");
+      }
+    }
+    results.add(CommonParams.TIMING, timings.asNamedList());
+  }
+
+  public void doSplit() throws IOException {
 
     List<LeafReaderContext> leaves = searcher.getRawReader().leaves();
+    Directory parentDirectory = searcher.getRawReader().directory();
     List<FixedBitSet[]> segmentDocSets = new ArrayList<>(leaves.size());
-
-    log.info("SolrIndexSplitter: partitions=" + numPieces + " segments="+leaves.size());
-
-    for (LeafReaderContext readerContext : leaves) {
-      assert readerContext.ordInParent == segmentDocSets.size();  // make sure we're going in order
-      FixedBitSet[] docSets = split(readerContext);
-      segmentDocSets.add( docSets );
+    SolrIndexConfig parentConfig = searcher.getCore().getSolrConfig().indexConfig;
+    String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+
+    log.info("SolrIndexSplitter: partitions=" + numPieces + " segments=" + leaves.size());
+    RTimerTree t;
+
+    if (splitMethod != SplitMethod.LINK) {
+      t = timings.sub("findDocSetsPerLeaf");
+      for (LeafReaderContext readerContext : leaves) {
+        assert readerContext.ordInParent == segmentDocSets.size();  // make sure we're going in order
+        FixedBitSet[] docSets = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, false);
+        segmentDocSets.add(docSets);
+      }
+      t.stop();
     }
 
 
+    Map<IndexReader.CacheKey, FixedBitSet[]> docsToDeleteCache = new ConcurrentHashMap<>();
+
     // would it be more efficient to write segment-at-a-time to each new index?
     // - need to worry about number of open descriptors
     // - need to worry about if IW.addIndexes does a sync or not...
     // - would be more efficient on the read side, but prob less efficient merging
-
     for (int partitionNumber=0; partitionNumber<numPieces; partitionNumber++) {
-      log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : ""));
+      String partitionName = "SolrIndexSplitter:partition=" + partitionNumber + ",partitionCount=" + numPieces + (ranges != null ? ",range=" + ranges.get(partitionNumber) : "");
+      log.info(partitionName);
 
       boolean success = false;
 
       RefCounted<IndexWriter> iwRef = null;
-      IndexWriter iw = null;
-      if (cores != null) {
+      IndexWriter iw;
+      if (cores != null && splitMethod != SplitMethod.LINK) {
         SolrCore subCore = cores.get(partitionNumber);
         iwRef = subCore.getUpdateHandler().getSolrCoreState().getIndexWriter(subCore);
         iw = iwRef.get();
       } else {
-        SolrCore core = searcher.getCore();
-        String path = paths.get(partitionNumber);
-        iw = SolrIndexWriter.create(core, "SplittingIndexWriter"+partitionNumber + (ranges != null ? " " + ranges.get(partitionNumber) : ""), path,
-                                    core.getDirectoryFactory(), true, core.getLatestSchema(),
-                                    core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+        if (splitMethod == SplitMethod.LINK) {
+          SolrCore subCore = cores.get(partitionNumber);
+          String path = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+          t = timings.sub("hardLinkCopy");
+          t.resume();
+          // copy by hard-linking
+          Directory splitDir = subCore.getDirectoryFactory().get(path, DirectoryFactory.DirContext.DEFAULT, subCore.getSolrConfig().indexConfig.lockType);
+          // the wrapper doesn't hold any resources itself so it doesn't need closing
+          HardlinkCopyDirectoryWrapper hardLinkedDir = new HardlinkCopyDirectoryWrapper(splitDir);
+          boolean copiedOk = false;
+          try {
+            for (String file : parentDirectory.listAll()) {
+              // we've closed the IndexWriter, so ignore write.lock
+              // its file may be present even when IndexWriter is closed but
+              // we've already checked that the lock is not held by anyone else
+              if (file.equals(IndexWriter.WRITE_LOCK_NAME)) {
+                continue;
+              }
+              hardLinkedDir.copyFrom(parentDirectory, file, file, IOContext.DEFAULT);
+            }
+            copiedOk = true;
+          } finally {
+            if (!copiedOk) {
+              subCore.getDirectoryFactory().doneWithDirectory(splitDir);
+              subCore.getDirectoryFactory().remove(splitDir);
+            }
+          }
+          t.pause();
+          IndexWriterConfig iwConfig = parentConfig.toIndexWriterConfig(subCore);
+          // don't run merges at this time
+          iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
+          t = timings.sub("createSubIW");
+          t.resume();
+          iw = new SolrIndexWriter(partitionName, splitDir, iwConfig);
+          t.pause();
+        } else {
+          SolrCore core = searcher.getCore();
+          String path = paths.get(partitionNumber);
+          t = timings.sub("createSubIW");
+          t.resume();
+          iw = SolrIndexWriter.create(core, partitionName, path,
+              core.getDirectoryFactory(), true, core.getLatestSchema(),
+              core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+          t.pause();
+        }
       }
 
       try {
-        // This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
-        for (int segmentNumber = 0; segmentNumber<leaves.size(); segmentNumber++) {
-          log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : "") + " segment #"+segmentNumber + " segmentCount=" + leaves.size());
-          CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
-          iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
+        if (splitMethod == SplitMethod.LINK) {
+          t = timings.sub("deleteDocuments");
+          t.resume();
+          // apply deletions specific to this partition. As a side-effect on the first call this also populates
+          // a cache of docsets to delete per leaf reader per partition, which is reused for subsequent partitions.
+          iw.deleteDocuments(new SplittingQuery(partitionNumber, field, rangesArr, hashRouter, splitKey, docsToDeleteCache));
+          t.pause();
+        } else {
+          // This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
+          t = timings.sub("addIndexes");
+          t.resume();
+          for (int segmentNumber = 0; segmentNumber<leaves.size(); segmentNumber++) {
+            log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : "") + " segment #"+segmentNumber + " segmentCount=" + leaves.size());
+            CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
+            iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
+          }
+          t.pause();
         }
         // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
         // because the sub-shard cores will just ignore such a commit because the update log is not
         // in active state at this time.
         //TODO no commitUpdateCommand
         SolrIndexWriter.setCommitData(iw, -1);
+        t = timings.sub("subIWCommit");
+        t.resume();
         iw.commit();
+        t.pause();
         success = true;
       } finally {
         if (iwRef != null) {
           iwRef.decref();
         } else {
           if (success) {
+            t = timings.sub("subIWClose");
+            t.resume();
             iw.close();
+            t.pause();
           } else {
             IOUtils.closeWhileHandlingException(iw);
           }
+          if (splitMethod == SplitMethod.LINK) {
+            SolrCore subCore = cores.get(partitionNumber);
+            subCore.getDirectoryFactory().release(iw.getDirectory());
+          }
+        }
+      }
+    }
+    // all sub-indexes created ok
+    // when using hard-linking switch directories & refresh cores
+    if (splitMethod == SplitMethod.LINK && cores != null) {
+      boolean switchOk = true;
+      t = timings.sub("switchSubIndexes");
+      for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+        SolrCore subCore = cores.get(partitionNumber);
+        String indexDirPath = subCore.getIndexDir();
+
+        log.debug("Switching directories");
+        String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+        subCore.modifyIndexProps(INDEX_PREFIX + timestamp);
+        try {
+          subCore.getUpdateHandler().newIndexWriter(false);
+          openNewSearcher(subCore);
+        } catch (Exception e) {
+          log.error("Failed to switch sub-core " + indexDirPath + " to " + hardLinkPath + ", split will fail.", e);
+          switchOk = false;
+          break;
         }
       }
+      t.stop();
+      if (!switchOk) {
+        t = timings.sub("rollbackSubIndexes");
+        // rollback the switch
+        for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+          SolrCore subCore = cores.get(partitionNumber);
+          Directory dir = null;
+          try {
+            dir = subCore.getDirectoryFactory().get(subCore.getDataDir(), DirectoryFactory.DirContext.META_DATA,
+                subCore.getSolrConfig().indexConfig.lockType);
+            dir.deleteFile(IndexFetcher.INDEX_PROPERTIES);
+          } finally {
+            if (dir != null) {
+              subCore.getDirectoryFactory().release(dir);
+            }
+          }
+          // switch back if necessary and remove the hardlinked dir
+          String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+          try {
+            dir = subCore.getDirectoryFactory().get(hardLinkPath, DirectoryFactory.DirContext.DEFAULT,
+                subCore.getSolrConfig().indexConfig.lockType);
+            subCore.getDirectoryFactory().doneWithDirectory(dir);
+            subCore.getDirectoryFactory().remove(dir);
+          } finally {
+            if (dir != null) {
+              subCore.getDirectoryFactory().release(dir);
+            }
+          }
+          subCore.getUpdateHandler().newIndexWriter(false);
+          try {
+            openNewSearcher(subCore);
+          } catch (Exception e) {
+            log.warn("Error rolling back failed split of " + hardLinkPath, e);
+          }
+        }
+        t.stop();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There were errors during index split");
+      } else {
+        // complete the switch - remove original index
+        t = timings.sub("cleanSubIndex");
+        for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+          SolrCore subCore = cores.get(partitionNumber);
+          String oldIndexPath = subCore.getDataDir() + "index";
+          Directory indexDir = null;
+          try {
+            indexDir = subCore.getDirectoryFactory().get(oldIndexPath,
+                DirectoryFactory.DirContext.DEFAULT, subCore.getSolrConfig().indexConfig.lockType);
+            subCore.getDirectoryFactory().doneWithDirectory(indexDir);
+            subCore.getDirectoryFactory().remove(indexDir);
+          } finally {
+            if (indexDir != null) {
+              subCore.getDirectoryFactory().release(indexDir);
+            }
+          }
+        }
+        t.stop();
+      }
+    }
+  }
+
+  private void openNewSearcher(SolrCore core) throws Exception {
+    Future[] waitSearcher = new Future[1];
+    core.getSearcher(true, false, waitSearcher, true);
+    if (waitSearcher[0] != null) {
+      waitSearcher[0].get();
+    }
+  }
+
+  private class SplittingQuery extends Query {
+    private final int partition;
+    private final SchemaField field;
+    private final DocRouter.Range[] rangesArr;
+    private final HashBasedRouter hashRouter;
+    private final String splitKey;
+    private final Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete;
+
+    SplittingQuery(int partition, SchemaField field, DocRouter.Range[] rangesArr, HashBasedRouter hashRouter, String splitKey,
+                   Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete) {
+      this.partition = partition;
+      this.field = field;
+      this.rangesArr = rangesArr;
+      this.hashRouter = hashRouter;
+      this.splitKey = splitKey;
+      this.docsToDelete = docsToDelete;
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
+      return new ConstantScoreWeight(this, boost) {
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          RTimerTree t = timings.sub("findDocsToDelete");
+          t.resume();
+          FixedBitSet set = findDocsToDelete(context);
+          t.pause();
+          log.info("### partition=" + partition + ", leaf=" + context + ", maxDoc=" + context.reader().maxDoc() +
+          ", numDels=" + context.reader().numDeletedDocs() + ", setLen=" + set.length() + ", setCard=" + set.cardinality());
+          Bits liveDocs = context.reader().getLiveDocs();
+          if (liveDocs != null) {
+            // check that we don't delete already deleted docs
+            FixedBitSet dels = FixedBitSet.copyOf(liveDocs);
+            dels.flip(0, dels.length());
+            dels.and(set);
+            if (dels.cardinality() > 0) {
+              log.error("### INVALID DELS " + dels.cardinality());
+            }
+          }
+          return new ConstantScoreScorer(this, score(), new BitSetIterator(set, set.length()));
+        }
 
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return false;
+        }
+
+        @Override
+        public String toString() {
+          return "weight(shardSplittingQuery,part" + partition + ")";
+        }
+      };
     }
 
+    private FixedBitSet findDocsToDelete(LeafReaderContext readerContext) throws IOException {
+      // check whether a cached copy of bitsets already exists for this reader
+      FixedBitSet[] perPartition = docsToDelete.get(readerContext.reader().getCoreCacheHelper().getKey());
+      if (perPartition != null) {
+        return perPartition[partition];
+      }
+      synchronized (docsToDelete) {
+        perPartition = docsToDelete.get(readerContext.reader().getCoreCacheHelper().getKey());
+        if (perPartition != null) {
+          return perPartition[partition];
+        }
+
+        perPartition = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, true);
+        docsToDelete.put(readerContext.reader().getCoreCacheHelper().getKey(), perPartition);
+        return perPartition[partition];
+      }
+    }
+
+    @Override
+    public String toString(String field) {
+      return "shardSplittingQuery";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null) {
+        return false;
+      }
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof SplittingQuery)) {
+        return false;
+      }
+      SplittingQuery q = (SplittingQuery)obj;
+      return partition == q.partition;
+    }
+
+    @Override
+    public int hashCode() {
+      return partition;
+    }
   }
 
-  FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
+  static FixedBitSet[] split(LeafReaderContext readerContext, int numPieces, SchemaField field, DocRouter.Range[] rangesArr,
+                             String splitKey, HashBasedRouter hashRouter, boolean delete) throws IOException {
     LeafReader reader = readerContext.reader();
     FixedBitSet[] docSets = new FixedBitSet[numPieces];
     for (int i=0; i<docSets.length; i++) {
       docSets[i] = new FixedBitSet(reader.maxDoc());
+      if (delete) {
+        docSets[i].set(0, reader.maxDoc());
+      }
     }
     Bits liveDocs = reader.getLiveDocs();
+    if (liveDocs != null && delete) {
+      FixedBitSet liveDocsSet = FixedBitSet.copyOf(liveDocs);
+      for (FixedBitSet set : docSets) {
+        set.and(liveDocsSet);
+      }
+    }
 
     Terms terms = reader.terms(field.getName());
     TermsEnum termsEnum = terms==null ? null : terms.iterator();
@@ -172,11 +551,12 @@ public class SolrIndexSplitter {
     PostingsEnum postingsEnum = null;
 
     int[] docsMatchingRanges = null;
-    if (ranges != null) {
+    if (rangesArr != null) {
       // +1 because documents can belong to *zero*, one, several or all ranges in rangesArr
       docsMatchingRanges = new int[rangesArr.length+1];
     }
 
+    int partition = 0;
     CharsRefBuilder idRef = new CharsRefBuilder();
     for (;;) {
       term = termsEnum.next();
@@ -209,14 +589,22 @@ public class SolrIndexSplitter {
       for (;;) {
         int doc = postingsEnum.nextDoc();
         if (doc == DocIdSetIterator.NO_MORE_DOCS) break;
-        if (ranges == null) {
-          docSets[currPartition].set(doc);
-          currPartition = (currPartition + 1) % numPieces;
+        if (rangesArr == null) {
+          if (delete) {
+            docSets[partition].clear(doc);
+          } else {
+            docSets[partition].set(doc);
+          }
+          partition = (partition + 1) % numPieces;
         } else  {
           int matchingRangesCount = 0;
           for (int i=0; i<rangesArr.length; i++) {      // inner-loop: use array here for extra speed.
             if (rangesArr[i].includes(hash)) {
-              docSets[i].set(doc);
+              if (delete) {
+                docSets[i].clear(doc);
+              } else {
+                docSets[i].set(doc);
+              }
               ++matchingRangesCount;
             }
           }
@@ -256,12 +644,10 @@ public class SolrIndexSplitter {
     if (idx <= 0) return null;
     String part1 = idString.substring(0, idx);
     int commaIdx = part1.indexOf(CompositeIdRouter.bitsSeparator);
-    if (commaIdx > 0) {
-      if (commaIdx + 1 < part1.length())  {
-        char ch = part1.charAt(commaIdx + 1);
-        if (ch >= '0' && ch <= '9') {
-          part1 = part1.substring(0, commaIdx);
-        }
+    if (commaIdx > 0 && commaIdx + 1 < part1.length())  {
+      char ch = part1.charAt(commaIdx + 1);
+      if (ch >= '0' && ch <= '9') {
+        part1 = part1.substring(0, commaIdx);
       }
     }
     return part1;
@@ -273,7 +659,7 @@ public class SolrIndexSplitter {
     final FixedBitSet liveDocs;
     final int numDocs;
 
-    public LiveDocsReader(CodecReader in, FixedBitSet liveDocs) throws IOException {
+    public LiveDocsReader(CodecReader in, FixedBitSet liveDocs) {
       super(in);
       this.liveDocs = liveDocs;
       this.numDocs = liveDocs.cardinality();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java b/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
index eaa1e59..7ea8a2a 100644
--- a/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
 
 import java.util.List;
 
@@ -29,22 +30,26 @@ import java.util.List;
  *
  */
 public class SplitIndexCommand extends UpdateCommand {
-  // public List<Directory> dirs;
-  public List<String> paths;
-  public List<SolrCore> cores;  // either paths or cores should be specified
-  public List<DocRouter.Range> ranges;
-  public DocRouter router;
-  public String routeFieldName;
-  public String splitKey;
+  public final SolrQueryResponse rsp;
+  public final List<String> paths;
+  public final List<SolrCore> cores;  // either paths or cores should be specified
+  public final List<DocRouter.Range> ranges;
+  public final DocRouter router;
+  public final String routeFieldName;
+  public final String splitKey;
+  public final SolrIndexSplitter.SplitMethod splitMethod;
 
-  public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router, String routeFieldName, String splitKey) {
+  public SplitIndexCommand(SolrQueryRequest req, SolrQueryResponse rsp, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges,
+                           DocRouter router, String routeFieldName, String splitKey, SolrIndexSplitter.SplitMethod splitMethod) {
     super(req);
+    this.rsp = rsp;
     this.paths = paths;
     this.cores = cores;
     this.ranges = ranges;
     this.router = router;
     this.routeFieldName = routeFieldName;
     this.splitKey = splitKey;
+    this.splitMethod = splitMethod;
   }
 
   @Override
@@ -65,6 +70,7 @@ public class SplitIndexCommand extends UpdateCommand {
     if (splitKey != null) {
       sb.append(",split.key=" + splitKey);
     }
+    sb.append(",method=" + splitMethod.toLower());
     sb.append('}');
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 22862b4..dd5f9f3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -134,7 +134,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
       killerThread.start();
       killCounter.incrementAndGet();
 
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null, false);
 
       log.info("Layout after split: \n");
       printLayout();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 92fd4d5..f6ee7b4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -62,7 +62,10 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TestInjection;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -72,6 +75,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
+@LogLevel("org.apache.solr.cloud.api.collections=DEBUG")
 @Slow
 public class ShardSplitTest extends BasicDistributedZkTest {
 
@@ -116,14 +120,24 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   Creates a collection with replicationFactor=1, splits a shard. Restarts the sub-shard leader node.
   Add a replica. Ensure count matches in leader and replica.
    */
+  @Test
   public void testSplitStaticIndexReplication() throws Exception {
+    doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitStaticIndexReplicationLink() throws Exception {
+    doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     waitForThingsToLevelOut(15);
 
     DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
     Replica replica = defCol.getReplicas().get(0);
     String nodeName = replica.getNodeName();
 
-    String collectionName = "testSplitStaticIndexReplication";
+    String collectionName = "testSplitStaticIndexReplication_" + splitMethod.toLower();
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
     create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
     create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
@@ -141,6 +155,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
         CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
         splitShard.setShardName(SHARD1);
+        splitShard.setSplitMethod(splitMethod.toLower());
         String asyncId = splitShard.processAsync(client);
         RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
         if (state == RequestStatusState.COMPLETED)  {
@@ -351,16 +366,31 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
   @Test
   public void testSplitMixedReplicaTypes() throws Exception {
+    doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitMixedReplicaTypesLink() throws Exception {
+    doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     waitForThingsToLevelOut(15);
-    String collectionName = "testSplitMixedReplicaTypes";
+    String collectionName = "testSplitMixedReplicaTypes_" + splitMethod.toLower();
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2, 2, 2);
     create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
     create.process(cloudClient);
     waitForRecoveriesToFinish(collectionName, false);
 
+    for (int i = 0; i < 100; i++) {
+      cloudClient.add(collectionName, getDoc("id", "id-" + i, "foo_s", "bar " + i));
+    }
+    cloudClient.commit(collectionName);
+
     CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
     splitShard.setShardName(SHARD1);
-    splitShard.process(cloudClient);
+    splitShard.setSplitMethod(splitMethod.toLower());
+    CollectionAdminResponse rsp = splitShard.process(cloudClient);
     waitForThingsToLevelOut(15);
 
     cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
@@ -393,7 +423,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     assertEquals("actual PULL", numPull, actualPull.get());
   }
 
-    @Test
+  @Test
   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
   public void testSplitWithChaosMonkey() throws Exception {
     waitForThingsToLevelOut(15);
@@ -600,6 +630,15 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
   @Test
   public void testSplitShardWithRule() throws Exception {
+    doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  @Test
+  public void testSplitShardWithRuleLink() throws Exception {
+    doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doSplitShardWithRule(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     waitForThingsToLevelOut(15);
 
     if (usually()) {
@@ -609,14 +648,14 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
 
     log.info("Starting testSplitShardWithRule");
-    String collectionName = "shardSplitWithRule";
+    String collectionName = "shardSplitWithRule_" + splitMethod.toLower();
     CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2)
         .setRule("shard:*,replica:<2,node:*");
     CollectionAdminResponse response = createRequest.process(cloudClient);
     assertEquals(0, response.getStatus());
 
     CollectionAdminRequest.SplitShard splitShardRequest = CollectionAdminRequest.splitShard(collectionName)
-        .setShardName("shard1");
+        .setShardName("shard1").setSplitMethod(splitMethod.toLower());
     response = splitShardRequest.process(cloudClient);
     assertEquals(String.valueOf(response.getErrorMessages()), 0, response.getStatus());
   }
@@ -633,7 +672,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     // test with only one range
     subRanges.add(ranges.get(0));
     try {
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
       fail("Shard splitting with just one custom hash range should not succeed");
     } catch (HttpSolrClient.RemoteSolrException e) {
       log.info("Expected exception:", e);
@@ -644,7 +683,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     subRanges.add(ranges.get(3)); // order shouldn't matter
     subRanges.add(ranges.get(0));
     try {
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
       fail("Shard splitting with missing hashes in between given ranges should not succeed");
     } catch (HttpSolrClient.RemoteSolrException e) {
       log.info("Expected exception:", e);
@@ -657,7 +696,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     subRanges.add(ranges.get(2));
     subRanges.add(new DocRouter.Range(ranges.get(3).min - 15, ranges.get(3).max));
     try {
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
       fail("Shard splitting with overlapping ranges should not succeed");
     } catch (HttpSolrClient.RemoteSolrException e) {
       log.info("Expected exception:", e);
@@ -683,6 +722,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     final int[] docCounts = new int[ranges.size()];
     int numReplicas = shard1.getReplicas().size();
 
+    cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+    clusterState = cloudClient.getZkStateReader().getClusterState();
+    log.debug("-- COLLECTION: {}", clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
     del("*:*");
     for (int id = 0; id <= 100; id++) {
       String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
@@ -725,7 +767,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     try {
       for (int i = 0; i < 3; i++) {
         try {
-          splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+          splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
           log.info("Layout after split: \n");
           printLayout();
           break;
@@ -807,7 +849,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
       for (int i = 0; i < 3; i++) {
         try {
-          splitShard(collectionName, SHARD1, null, null);
+          splitShard(collectionName, SHARD1, null, null, false);
           break;
         } catch (HttpSolrClient.RemoteSolrException e) {
           if (e.code() != 500) {
@@ -889,7 +931,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
       for (int i = 0; i < 3; i++) {
         try {
-          splitShard(collectionName, null, null, splitKey);
+          splitShard(collectionName, null, null, splitKey, false);
           break;
         } catch (HttpSolrClient.RemoteSolrException e) {
           if (e.code() != 500) {
@@ -992,9 +1034,11 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey) throws SolrServerException, IOException {
+  protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey, boolean offline) throws SolrServerException, IOException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
+    params.set("timing", "true");
+    params.set("offline", String.valueOf(offline));
     params.set("collection", collection);
     if (shardId != null)  {
       params.set("shard", shardId);
@@ -1019,7 +1063,8 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
 
     try (HttpSolrClient baseServer = getHttpSolrClient(baseUrl, 30000, 60000 * 5)) {
-      baseServer.request(request);
+      NamedList<Object> rsp = baseServer.request(request);
+      log.info("Shard split response: " + Utils.toJSONString(rsp));
     }
   }