Posted to commits@lucene.apache.org by ab...@apache.org on 2018/08/01 14:31:08 UTC

[2/2] lucene-solr:master: SOLR-12509: Improve SplitShardCmd performance and reliability.

SOLR-12509: Improve SplitShardCmd performance and reliability.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1133bf98
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1133bf98
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1133bf98

Branch: refs/heads/master
Commit: 1133bf98a5fd075173efecfb75a51493fceb62b3
Parents: c6e0c28
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed Aug 1 14:39:37 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed Aug 1 16:30:59 2018 +0200

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +
 .../cloud/api/collections/SplitShardCmd.java    | 144 ++++--
 .../solr/cloud/overseer/ReplicaMutator.java     |  47 +-
 .../solr/handler/admin/CollectionsHandler.java  |   6 +-
 .../org/apache/solr/handler/admin/SplitOp.java  |  24 +-
 .../solr/update/DirectUpdateHandler2.java       |   5 +-
 .../apache/solr/update/SolrIndexSplitter.java   | 464 +++++++++++++++++--
 .../apache/solr/update/SplitIndexCommand.java   |  22 +-
 .../solr/cloud/ChaosMonkeyShardSplitTest.java   |   2 +-
 .../cloud/api/collections/ShardSplitTest.java   |  73 ++-
 .../solr/update/SolrIndexSplitterTest.java      | 112 ++++-
 solr/solr-ref-guide/src/collections-api.adoc    |  20 +-
 .../solrj/request/CollectionAdminRequest.java   |  11 +
 .../solr/common/params/CommonAdminParams.java   |   2 +
 .../solr/common/params/CoreAdminParams.java     |   2 +-
 15 files changed, 773 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 37dd5a7..49fc7fe 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -205,6 +205,10 @@ Optimizations
 * SOLR-12305: When a replica is applying updates, some kinds of updates can skip buffering for faster recovery.
   (Cao Manh Dat)
 
+* SOLR-12509: Improve SplitShardCmd performance and reliability. A new splitting method has been
+  introduced (splitMethod=link), which uses hard-linking of index files when possible, resulting in
+  significant speedups and reduced CPU / IO load on the shard leader. (ab)
+
 Other Changes
 ----------------------
 

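For reference, the new split method can be requested from client code. The following is a minimal SolrJ sketch, not part of this commit; the collection and shard names are hypothetical, and it assumes the setter added to CollectionAdminRequest.SplitShard by this commit is named setSplitMethod:

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;

    public class LinkSplitExample {
      // Ask the Collections API to split shard1 of myColl using hard-linking.
      public static void splitWithLink(SolrClient client) throws Exception {
        CollectionAdminRequest.SplitShard req =
            CollectionAdminRequest.splitShard("myColl"); // hypothetical collection
        req.setShardName("shard1");                      // hypothetical shard
        req.setSplitMethod("link");                      // assumed setter name; omit to keep the default 'rewrite' behavior
        req.process(client);
      }
    }
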
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index b5408f8..00488a3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -51,11 +52,14 @@ import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
 import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.RTimerTree;
 import org.apache.solr.util.TestInjection;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -87,13 +91,23 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
   public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
     boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+    String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
+    SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
+    if (splitMethod == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown value of '" + CommonAdminParams.SPLIT_METHOD +
+          "': " + methodStr);
+    }
+    boolean withTiming = message.getBool(CommonParams.TIMING, false);
+
     String collectionName = message.getStr(CoreAdminParams.COLLECTION);
 
-    log.info("Split shard invoked");
+    log.debug("Split shard invoked: {}", message);
     ZkStateReader zkStateReader = ocmh.zkStateReader;
     zkStateReader.forceUpdateCollection(collectionName);
     AtomicReference<String> slice = new AtomicReference<>();
     slice.set(message.getStr(ZkStateReader.SHARD_ID_PROP));
+    Set<String> offlineSlices = new HashSet<>();
+    RTimerTree timings = new RTimerTree();
 
     String splitKey = message.getStr("split.key");
     DocCollection collection = clusterState.getCollection(collectionName);
@@ -101,6 +115,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     PolicyHelper.SessionWrapper sessionWrapper = null;
 
     Slice parentSlice = getParentSlice(clusterState, collectionName, slice, splitKey);
+    if (parentSlice.getState() != Slice.State.ACTIVE) {
+      throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Parent slice is not active: " + parentSlice.getState());
+    }
 
     // find the leader for the shard
     Replica parentShardLeader = null;
@@ -111,7 +128,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted.");
     }
 
+    RTimerTree t = timings.sub("checkDiskSpace");
     checkDiskSpace(collectionName, slice.get(), parentShardLeader);
+    t.stop();
 
     // let's record the ephemeralOwner of the parent leader node
     Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
@@ -142,20 +161,22 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     });
     int repFactor = numNrt.get() + numTlog.get() + numPull.get();
 
-    // type of the first subreplica will be the same as leader
-    boolean firstNrtReplica = parentShardLeader.getType() == Replica.Type.NRT;
-    // verify that we indeed have the right number of correct replica types
-    if ((firstNrtReplica && numNrt.get() < 1) || (!firstNrtReplica && numTlog.get() < 1)) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "aborting split - inconsistent replica types in collection " + collectionName +
-          ": nrt=" + numNrt.get() + ", tlog=" + numTlog.get() + ", pull=" + numPull.get() + ", shard leader type is " +
-      parentShardLeader.getType());
-    }
-
-    List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
+    boolean success = false;
+    try {
+      // type of the first subreplica will be the same as leader
+      boolean firstNrtReplica = parentShardLeader.getType() == Replica.Type.NRT;
+      // verify that we indeed have the right number of correct replica types
+      if ((firstNrtReplica && numNrt.get() < 1) || (!firstNrtReplica && numTlog.get() < 1)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "aborting split - inconsistent replica types in collection " + collectionName +
+            ": nrt=" + numNrt.get() + ", tlog=" + numTlog.get() + ", pull=" + numPull.get() + ", shard leader type is " +
+            parentShardLeader.getType());
+      }
 
-    String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
+      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
 
-    try {
+      t = timings.sub("fillRanges");
+      String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
+      t.stop();
 
       boolean oldShardsDeleted = false;
       for (String subSlice : subSlices) {
@@ -196,12 +217,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       Map<String, String> requestMap = new HashMap<>();
       String nodeName = parentShardLeader.getNodeName();
 
+      t = timings.sub("createSubSlicesAndLeadersInState");
       for (int i = 0; i < subRanges.size(); i++) {
         String subSlice = subSlices.get(i);
         String subShardName = subShardNames.get(i);
         DocRouter.Range subRange = subRanges.get(i);
 
-        log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
+        log.debug("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
 
         Map<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
@@ -210,7 +232,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
         propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
         propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
-        propMap.put("shard_parent_node", parentShardLeader.getNodeName());
+        propMap.put("shard_parent_node", nodeName);
         propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
         DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
         inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
@@ -221,7 +243,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         // refresh cluster state
         clusterState = zkStateReader.getClusterState();
 
-        log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
+        log.debug("Adding first replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
             + " on " + nodeName);
         propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -248,9 +270,11 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
 
+      t.stop();
+      t = timings.sub("waitForSubSliceLeadersAlive");
       for (String subShardName : subShardNames) {
         // wait for parent leader to acknowledge the sub-shard core
-        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+        log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
         String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
         CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
         cmd.setCoreName(subShardName);
@@ -266,8 +290,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
           asyncId, requestMap);
+      t.stop();
 
-      log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
+      log.debug("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
           + " on: " + parentShardLeader);
 
       log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
@@ -275,6 +300,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+      params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
       params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
       for (int i = 0; i < subShardNames.size(); i++) {
         String subShardName = subShardNames.get(i);
@@ -282,18 +308,22 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
       params.set(CoreAdminParams.RANGES, rangesStr);
 
+      t = timings.sub("splitParentCore");
+
       ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
           requestMap);
+      t.stop();
 
-      log.info("Index on shard: " + nodeName + " split into two successfully");
+      log.debug("Index on shard: " + nodeName + " split into two successfully");
 
+      t = timings.sub("applyBufferedUpdates");
       // apply buffered updates on sub-shards
       for (int i = 0; i < subShardNames.size(); i++) {
         String subShardName = subShardNames.get(i);
 
-        log.info("Applying buffered updates on : " + subShardName);
+        log.debug("Applying buffered updates on : " + subShardName);
 
         params = new ModifiableSolrParams();
         params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
@@ -304,8 +334,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
           " to apply buffered updates", asyncId, requestMap);
+      t.stop();
 
-      log.info("Successfully applied buffered updates on : " + subShardNames);
+      log.debug("Successfully applied buffered updates on : " + subShardNames);
 
       // Replica creation for the new Slices
       // replica placement is controlled by the autoscaling policy framework
@@ -329,6 +360,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         numTlog.decrementAndGet();
       }
 
+      t = timings.sub("identifyNodesForReplicas");
       List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
           clusterState,
           new ArrayList<>(clusterState.getLiveNodes()),
@@ -336,13 +368,15 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
           new ZkNodeProps(collection.getProperties()),
           subSlices, numNrt.get(), numTlog.get(), numPull.get());
       sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
+      t.stop();
 
+      t = timings.sub("createReplicaPlaceholders");
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String sliceName = replicaPosition.shard;
         String subShardNodeName = replicaPosition.node;
         String solrCoreName = Assign.buildSolrCoreName(collectionName, sliceName, replicaPosition.type, replicaPosition.index);
 
-        log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
+        log.debug("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
             + collectionName + " on " + subShardNodeName);
 
         // we first create all replicas in DOWN state without actually creating their cores in order to
@@ -384,7 +418,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
         replicas.add(propMap);
       }
-
+      t.stop();
       assert TestInjection.injectSplitFailureBeforeReplicaCreation();
 
       long ephemeralOwner = leaderZnodeStat.getEphemeralOwner();
@@ -414,12 +448,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
 
       // we must set the slice state into recovery before actually creating the replica cores
-      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
+      // this ensures that the logic inside ReplicaMutator to update sub-shard state to 'active'
       // always gets a chance to execute. See SOLR-7673
 
       if (repFactor == 1) {
         // switch sub shard states to 'active'
-        log.info("Replication factor is 1 so switching shard states");
+        log.debug("Replication factor is 1 so switching shard states");
         DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
         Map<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
@@ -431,7 +465,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         ZkNodeProps m = new ZkNodeProps(propMap);
         inQueue.offer(Utils.toJSON(m));
       } else {
-        log.info("Requesting shard state be set to 'recovery'");
+        log.debug("Requesting shard state be set to 'recovery'");
         DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
         Map<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
@@ -443,6 +477,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         inQueue.offer(Utils.toJSON(m));
       }
 
+      t = timings.sub("createCoresForReplicas");
       // now actually create replica cores on sub shard nodes
       for (Map<String, Object> replica : replicas) {
         ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
@@ -451,20 +486,28 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       assert TestInjection.injectSplitFailureAfterReplicaCreation();
 
       ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
+      t.stop();
 
       log.info("Successfully created all replica shards for all sub-slices " + subSlices);
 
+      t = timings.sub("finalCommit");
       ocmh.commit(results, slice.get(), parentShardLeader);
-
+      t.stop();
+      if (withTiming) {
+        results.add(CommonParams.TIMING, timings.asNamedList());
+      }
+      success = true;
       return true;
     } catch (SolrException e) {
-      cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices);
       throw e;
     } catch (Exception e) {
       log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
     } finally {
       if (sessionWrapper != null) sessionWrapper.release();
+      if (!success) {
+        cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices, offlineSlices);
+      }
     }
   }
 
@@ -505,13 +548,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     }
   }
 
-  private void cleanupAfterFailure(ZkStateReader zkStateReader, String collectionName, String parentShard, List<String> subSlices) {
-    log.debug("- cleanup after failed split of " + collectionName + "/" + parentShard);
+  private void cleanupAfterFailure(ZkStateReader zkStateReader, String collectionName, String parentShard,
+                                   List<String> subSlices, Set<String> offlineSlices) {
+    log.info("Cleaning up after a failed split of " + collectionName + "/" + parentShard);
     // get the latest state
     try {
       zkStateReader.forceUpdateCollection(collectionName);
     } catch (KeeperException | InterruptedException e) {
-      log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (force update collection)", e);
+      log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (force update collection)", e);
       return;
     }
     ClusterState clusterState = zkStateReader.getClusterState();
@@ -524,7 +568,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     // set already created sub shards states to CONSTRUCTION - this prevents them
     // from entering into RECOVERY or ACTIVE (SOLR-9455)
     DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-    Map<String, Object> propMap = new HashMap<>();
+    final Map<String, Object> propMap = new HashMap<>();
+    boolean sendUpdateState = false;
     propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
     propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
     for (Slice s : coll.getSlices()) {
@@ -532,20 +577,29 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         continue;
       }
       propMap.put(s.getName(), Slice.State.CONSTRUCTION.toString());
+      sendUpdateState = true;
     }
 
     // if parent is inactive activate it again
     Slice parentSlice = coll.getSlice(parentShard);
     if (parentSlice.getState() == Slice.State.INACTIVE) {
+      sendUpdateState = true;
       propMap.put(parentShard, Slice.State.ACTIVE.toString());
     }
+    // plus any other previously deactivated slices
+    for (String sliceName : offlineSlices) {
+      propMap.put(sliceName, Slice.State.ACTIVE.toString());
+      sendUpdateState = true;
+    }
 
-    try {
-      ZkNodeProps m = new ZkNodeProps(propMap);
-      inQueue.offer(Utils.toJSON(m));
-    } catch (Exception e) {
-      // don't give up yet - just log the error, we may still be able to clean up
-      log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
+    if (sendUpdateState) {
+      try {
+        ZkNodeProps m = new ZkNodeProps(propMap);
+        inQueue.offer(Utils.toJSON(m));
+      } catch (Exception e) {
+        // don't give up yet - just log the error, we may still be able to clean up
+        log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
+      }
     }
 
     // delete existing subShards
@@ -554,16 +608,16 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       if (s == null) {
         continue;
       }
-      log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
-      propMap = new HashMap<>();
-      propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
-      propMap.put(COLLECTION_PROP, collectionName);
-      propMap.put(SHARD_ID_PROP, subSlice);
-      ZkNodeProps m = new ZkNodeProps(propMap);
+      log.debug("- sub-shard: {} exists therefore requesting its deletion", subSlice);
+      HashMap<String, Object> props = new HashMap<>();
+      props.put(Overseer.QUEUE_OPERATION, "deleteshard");
+      props.put(COLLECTION_PROP, collectionName);
+      props.put(SHARD_ID_PROP, subSlice);
+      ZkNodeProps m = new ZkNodeProps(props);
       try {
         ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
       } catch (Exception e) {
-        log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
+        log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
       }
     }
   }
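
The timing instrumentation added to split() above follows one pattern throughout: create a named sub-timer, stop it around the measured step, and serialize the tree into the results when timing=true. A minimal sketch of that pattern, mirroring the calls used in this commit (the step name is illustrative):

    import org.apache.solr.common.util.NamedList;
    import org.apache.solr.util.RTimerTree;

    public class TimingPatternSketch {
      public static NamedList timeOneStep() {
        RTimerTree timings = new RTimerTree();
        RTimerTree t = timings.sub("someStep"); // sub() creates and starts a named child timer
        // ... perform the step being measured ...
        t.stop();                               // record the elapsed time for this step
        return timings.asNamedList();           // what SplitShardCmd adds to results under "timing"
      }
    }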

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f897072..34843c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -25,6 +25,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
@@ -52,12 +53,12 @@ import static org.apache.solr.common.params.CommonParams.NAME;
 public class ReplicaMutator {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  protected final SolrCloudManager dataProvider;
+  protected final SolrCloudManager cloudManager;
   protected final DistribStateManager stateManager;
 
-  public ReplicaMutator(SolrCloudManager dataProvider) {
-    this.dataProvider = dataProvider;
-    this.stateManager = dataProvider.getDistribStateManager();
+  public ReplicaMutator(SolrCloudManager cloudManager) {
+    this.cloudManager = cloudManager;
+    this.stateManager = cloudManager.getDistribStateManager();
   }
 
   protected Replica setProperty(Replica replica, String key, String value) {
@@ -96,11 +97,11 @@ public class ReplicaMutator {
   }
 
   public ZkWriteCommand addReplicaProperty(ClusterState clusterState, ZkNodeProps message) {
-    if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false ||
-        checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) {
+    if (!checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.REPLICA_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) ||
+        !checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
           "Overseer ADDREPLICAPROP requires " +
               ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
@@ -200,7 +201,7 @@ public class ReplicaMutator {
   }
 
   public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
-    if (Overseer.isLegacy(dataProvider.getClusterStateProvider())) {
+    if (Overseer.isLegacy(cloudManager.getClusterStateProvider())) {
       return updateState(clusterState, message);
     } else {
       return updateStateNew(clusterState, message);
@@ -224,7 +225,7 @@ public class ReplicaMutator {
       ClusterStateMutator.getShardNames(numShards, shardNames);
       Map<String, Object> createMsg = Utils.makeMap(NAME, cName);
       createMsg.putAll(message.getProperties());
-      writeCommand = new ClusterStateMutator(dataProvider).createCollection(prevState, new ZkNodeProps(createMsg));
+      writeCommand = new ClusterStateMutator(cloudManager).createCollection(prevState, new ZkNodeProps(createMsg));
       DocCollection collection = writeCommand.collection;
       newState = ClusterStateMutator.newState(prevState, cName, collection);
     }
@@ -451,30 +452,34 @@ public class ReplicaMutator {
               }
             }
 
+            Map<String, Object> propMap = new HashMap<>();
+            propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+            propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
             if (isLeaderSame) {
               log.info("Sub-shard leader node is still the same one at {} with ZK session id {}. Preparing to switch shard states.", shardParentNode, shardParentZkSession);
-              Map<String, Object> propMap = new HashMap<>();
-              propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
               propMap.put(parentSliceName, Slice.State.INACTIVE.toString());
               propMap.put(sliceName, Slice.State.ACTIVE.toString());
+              long now = cloudManager.getTimeSource().getEpochTimeNs();
               for (Slice subShardSlice : subShardSlices) {
                 propMap.put(subShardSlice.getName(), Slice.State.ACTIVE.toString());
+                String lastTimeStr = subShardSlice.getStr(ZkStateReader.STATE_TIMESTAMP_PROP);
+                if (lastTimeStr != null) {
+                  long start = Long.parseLong(lastTimeStr);
+                  log.info("TIMINGS: Sub-shard " + subShardSlice.getName() + " recovered in " +
+                      TimeUnit.MILLISECONDS.convert(now - start, TimeUnit.NANOSECONDS) + " ms");
+                } else {
+                  log.info("TIMINGS: Sub-shard " + subShardSlice.getName() + " start time not available: " + subShardSlice);
+                }
               }
-              propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
-              ZkNodeProps m = new ZkNodeProps(propMap);
-              return new SliceMutator(dataProvider).updateShardState(prevState, m).collection;
             } else  {
               // we must mark the shard split as failed by switching sub-shards to recovery_failed state
-              Map<String, Object> propMap = new HashMap<>();
-              propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
               propMap.put(sliceName, Slice.State.RECOVERY_FAILED.toString());
               for (Slice subShardSlice : subShardSlices) {
                 propMap.put(subShardSlice.getName(), Slice.State.RECOVERY_FAILED.toString());
               }
-              propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
-              ZkNodeProps m = new ZkNodeProps(propMap);
-              return new SliceMutator(dataProvider).updateShardState(prevState, m).collection;
             }
+            ZkNodeProps m = new ZkNodeProps(propMap);
+            return new SliceMutator(cloudManager).updateShardState(prevState, m).collection;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 8d7cdbf..3a46b2b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -144,8 +144,10 @@ import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTIO
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
+import static org.apache.solr.common.params.CommonAdminParams.SPLIT_METHOD;
 import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
 import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.TIMING;
 import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
 import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
 import static org.apache.solr.common.params.CoreAdminParams.DELETE_DATA_DIR;
@@ -662,7 +664,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           SHARD_ID_PROP,
           "split.key",
           CoreAdminParams.RANGES,
-          WAIT_FOR_FINAL_STATE);
+          WAIT_FOR_FINAL_STATE,
+          TIMING,
+          SPLIT_METHOD);
       return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
     }),
     DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> {
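
With TIMING and SPLIT_METHOD now copied into the SPLITSHARD message, both options can be passed straight through the Collections API. A hypothetical HTTP request (host, collection and shard names are illustrative) might look like:

    http://localhost:8983/solr/admin/collections?action=SPLITSHARD&collection=myColl&shard=shard1&splitMethod=link&timing=true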

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
index 9dda6d4..31382c3 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
@@ -30,11 +30,13 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CommonAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.SolrIndexSplitter;
 import org.apache.solr.update.SplitIndexCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,9 +80,14 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     }
 
     log.info("Invoked split action for core: " + cname);
-    SolrCore core = it.handler.coreContainer.getCore(cname);
-    SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
+    String methodStr = params.get(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
+    SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
+    if (splitMethod == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unsupported value of '" + CommonAdminParams.SPLIT_METHOD + "': " + methodStr);
+    }
+    SolrCore parentCore = it.handler.coreContainer.getCore(cname);
     List<SolrCore> newCores = null;
+    SolrQueryRequest req = null;
 
     try {
       // TODO: allow use of rangesStr in the future
@@ -91,9 +98,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       String routeFieldName = null;
       if (it.handler.coreContainer.isZooKeeperAware()) {
         ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
-        String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
+        String collectionName = parentCore.getCoreDescriptor().getCloudDescriptor().getCollectionName();
         DocCollection collection = clusterState.getCollection(collectionName);
-        String sliceName = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
+        String sliceName = parentCore.getCoreDescriptor().getCloudDescriptor().getShardId();
         Slice slice = collection.getSlice(sliceName);
         router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
         if (ranges == null) {
@@ -101,7 +108,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
           ranges = currentRange != null ? router.partitionRange(partitions, currentRange) : null;
         }
         Object routerObj = collection.get(DOC_ROUTER); // for back-compat with Solr 4.4
-        if (routerObj != null && routerObj instanceof Map) {
+        if (routerObj instanceof Map) {
           Map routerProps = (Map) routerObj;
           routeFieldName = (String) routerProps.get("field");
         }
@@ -131,9 +138,10 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
         paths = Arrays.asList(pathsArr);
       }
 
+      req = new LocalSolrQueryRequest(parentCore, params);
 
-      SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
-      core.getUpdateHandler().split(cmd);
+      SplitIndexCommand cmd = new SplitIndexCommand(req, it.rsp, paths, newCores, ranges, router, routeFieldName, splitKey, splitMethod);
+      parentCore.getUpdateHandler().split(cmd);
 
       if (it.handler.coreContainer.isZooKeeperAware()) {
         for (SolrCore newcore : newCores) {
@@ -150,7 +158,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       throw e;
     } finally {
       if (req != null) req.close();
-      if (core != null) core.close();
+      if (parentCore != null) parentCore.close();
       if (newCores != null) {
         for (SolrCore newCore : newCores) {
           newCore.close();
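
Both SplitShardCmd and SplitOp validate the requested method via SolrIndexSplitter.SplitMethod.get(), shown in the SolrIndexSplitter diff below: parsing is case-insensitive and unknown values yield null, which callers translate into a BAD_REQUEST error. A short sketch of the expected behavior:

    import org.apache.solr.update.SolrIndexSplitter.SplitMethod;

    SplitMethod m1 = SplitMethod.get("link");    // LINK - matching is case-insensitive
    SplitMethod m2 = SplitMethod.get("REWRITE"); // REWRITE
    SplitMethod m3 = SplitMethod.get("bogus");   // null - caller throws BAD_REQUEST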

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index bcc97eb..e64ee8a 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -47,6 +47,7 @@ import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.metrics.SolrMetricManager;
@@ -902,8 +903,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     commit(new CommitUpdateCommand(cmd.req, false));
     SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
     splitCommands.mark();
+    NamedList<Object> results = new NamedList<>();
     try {
-      splitter.split();
+      splitter.split(results);
+      cmd.rsp.addResponse(results);
     } catch (IOException e) {
       numErrors.increment();
       numErrorsCumulative.mark();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index aadbe74..75234fa 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -18,31 +18,59 @@ package org.apache.solr.update;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.FilterCodecReader;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.PostingsEnum;
 import org.apache.lucene.index.SlowCodecReaderWrapper;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.ConstantScoreScorer;
+import org.apache.lucene.search.ConstantScoreWeight;
 import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.HardlinkCopyDirectoryWrapper;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRefBuilder;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.IOUtils;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.HashBasedRouter;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.IndexFetcher;
+import org.apache.solr.handler.SnapShooter;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.BitsFilteredPostingsEnum;
 import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RTimerTree;
 import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,26 +78,47 @@ import org.slf4j.LoggerFactory;
 public class SolrIndexSplitter {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private static final String INDEX_PREFIX = "index.";
+
+  public enum SplitMethod {
+    REWRITE,
+    LINK;
+
+    public static SplitMethod get(String p) {
+      if (p != null) {
+        try {
+          return SplitMethod.valueOf(p.toUpperCase(Locale.ROOT));
+        } catch (Exception ex) {
+          return null;
+        }
+      }
+      return null;
+    }
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+  }
+
   SolrIndexSearcher searcher;
   SchemaField field;
   List<DocRouter.Range> ranges;
   DocRouter.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
   List<String> paths;
   List<SolrCore> cores;
-  DocRouter router;
   HashBasedRouter hashRouter;
   int numPieces;
-  int currPartition = 0;
   String routeFieldName;
   String splitKey;
+  SplitMethod splitMethod;
+  RTimerTree timings = new RTimerTree();
 
   public SolrIndexSplitter(SplitIndexCommand cmd) {
     searcher = cmd.getReq().getSearcher();
     ranges = cmd.ranges;
     paths = cmd.paths;
     cores = cmd.cores;
-    router = cmd.router;
-    hashRouter = router instanceof HashBasedRouter ? (HashBasedRouter)router : null;
+    hashRouter = cmd.router instanceof HashBasedRouter ? (HashBasedRouter)cmd.router : null;
 
     if (ranges == null) {
       numPieces =  paths != null ? paths.size() : cores.size();
@@ -86,83 +135,413 @@ public class SolrIndexSplitter {
     if (cmd.splitKey != null) {
       splitKey = getRouteKey(cmd.splitKey);
     }
+    if (cores == null) {
+      this.splitMethod = SplitMethod.REWRITE;
+    } else {
+      this.splitMethod = cmd.splitMethod;
+    }
   }
 
-  public void split() throws IOException {
+  public void split(NamedList<Object> results) throws IOException {
+    SolrCore parentCore = searcher.getCore();
+    Directory parentDirectory = searcher.getRawReader().directory();
+    Lock parentDirectoryLock = null;
+    UpdateLog ulog = parentCore.getUpdateHandler().getUpdateLog();
+    if (ulog == null && splitMethod == SplitMethod.LINK) {
+      log.warn("No updateLog in parent core, switching to the potentially slower 'splitMethod=rewrite'");
+      splitMethod = SplitMethod.REWRITE;
+    }
+    if (splitMethod == SplitMethod.LINK) {
+      RTimerTree t = timings.sub("closeParentIW");
+      try {
+        // start buffering updates
+        ulog.bufferUpdates();
+        parentCore.getSolrCoreState().closeIndexWriter(parentCore, false);
+        // make sure we can lock the directory for our exclusive use
+        parentDirectoryLock = parentDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
+        log.info("Splitting in 'link' mode: closed parent IndexWriter...");
+        t.stop();
+      } catch (Exception e) {
+        if (parentDirectoryLock != null) {
+          IOUtils.closeWhileHandlingException(parentDirectoryLock);
+        }
+        try {
+          parentCore.getSolrCoreState().openIndexWriter(parentCore);
+          ulog.applyBufferedUpdates();
+        } catch (Exception e1) {
+          log.error("Error reopening IndexWriter after failed close", e1);
+          log.error("Original error closing IndexWriter:", e);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reopening IndexWriter after failed close", e1);
+        }
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing current IndexWriter, aborting 'link' split...", e);
+      }
+    }
+    boolean success = false;
+    try {
+      RTimerTree t = timings.sub("doSplit");
+      doSplit();
+      t.stop();
+      success = true;
+    } catch (Exception e) {
+      results.add("failed", e.toString());
+      throw e;
+    } finally {
+      if (splitMethod == SplitMethod.LINK) {
+        IOUtils.closeWhileHandlingException(parentDirectoryLock);
+        RTimerTree t = timings.sub("reopenParentIW");
+        parentCore.getSolrCoreState().openIndexWriter(parentCore);
+        t.stop();
+        t = timings.sub("parentApplyBufferedUpdates");
+        ulog.applyBufferedUpdates();
+        t.stop();
+        log.info("Splitting in 'link' mode " + (success ? "finished" : "FAILED") +
+            ": re-opened parent IndexWriter.");
+      }
+    }
+    results.add(CommonParams.TIMING, timings.asNamedList());
+  }
+
+  public void doSplit() throws IOException {
 
     List<LeafReaderContext> leaves = searcher.getRawReader().leaves();
+    Directory parentDirectory = searcher.getRawReader().directory();
     List<FixedBitSet[]> segmentDocSets = new ArrayList<>(leaves.size());
-
-    log.info("SolrIndexSplitter: partitions=" + numPieces + " segments="+leaves.size());
-
-    for (LeafReaderContext readerContext : leaves) {
-      assert readerContext.ordInParent == segmentDocSets.size();  // make sure we're going in order
-      FixedBitSet[] docSets = split(readerContext);
-      segmentDocSets.add( docSets );
+    SolrIndexConfig parentConfig = searcher.getCore().getSolrConfig().indexConfig;
+    String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+
+    log.info("SolrIndexSplitter: partitions=" + numPieces + " segments=" + leaves.size());
+    RTimerTree t;
+
+    if (splitMethod != SplitMethod.LINK) {
+      t = timings.sub("findDocSetsPerLeaf");
+      for (LeafReaderContext readerContext : leaves) {
+        assert readerContext.ordInParent == segmentDocSets.size();  // make sure we're going in order
+        FixedBitSet[] docSets = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, false);
+        segmentDocSets.add(docSets);
+      }
+      t.stop();
     }
 
 
+    Map<IndexReader.CacheKey, FixedBitSet[]> docsToDeleteCache = new ConcurrentHashMap<>();
+
     // would it be more efficient to write segment-at-a-time to each new index?
     // - need to worry about number of open descriptors
     // - need to worry about if IW.addIndexes does a sync or not...
     // - would be more efficient on the read side, but prob less efficient merging
-
     for (int partitionNumber=0; partitionNumber<numPieces; partitionNumber++) {
-      log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : ""));
+      String partitionName = "SolrIndexSplitter:partition=" + partitionNumber + ",partitionCount=" + numPieces + (ranges != null ? ",range=" + ranges.get(partitionNumber) : "");
+      log.info(partitionName);
 
       boolean success = false;
 
       RefCounted<IndexWriter> iwRef = null;
-      IndexWriter iw = null;
-      if (cores != null) {
+      IndexWriter iw;
+      if (cores != null && splitMethod != SplitMethod.LINK) {
         SolrCore subCore = cores.get(partitionNumber);
         iwRef = subCore.getUpdateHandler().getSolrCoreState().getIndexWriter(subCore);
         iw = iwRef.get();
       } else {
-        SolrCore core = searcher.getCore();
-        String path = paths.get(partitionNumber);
-        iw = SolrIndexWriter.create(core, "SplittingIndexWriter"+partitionNumber + (ranges != null ? " " + ranges.get(partitionNumber) : ""), path,
-                                    core.getDirectoryFactory(), true, core.getLatestSchema(),
-                                    core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+        if (splitMethod == SplitMethod.LINK) {
+          SolrCore subCore = cores.get(partitionNumber);
+          String path = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+          t = timings.sub("hardLinkCopy");
+          t.resume();
+          // copy by hard-linking
+          Directory splitDir = subCore.getDirectoryFactory().get(path, DirectoryFactory.DirContext.DEFAULT, subCore.getSolrConfig().indexConfig.lockType);
+          // the wrapper doesn't hold any resources itself so it doesn't need closing
+          HardlinkCopyDirectoryWrapper hardLinkedDir = new HardlinkCopyDirectoryWrapper(splitDir);
+          boolean copiedOk = false;
+          try {
+            for (String file : parentDirectory.listAll()) {
+              // we've closed the IndexWriter, so ignore write.lock
+              // its file may be present even when IndexWriter is closed but
+              // we've already checked that the lock is not held by anyone else
+              if (file.equals(IndexWriter.WRITE_LOCK_NAME)) {
+                continue;
+              }
+              hardLinkedDir.copyFrom(parentDirectory, file, file, IOContext.DEFAULT);
+            }
+            copiedOk = true;
+          } finally {
+            if (!copiedOk) {
+              subCore.getDirectoryFactory().doneWithDirectory(splitDir);
+              subCore.getDirectoryFactory().remove(splitDir);
+            }
+          }
+          t.pause();
+          IndexWriterConfig iwConfig = parentConfig.toIndexWriterConfig(subCore);
+          // don't run merges at this time
+          iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
+          t = timings.sub("createSubIW");
+          t.resume();
+          iw = new SolrIndexWriter(partitionName, splitDir, iwConfig);
+          t.pause();
+        } else {
+          SolrCore core = searcher.getCore();
+          String path = paths.get(partitionNumber);
+          t = timings.sub("createSubIW");
+          t.resume();
+          iw = SolrIndexWriter.create(core, partitionName, path,
+              core.getDirectoryFactory(), true, core.getLatestSchema(),
+              core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+          t.pause();
+        }
       }
 
       try {
-        // This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
-        for (int segmentNumber = 0; segmentNumber<leaves.size(); segmentNumber++) {
-          log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : "") + " segment #"+segmentNumber + " segmentCount=" + leaves.size());
-          CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
-          iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
+        if (splitMethod == SplitMethod.LINK) {
+          t = timings.sub("deleteDocuments");
+          t.resume();
+          // apply deletions specific to this partition. As a side-effect on the first call this also populates
+          // a cache of docsets to delete per leaf reader per partition, which is reused for subsequent partitions.
+          iw.deleteDocuments(new SplittingQuery(partitionNumber, field, rangesArr, hashRouter, splitKey, docsToDeleteCache));
+          t.pause();
+        } else {
+          // This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
+          t = timings.sub("addIndexes");
+          t.resume();
+          for (int segmentNumber = 0; segmentNumber<leaves.size(); segmentNumber++) {
+            log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : "") + " segment #"+segmentNumber + " segmentCount=" + leaves.size());
+            CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
+            iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
+          }
+          t.pause();
         }
         // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
         // because the sub-shard cores will just ignore such a commit because the update log is not
         // in active state at this time.
         //TODO no commitUpdateCommand
         SolrIndexWriter.setCommitData(iw, -1);
+        t = timings.sub("subIWCommit");
+        t.resume();
         iw.commit();
+        t.pause();
         success = true;
       } finally {
         if (iwRef != null) {
           iwRef.decref();
         } else {
           if (success) {
+            t = timings.sub("subIWClose");
+            t.resume();
             iw.close();
+            t.pause();
           } else {
             IOUtils.closeWhileHandlingException(iw);
           }
+          if (splitMethod == SplitMethod.LINK) {
+            SolrCore subCore = cores.get(partitionNumber);
+            subCore.getDirectoryFactory().release(iw.getDirectory());
+          }
+        }
+      }
+    }
+    // all sub-indexes created ok
+    // when using hard-linking switch directories & refresh cores
+    if (splitMethod == SplitMethod.LINK && cores != null) {
+      boolean switchOk = true;
+      t = timings.sub("switchSubIndexes");
+      for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+        SolrCore subCore = cores.get(partitionNumber);
+        String indexDirPath = subCore.getIndexDir();
+
+        log.debug("Switching directories");
+        String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+        subCore.modifyIndexProps(INDEX_PREFIX + timestamp);
+        try {
+          subCore.getUpdateHandler().newIndexWriter(false);
+          openNewSearcher(subCore);
+        } catch (Exception e) {
+          log.error("Failed to switch sub-core " + indexDirPath + " to " + hardLinkPath + ", split will fail.", e);
+          switchOk = false;
+          break;
         }
       }
+      t.stop();
+      if (!switchOk) {
+        t = timings.sub("rollbackSubIndexes");
+        // rollback the switch
+        for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+          SolrCore subCore = cores.get(partitionNumber);
+          Directory dir = null;
+          try {
+            dir = subCore.getDirectoryFactory().get(subCore.getDataDir(), DirectoryFactory.DirContext.META_DATA,
+                subCore.getSolrConfig().indexConfig.lockType);
+            dir.deleteFile(IndexFetcher.INDEX_PROPERTIES);
+          } finally {
+            if (dir != null) {
+              subCore.getDirectoryFactory().release(dir);
+            }
+          }
+          // switch back if necessary and remove the hardlinked dir
+          String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+          try {
+            dir = subCore.getDirectoryFactory().get(hardLinkPath, DirectoryFactory.DirContext.DEFAULT,
+                subCore.getSolrConfig().indexConfig.lockType);
+            subCore.getDirectoryFactory().doneWithDirectory(dir);
+            subCore.getDirectoryFactory().remove(dir);
+          } finally {
+            if (dir != null) {
+              subCore.getDirectoryFactory().release(dir);
+            }
+          }
+          subCore.getUpdateHandler().newIndexWriter(false);
+          try {
+            openNewSearcher(subCore);
+          } catch (Exception e) {
+            log.warn("Error rolling back failed split of " + hardLinkPath, e);
+          }
+        }
+        t.stop();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There were errors during index split");
+      } else {
+        // complete the switch - remove original index
+        t = timings.sub("cleanSubIndex");
+        for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+          SolrCore subCore = cores.get(partitionNumber);
+          String oldIndexPath = subCore.getDataDir() + "index";
+          Directory indexDir = null;
+          try {
+            indexDir = subCore.getDirectoryFactory().get(oldIndexPath,
+                DirectoryFactory.DirContext.DEFAULT, subCore.getSolrConfig().indexConfig.lockType);
+            subCore.getDirectoryFactory().doneWithDirectory(indexDir);
+            subCore.getDirectoryFactory().remove(indexDir);
+          } finally {
+            if (indexDir != null) {
+              subCore.getDirectoryFactory().release(indexDir);
+            }
+          }
+        }
+        t.stop();
+      }
+    }
+  }
+
+  private void openNewSearcher(SolrCore core) throws Exception {
+    Future[] waitSearcher = new Future[1];
+    core.getSearcher(true, false, waitSearcher, true);
+    if (waitSearcher[0] != null) {
+      waitSearcher[0].get();
+    }
+  }
+
+  private class SplittingQuery extends Query {
+    private final int partition;
+    private final SchemaField field;
+    private final DocRouter.Range[] rangesArr;
+    private final HashBasedRouter hashRouter;
+    private final String splitKey;
+    private final Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete;
+
+    SplittingQuery(int partition, SchemaField field, DocRouter.Range[] rangesArr, HashBasedRouter hashRouter, String splitKey,
+                   Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete) {
+      this.partition = partition;
+      this.field = field;
+      this.rangesArr = rangesArr;
+      this.hashRouter = hashRouter;
+      this.splitKey = splitKey;
+      this.docsToDelete = docsToDelete;
+    }
+
+    @Override
+    public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
+      return new ConstantScoreWeight(this, boost) {
+
+        @Override
+        public Scorer scorer(LeafReaderContext context) throws IOException {
+          RTimerTree t = timings.sub("findDocsToDelete");
+          t.resume();
+          FixedBitSet set = findDocsToDelete(context);
+          t.pause();
+          log.info("### partition=" + partition + ", leaf=" + context + ", maxDoc=" + context.reader().maxDoc() +
+          ", numDels=" + context.reader().numDeletedDocs() + ", setLen=" + set.length() + ", setCard=" + set.cardinality());
+          Bits liveDocs = context.reader().getLiveDocs();
+          if (liveDocs != null) {
+            // check that we don't delete already deleted docs
+            FixedBitSet dels = FixedBitSet.copyOf(liveDocs);
+            dels.flip(0, dels.length());
+            dels.and(set);
+            if (dels.cardinality() > 0) {
+              log.error("### INVALID DELS " + dels.cardinality());
+            }
+          }
+          return new ConstantScoreScorer(this, score(), new BitSetIterator(set, set.length()));
+        }
 
+        @Override
+        public boolean isCacheable(LeafReaderContext ctx) {
+          return false;
+        }
+
+        @Override
+        public String toString() {
+          return "weight(shardSplittingQuery,part" + partition + ")";
+        }
+      };
     }
 
+    private FixedBitSet findDocsToDelete(LeafReaderContext readerContext) throws IOException {
+      // check whether a cached copy of bitsets already exists for this reader
+      FixedBitSet[] perPartition = docsToDelete.get(readerContext.reader().getCoreCacheHelper().getKey());
+      if (perPartition != null) {
+        return perPartition[partition];
+      }
+      synchronized (docsToDelete) {
+        perPartition = docsToDelete.get(readerContext.reader().getCoreCacheHelper().getKey());
+        if (perPartition != null) {
+          return perPartition[partition];
+        }
+
+        perPartition = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, true);
+        docsToDelete.put(readerContext.reader().getCoreCacheHelper().getKey(), perPartition);
+        return perPartition[partition];
+      }
+    }
+
+    @Override
+    public String toString(String field) {
+      return "shardSplittingQuery";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null) {
+        return false;
+      }
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof SplittingQuery)) {
+        return false;
+      }
+      SplittingQuery q = (SplittingQuery)obj;
+      return partition == q.partition;
+    }
+
+    @Override
+    public int hashCode() {
+      return partition;
+    }
   }
 
-  FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
+  static FixedBitSet[] split(LeafReaderContext readerContext, int numPieces, SchemaField field, DocRouter.Range[] rangesArr,
+                             String splitKey, HashBasedRouter hashRouter, boolean delete) throws IOException {
     LeafReader reader = readerContext.reader();
     FixedBitSet[] docSets = new FixedBitSet[numPieces];
     for (int i=0; i<docSets.length; i++) {
       docSets[i] = new FixedBitSet(reader.maxDoc());
+      if (delete) {
+        docSets[i].set(0, reader.maxDoc());
+      }
     }
     Bits liveDocs = reader.getLiveDocs();
+    if (liveDocs != null && delete) {
+      FixedBitSet liveDocsSet = FixedBitSet.copyOf(liveDocs);
+      for (FixedBitSet set : docSets) {
+        set.and(liveDocsSet);
+      }
+    }
 
     Terms terms = reader.terms(field.getName());
     TermsEnum termsEnum = terms==null ? null : terms.iterator();
@@ -172,11 +551,12 @@ public class SolrIndexSplitter {
     PostingsEnum postingsEnum = null;
 
     int[] docsMatchingRanges = null;
-    if (ranges != null) {
+    if (rangesArr != null) {
       // +1 because documents can belong to *zero*, one, several or all ranges in rangesArr
       docsMatchingRanges = new int[rangesArr.length+1];
     }
 
+    int partition = 0;
     CharsRefBuilder idRef = new CharsRefBuilder();
     for (;;) {
       term = termsEnum.next();
@@ -209,14 +589,22 @@ public class SolrIndexSplitter {
       for (;;) {
         int doc = postingsEnum.nextDoc();
         if (doc == DocIdSetIterator.NO_MORE_DOCS) break;
-        if (ranges == null) {
-          docSets[currPartition].set(doc);
-          currPartition = (currPartition + 1) % numPieces;
+        if (rangesArr == null) {
+          if (delete) {
+            docSets[partition].clear(doc);
+          } else {
+            docSets[partition].set(doc);
+          }
+          partition = (partition + 1) % numPieces;
         } else  {
           int matchingRangesCount = 0;
           for (int i=0; i<rangesArr.length; i++) {      // inner-loop: use array here for extra speed.
             if (rangesArr[i].includes(hash)) {
-              docSets[i].set(doc);
+              if (delete) {
+                docSets[i].clear(doc);
+              } else {
+                docSets[i].set(doc);
+              }
               ++matchingRangesCount;
             }
           }
@@ -256,12 +644,10 @@ public class SolrIndexSplitter {
     if (idx <= 0) return null;
     String part1 = idString.substring(0, idx);
     int commaIdx = part1.indexOf(CompositeIdRouter.bitsSeparator);
-    if (commaIdx > 0) {
-      if (commaIdx + 1 < part1.length())  {
-        char ch = part1.charAt(commaIdx + 1);
-        if (ch >= '0' && ch <= '9') {
-          part1 = part1.substring(0, commaIdx);
-        }
+    if (commaIdx > 0 && commaIdx + 1 < part1.length())  {
+      char ch = part1.charAt(commaIdx + 1);
+      if (ch >= '0' && ch <= '9') {
+        part1 = part1.substring(0, commaIdx);
       }
     }
     return part1;
@@ -273,7 +659,7 @@ public class SolrIndexSplitter {
     final FixedBitSet liveDocs;
     final int numDocs;
 
-    public LiveDocsReader(CodecReader in, FixedBitSet liveDocs) throws IOException {
+    public LiveDocsReader(CodecReader in, FixedBitSet liveDocs) {
       super(in);
       this.liveDocs = liveDocs;
       this.numDocs = liveDocs.cardinality();

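The splitMethod=link path above relies on Lucene index files being write-once: each sub-core
hard-links the parent shard's files into a new timestamped index directory, and SplittingQuery
then deletes the documents that do not belong to that partition. The deletes only add new files
(live-docs and a new segments_N) and never modify the shared, hard-linked segment data. Below is
a minimal sketch of that idea, assuming example paths and using Lucene's
HardlinkCopyDirectoryWrapper (from lucene-misc); the docsOutsidePartition parameter stands in
for the SplittingQuery above, which matches exactly the documents that do NOT belong to the
given partition:

  import java.io.IOException;
  import java.nio.file.Paths;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.IndexWriterConfig;
  import org.apache.lucene.search.Query;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;
  import org.apache.lucene.store.HardlinkCopyDirectoryWrapper;
  import org.apache.lucene.store.IOContext;

  public class LinkSplitSketch {
    static void linkAndPrune(String parentPath, String subPath, Query docsOutsidePartition) throws IOException {
      Directory parent = FSDirectory.open(Paths.get(parentPath));
      // copyFrom() on the wrapper creates hard links where the filesystem
      // supports them, and falls back to a byte copy otherwise
      Directory sub = new HardlinkCopyDirectoryWrapper(FSDirectory.open(Paths.get(subPath)));
      for (String file : parent.listAll()) {
        sub.copyFrom(parent, file, file, IOContext.DEFAULT);
      }
      // the deletes are write-once too: they add .liv and segments_N files
      // to the sub-directory without touching the hard-linked segment data
      try (IndexWriter iw = new IndexWriter(sub, new IndexWriterConfig())) {
        iw.deleteDocuments(docsOutsidePartition);
        iw.commit();
      }
    }
  }
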
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java b/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
index eaa1e59..7ea8a2a 100644
--- a/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
 
 import java.util.List;
 
@@ -29,22 +30,26 @@ import java.util.List;
  *
  */
 public class SplitIndexCommand extends UpdateCommand {
-  // public List<Directory> dirs;
-  public List<String> paths;
-  public List<SolrCore> cores;  // either paths or cores should be specified
-  public List<DocRouter.Range> ranges;
-  public DocRouter router;
-  public String routeFieldName;
-  public String splitKey;
+  public final SolrQueryResponse rsp;
+  public final List<String> paths;
+  public final List<SolrCore> cores;  // either paths or cores should be specified
+  public final List<DocRouter.Range> ranges;
+  public final DocRouter router;
+  public final String routeFieldName;
+  public final String splitKey;
+  public final SolrIndexSplitter.SplitMethod splitMethod;
 
-  public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router, String routeFieldName, String splitKey) {
+  public SplitIndexCommand(SolrQueryRequest req, SolrQueryResponse rsp, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges,
+                           DocRouter router, String routeFieldName, String splitKey, SolrIndexSplitter.SplitMethod splitMethod) {
     super(req);
+    this.rsp = rsp;
     this.paths = paths;
     this.cores = cores;
     this.ranges = ranges;
     this.router = router;
     this.routeFieldName = routeFieldName;
     this.splitKey = splitKey;
+    this.splitMethod = splitMethod;
   }
 
   @Override
@@ -65,6 +70,7 @@ public class SplitIndexCommand extends UpdateCommand {
     if (splitKey != null) {
       sb.append(",split.key=" + splitKey);
     }
+    sb.append(",method=" + splitMethod.toLower());
     sb.append('}');
     return sb.toString();
   }

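With all fields now final, the command is fully specified at construction time. A hedged sketch
of how a caller builds it under the new signature; the core, request/response and split
parameters are placeholders taken from the caller's context:

  void doSplit(SolrCore core, SolrQueryRequest req, SolrQueryResponse rsp,
               List<String> paths, List<DocRouter.Range> ranges, DocRouter router) throws IOException {
    SplitIndexCommand cmd = new SplitIndexCommand(
        req,
        rsp,                                  // new: lets the splitter report timings back to the client
        paths,                                // target index paths, or null...
        null,                                 // ...when target SolrCores are given instead
        ranges,                               // one DocRouter.Range per sub-shard
        router,                               // the collection's DocRouter
        null,                                 // routeFieldName (null means the uniqueKey field)
        null,                                 // splitKey, for route-key based splits only
        SolrIndexSplitter.SplitMethod.LINK);  // or SplitMethod.REWRITE, the previous behavior
    core.getUpdateHandler().split(cmd);
  }
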
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 22862b4..dd5f9f3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -134,7 +134,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
       killerThread.start();
       killCounter.incrementAndGet();
 
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null, false);
 
       log.info("Layout after split: \n");
       printLayout();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 92fd4d5..f6ee7b4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -62,7 +62,10 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TestInjection;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -72,6 +75,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
+@LogLevel("org.apache.solr.cloud.api.collections=DEBUG")
 @Slow
 public class ShardSplitTest extends BasicDistributedZkTest {
 
@@ -116,14 +120,24 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   Creates a collection with replicationFactor=1, splits a shard. Restarts the sub-shard leader node.
  Adds a replica, and ensures the doc count matches on the leader and the replica.
    */
+  @Test
   public void testSplitStaticIndexReplication() throws Exception {
+    doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitStaticIndexReplicationLink() throws Exception {
+    doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     waitForThingsToLevelOut(15);
 
     DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
     Replica replica = defCol.getReplicas().get(0);
     String nodeName = replica.getNodeName();
 
-    String collectionName = "testSplitStaticIndexReplication";
+    String collectionName = "testSplitStaticIndexReplication_" + splitMethod.toLower();
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
     create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
     create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
@@ -141,6 +155,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
         CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
         splitShard.setShardName(SHARD1);
+        splitShard.setSplitMethod(splitMethod.toLower());
         String asyncId = splitShard.processAsync(client);
         RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
         if (state == RequestStatusState.COMPLETED)  {
@@ -351,16 +366,31 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
   @Test
   public void testSplitMixedReplicaTypes() throws Exception {
+    doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitMixedReplicaTypesLink() throws Exception {
+    doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     waitForThingsToLevelOut(15);
-    String collectionName = "testSplitMixedReplicaTypes";
+    String collectionName = "testSplitMixedReplicaTypes_" + splitMethod.toLower();
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2, 2, 2);
     create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
     create.process(cloudClient);
     waitForRecoveriesToFinish(collectionName, false);
 
+    for (int i = 0; i < 100; i++) {
+      cloudClient.add(collectionName, getDoc("id", "id-" + i, "foo_s", "bar " + i));
+    }
+    cloudClient.commit(collectionName);
+
     CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
     splitShard.setShardName(SHARD1);
-    splitShard.process(cloudClient);
+    splitShard.setSplitMethod(splitMethod.toLower());
+    CollectionAdminResponse rsp = splitShard.process(cloudClient);
     waitForThingsToLevelOut(15);
 
     cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
@@ -393,7 +423,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     assertEquals("actual PULL", numPull, actualPull.get());
   }
 
-    @Test
+  @Test
   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
   public void testSplitWithChaosMonkey() throws Exception {
     waitForThingsToLevelOut(15);
@@ -600,6 +630,15 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
   @Test
   public void testSplitShardWithRule() throws Exception {
+    doSplitShardWithRule(SolrIndexSplitter.SplitMethod.REWRITE);
+  }
+
+  @Test
+  public void testSplitShardWithRuleLink() throws Exception {
+    doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
+  }
+
+  private void doSplitShardWithRule(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     waitForThingsToLevelOut(15);
 
     if (usually()) {
@@ -609,14 +648,14 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
 
     log.info("Starting testSplitShardWithRule");
-    String collectionName = "shardSplitWithRule";
+    String collectionName = "shardSplitWithRule_" + splitMethod.toLower();
     CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2)
         .setRule("shard:*,replica:<2,node:*");
     CollectionAdminResponse response = createRequest.process(cloudClient);
     assertEquals(0, response.getStatus());
 
     CollectionAdminRequest.SplitShard splitShardRequest = CollectionAdminRequest.splitShard(collectionName)
-        .setShardName("shard1");
+        .setShardName("shard1").setSplitMethod(splitMethod.toLower());
     response = splitShardRequest.process(cloudClient);
     assertEquals(String.valueOf(response.getErrorMessages()), 0, response.getStatus());
   }
@@ -633,7 +672,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     // test with only one range
     subRanges.add(ranges.get(0));
     try {
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
       fail("Shard splitting with just one custom hash range should not succeed");
     } catch (HttpSolrClient.RemoteSolrException e) {
       log.info("Expected exception:", e);
@@ -644,7 +683,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     subRanges.add(ranges.get(3)); // order shouldn't matter
     subRanges.add(ranges.get(0));
     try {
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
       fail("Shard splitting with missing hashes in between given ranges should not succeed");
     } catch (HttpSolrClient.RemoteSolrException e) {
       log.info("Expected exception:", e);
@@ -657,7 +696,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     subRanges.add(ranges.get(2));
     subRanges.add(new DocRouter.Range(ranges.get(3).min - 15, ranges.get(3).max));
     try {
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
       fail("Shard splitting with overlapping ranges should not succeed");
     } catch (HttpSolrClient.RemoteSolrException e) {
       log.info("Expected exception:", e);
@@ -683,6 +722,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     final int[] docCounts = new int[ranges.size()];
     int numReplicas = shard1.getReplicas().size();
 
+    cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+    clusterState = cloudClient.getZkStateReader().getClusterState();
+    log.debug("-- COLLECTION: {}", clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
     del("*:*");
     for (int id = 0; id <= 100; id++) {
       String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
@@ -725,7 +767,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     try {
       for (int i = 0; i < 3; i++) {
         try {
-          splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+          splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
           log.info("Layout after split: \n");
           printLayout();
           break;
@@ -807,7 +849,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
       for (int i = 0; i < 3; i++) {
         try {
-          splitShard(collectionName, SHARD1, null, null);
+          splitShard(collectionName, SHARD1, null, null, false);
           break;
         } catch (HttpSolrClient.RemoteSolrException e) {
           if (e.code() != 500) {
@@ -889,7 +931,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
       for (int i = 0; i < 3; i++) {
         try {
-          splitShard(collectionName, null, null, splitKey);
+          splitShard(collectionName, null, null, splitKey, false);
           break;
         } catch (HttpSolrClient.RemoteSolrException e) {
           if (e.code() != 500) {
@@ -992,9 +1034,11 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     }
   }
 
-  protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey) throws SolrServerException, IOException {
+  protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey, boolean offline) throws SolrServerException, IOException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
+    params.set("timing", "true");
+    params.set("offline", String.valueOf(offline));
     params.set("collection", collection);
     if (shardId != null)  {
       params.set("shard", shardId);
@@ -1019,7 +1063,8 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
 
     try (HttpSolrClient baseServer = getHttpSolrClient(baseUrl, 30000, 60000 * 5)) {
-      baseServer.request(request);
+      NamedList<Object> rsp = baseServer.request(request);
+      log.info("Shard split response: " + Utils.toJSONString(rsp));
     }
   }
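
Client-side, the new option is exposed through CollectionAdminRequest.SplitShard (see the
setSplitMethod calls in the tests above); a short usage sketch, with example collection and
shard names:

  CollectionAdminRequest.SplitShard split = CollectionAdminRequest.splitShard("myCollection")
      .setShardName("shard1")
      .setSplitMethod("link");   // omit to keep the default rewrite method
  CollectionAdminResponse rsp = split.process(cloudClient);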