Posted to commits@lucene.apache.org by sh...@apache.org on 2013/04/08 17:19:42 UTC

svn commit: r1465663 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/handler/admin/ solr/core/src/java/org/apache/solr/servlet/ solr/core/src/java/org/apache/solr/upd...

Author: shalin
Date: Mon Apr  8 15:19:41 2013
New Revision: 1465663

URL: http://svn.apache.org/r1465663
Log:
SOLR-3755: A new collections api to add additional shards dynamically by splitting existing shards
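
For orientation, the new operation is exposed through the Collections API: CollectionsHandler (below) accepts a SPLITSHARD action with required "collection" and "shard" parameters and hands the work off to the OverseerCollectionProcessor. A minimal client sketch, assuming a hypothetical node at localhost:8983 and placeholder collection/shard names (none of this code is part of the patch itself), might look like:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class SplitShardExample {
      public static void main(String[] args) throws Exception {
        // SPLITSHARD requires the 'collection' and 'shard' parameters,
        // as enforced by CollectionsHandler.handleSplitShardAction in this patch.
        URL url = new URL("http://localhost:8983/solr/admin/collections"
            + "?action=SPLITSHARD&collection=collection1&shard=shard1");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line);  // response written back by the collections handler
        }
        in.close();
        conn.disconnect();
      }
    }

The call returns once the Overseer has created the sub-shards, split the parent index, replayed buffered updates, created replicas, and marked the parent slice inactive and the sub-slices active (or when the handler timeout expires).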

Added:
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
      - copied unchanged from r1465661, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
      - copied unchanged from r1465661, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Mon Apr  8 15:19:41 2013
@@ -111,6 +111,9 @@ New Features
   and add additional configuration parameters. See the javadocs for more details and
   examples. (Robert Muir)
 
+* SOLR-3755: A new collections api to add additional shards dynamically by splitting
+  existing shards. (yonik, Anshum Gupta, shalin)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Mon Apr  8 15:19:41 2013
@@ -39,7 +39,7 @@ public class AssignShard {
       numShards = 1;
     }
     String returnShardId = null;
-    Map<String, Slice> sliceMap = state.getSlicesMap(collection);
+    Map<String, Slice> sliceMap = state.getActiveSlicesMap(collection);
 
 
     // TODO: now that we create shards ahead of time, is this code needed?  Esp since hash ranges aren't assigned when creating via this method?

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Mon Apr  8 15:19:41 2013
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
  */
 
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.SolrParams;
 
 public class CloudDescriptor {
@@ -28,6 +29,11 @@ public class CloudDescriptor {
   private Integer numShards;
   private String nodeName = null;
 
+  /* shardRange and shardState are used once-only during sub shard creation for shard splits
+   * Use the values from {@link Slice} instead */
+  volatile String shardRange = null;
+  volatile String shardState = Slice.ACTIVE;
+
   volatile boolean isLeader = false;
   volatile String lastPublished = ZkStateReader.ACTIVE;
   
@@ -89,4 +95,19 @@ public class CloudDescriptor {
     this.nodeName = nodeName;
   }
 
+  public String getShardRange() {
+    return shardRange;
+  }
+
+  public void setShardRange(String shardRange) {
+    this.shardRange = shardRange;
+  }
+
+  public String getShardState() {
+    return shardState;
+  }
+
+  public void setShardState(String shardState) {
+    this.shardState = shardState;
+  }
 }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Apr  8 15:19:41 2013
@@ -268,6 +268,10 @@ final class ShardLeaderElectionContext e
         .getClusterState();
     Map<String,Slice> slices = clusterState.getSlicesMap(collection);
     Slice slice = slices.get(shardId);
+    if (!slice.getState().equals(Slice.ACTIVE)) {
+      //Return false if the Slice is not active yet.
+      return false;
+    }
     Map<String,Replica> replicasMap = slice.getReplicasMap();
     for (Map.Entry<String,Replica> shard : replicasMap.entrySet()) {
       String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Apr  8 15:19:41 2013
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -27,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClosableThread;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -190,6 +192,10 @@ public class Overseer {
             message.getStr(ZkStateReader.SHARD_ID_PROP),
             sb.length() > 0 ? sb.toString() : null);
 
+      } else if ("createshard".equals(operation)) {
+        clusterState = createShard(clusterState, message);
+      } else if ("updateshardstate".equals(operation))  {
+        clusterState = updateShardState(clusterState, message);
       } else {
         throw new RuntimeException("unknown operation:" + operation
             + " contents:" + message.getProperties());
@@ -197,6 +203,46 @@ public class Overseer {
       return clusterState;
     }
       
+    private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
+      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      log.info("Update shard state invoked for collection: " + collection);
+      for (String key : message.keySet()) {
+        if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
+        if (QUEUE_OPERATION.equals(key)) continue;
+
+        Slice slice = clusterState.getSlice(collection, key);
+        if (slice == null)  {
+          throw new RuntimeException("Overseer.updateShardState unknown collection: " + collection + " slice: " + key);
+        }
+        log.info("Update shard state " + key + " to " + message.getStr(key));
+        Map<String, Object> props = slice.shallowCopy();
+        props.put(Slice.STATE, message.getStr(key));
+        Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
+        clusterState = updateSlice(clusterState, collection, newSlice);
+      }
+
+      return clusterState;
+    }
+
+    private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
+      String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+      Slice slice = clusterState.getSlice(collection, shardId);
+      if (slice == null)  {
+        Map<String, Replica> replicas = Collections.EMPTY_MAP;
+        Map<String, Object> sliceProps = new HashMap<String, Object>();
+        String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP);
+        String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP);
+        sliceProps.put(Slice.RANGE, shardRange);
+        sliceProps.put(Slice.STATE, shardState);
+        slice = new Slice(shardId, replicas, sliceProps);
+        clusterState = updateSlice(clusterState, collection, slice);
+      } else  {
+        log.error("Unable to create Shard: " + shardId + " because it already exists in collection: " + collection);
+      }
+      return clusterState;
+    }
+
       private boolean amILeader() {
         try {
           ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, true));
@@ -211,6 +257,7 @@ public class Overseer {
         log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
         return false;
       }
+    
       /**
        * Try to assign core to the cluster. 
        */
@@ -247,15 +294,24 @@ public class Overseer {
             log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
           }
           sliceName = AssignShard.assignShard(collection, state, numShards);
-          log.info("Assigning new node to shard shard=" + sliceName);
+          log.info("Assigning new node to shard=" + sliceName);
         }
 
         Slice slice = state.getSlice(collection, sliceName);
+        
         Map<String,Object> replicaProps = new LinkedHashMap<String,Object>();
 
         replicaProps.putAll(message.getProperties());
         // System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
         if (slice != null) {
+          String sliceState = slice.getState();
+          
+          // throw an exception if the slice is not yet active.
+
+          //if(!sliceState.equals(Slice.ACTIVE)) {
+          //  throw new SolrException(ErrorCode.BAD_REQUEST, "Can not assign core to a non-active slice [" + slice.getName() + "]");
+          //}
+          
           Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
           if (oldReplica != null && oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
             replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -278,6 +334,9 @@ public class Overseer {
             replicaProps.remove(removeKey);
           }
           replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
+          // remove shard specific properties
+          String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
+          String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
 
 
           Replica replica = new Replica(coreNodeName, replicaProps);
@@ -292,6 +351,9 @@ public class Overseer {
             replicas = slice.getReplicasCopy();
           } else {
             replicas = new HashMap<String, Replica>(1);
+            sliceProps = new HashMap<String, Object>();
+            sliceProps.put(Slice.RANGE, shardRange);
+            sliceProps.put(Slice.STATE, shardState);
           }
 
           replicas.put(replica.getName(), replica);
@@ -399,7 +461,11 @@ public class Overseer {
 
         Slice slice = slices.get(sliceName);
         if (slice == null) {
-          log.error("Could not mark leader for non existing slice:" + sliceName);
+          slice = coll.getSlice(sliceName);
+        }
+
+        if (slice == null) {
+          log.error("Could not mark leader for non existing/active slice:" + sliceName);
           return state;
         } else {
           // TODO: consider just putting the leader property on the shard, not on individual replicas

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Apr  8 15:19:41 2013
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -32,6 +33,8 @@ import org.apache.solr.common.cloud.Alia
 import org.apache.solr.common.cloud.ClosableThread;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PlainIdRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -70,6 +73,8 @@ public class OverseerCollectionProcessor
   
   public static final String DELETEALIAS = "deletealias";
   
+  public static final String SPLITSHARD = "splitshard";
+
   // TODO: use from Overseer?
   private static final String QUEUE_OPERATION = "operation";
   
@@ -171,6 +176,8 @@ public class OverseerCollectionProcessor
         createAlias(zkStateReader.getAliases(), message);
       } else if (DELETEALIAS.equals(operation)) {
         deleteAlias(zkStateReader.getAliases(), message);
+      } else if (SPLITSHARD.equals(operation))  {
+        splitShard(zkStateReader.getClusterState(), message, results);
       } else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
@@ -290,6 +297,265 @@ public class OverseerCollectionProcessor
     
   }
   
+  private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+    log.info("Split shard invoked");
+    String collection = message.getStr("collection");
+    String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
+    Slice parentSlice = clusterState.getSlice(collection, slice);
+    
+    if (parentSlice == null) {
+      if(clusterState.getCollections().contains(collection)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
+      } else {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collection);
+      }      
+    }
+    
+    // find the leader for the shard
+    Replica parentShardLeader = clusterState.getLeader(collection, slice);
+    
+    DocRouter.Range range = parentSlice.getRange();
+    if (range == null) {
+      range = new PlainIdRouter().fullRange();
+    }
+
+    // todo: fixed to two partitions?
+    // todo: accept the range as a param to api?
+    // todo: handle randomizing subshard name in case a shard with the same name already exists.
+    List<DocRouter.Range> subRanges = new PlainIdRouter().partitionRange(2, range);
+    try {
+      List<String> subSlices = new ArrayList<String>(subRanges.size());
+      List<String> subShardNames = new ArrayList<String>(subRanges.size());
+      String nodeName = parentShardLeader.getNodeName();
+      for (int i = 0; i < subRanges.size(); i++) {
+        String subSlice = slice + "_" + i;
+        subSlices.add(subSlice);
+        String subShardName = collection + "_" + subSlice + "_replica1";
+        subShardNames.add(subShardName);
+
+        Slice oSlice = clusterState.getSlice(collection, subSlice);
+        if (oSlice != null) {
+          if (Slice.ACTIVE.equals(oSlice.getState())) {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
+          } else if (Slice.CONSTRUCTION.equals(oSlice.getState()))  {
+            for (Replica replica : oSlice.getReplicas()) {
+              String core = replica.getStr("core");
+              log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
+              ModifiableSolrParams params = new ModifiableSolrParams();
+              params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+              params.set(CoreAdminParams.CORE, core);
+              sendShardRequest(replica.getNodeName(), params);
+            }
+          }
+        }
+      }
+
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          processResponse(results, srsp);
+        }
+      } while (srsp != null);
+
+      for (int i=0; i<subRanges.size(); i++)  {
+        String subSlice = subSlices.get(i);
+        String subShardName = subShardNames.get(i);
+        DocRouter.Range subRange = subRanges.get(i);
+
+        log.info("Creating shard " + subShardName + " as part of slice "
+            + subSlice + " of collection " + collection + " on "
+            + nodeName);
+
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+        params.set(CoreAdminParams.NAME, subShardName);
+        params.set(CoreAdminParams.COLLECTION, collection);
+        params.set(CoreAdminParams.SHARD, subSlice);
+        params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
+        params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
+        //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); todo: is it necessary, we're not creating collections?
+
+        sendShardRequest(nodeName, params);
+
+        // wait for parent leader to acknowledge the sub-shard core
+        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+        cmd.setCoreName(subShardName);
+        cmd.setNodeName(nodeName);
+        cmd.setCoreNodeName(nodeName + "_" + subShardName);
+        cmd.setState(ZkStateReader.ACTIVE);
+        cmd.setCheckLive(true);
+        cmd.setOnlyIfLeader(true);
+        sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+      }
+
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          processResponse(results, srsp);
+        }
+      } while (srsp != null);
+      
+      log.info("Successfully created all sub-shards for collection "
+          + collection + " parent shard: " + slice + " on: " + parentShardLeader);
+
+      log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice "
+          + slice + " of collection " + collection + " on "
+          + parentShardLeader);
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
+      params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
+      for (int i = 0; i < subShardNames.size(); i++) {
+        String subShardName = subShardNames.get(i);
+        params.add(CoreAdminParams.TARGET_CORE, subShardName);
+      }
+
+      sendShardRequest(parentShardLeader.getNodeName(), params);
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          processResponse(results, srsp);
+        }
+      } while (srsp != null);
+
+      log.info("Index on shard: " + nodeName + " split into two successfully");
+
+      // apply buffered updates on sub-shards
+      for (int i = 0; i < subShardNames.size(); i++) {
+        String subShardName = subShardNames.get(i);
+
+        log.info("Applying buffered updates on : " + subShardName);
+
+        params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+        params.set(CoreAdminParams.NAME, subShardName);
+
+        sendShardRequest(nodeName, params);
+      }
+
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          processResponse(results, srsp);
+        }
+      } while (srsp != null);
+
+      log.info("Successfully applied buffered updates on : " + subShardNames);
+
+      // Replica creation for the new Slices
+
+      // look at the replication factor and see if it matches reality
+      // if it does not, find best nodes to create more cores
+
+      // TODO: Have replication factor decided in some other way instead of numShards for the parent
+
+      int repFactor = clusterState.getSlice(collection, slice).getReplicas().size();
+
+      // we need to look at every node and see how many cores it serves
+      // add our new cores to existing nodes serving the least number of cores
+      // but (for now) require that each core goes on a distinct node.
+
+      // TODO: add smarter options that look at the current number of cores per
+      // node?
+      // for now we just go random
+      Set<String> nodes = clusterState.getLiveNodes();
+      List<String> nodeList = new ArrayList<String>(nodes.size());
+      nodeList.addAll(nodes);
+      
+      Collections.shuffle(nodeList);
+
+      // TODO: Have maxShardsPerNode param for this operation?
+
+      // Remove the node that hosts the parent shard for replica creation.
+      nodeList.remove(nodeName);
+      
+      // TODO: change this to handle sharding a slice into > 2 sub-shards.
+
+      for (int i = 1; i <= subSlices.size(); i++) {
+        Collections.shuffle(nodeList);
+        String sliceName = subSlices.get(i - 1);
+        for (int j = 2; j <= repFactor; j++) {
+          String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
+          String shardName = collection + "_" + sliceName + "_replica" + (j);
+
+          log.info("Creating replica shard " + shardName + " as part of slice "
+              + sliceName + " of collection " + collection + " on "
+              + subShardNodeName);
+
+          // Need to create new params for each request
+          params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+          params.set(CoreAdminParams.NAME, shardName);
+          params.set(CoreAdminParams.COLLECTION, collection);
+          params.set(CoreAdminParams.SHARD, sliceName);
+          // TODO:  Figure the config used by the parent shard and use it.
+          //params.set("collection.configName", configName);
+          
+          //Not using this property. Do we really need to use it?
+          //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+          sendShardRequest(subShardNodeName, params);
+
+          // wait for the replicas to be seen as active on sub shard leader
+          log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
+          CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+          cmd.setCoreName(subShardNames.get(i-1));
+          cmd.setNodeName(subShardNodeName);
+          cmd.setCoreNodeName(subShardNodeName + "_" + shardName);
+          cmd.setState(ZkStateReader.ACTIVE);
+          cmd.setCheckLive(true);
+          cmd.setOnlyIfLeader(true);
+          sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+        }
+      }
+
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          processResponse(results, srsp);
+        }
+      } while (srsp != null);
+      log.info("Successfully created all replica shards for all sub-slices "
+          + subSlices);
+
+      log.info("Requesting update shard state");
+      DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
+      Map<String, Object> propMap = new HashMap<String, Object>();
+      propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
+      propMap.put(slice, Slice.INACTIVE);
+      for (String subSlice : subSlices) {
+        propMap.put(subSlice, Slice.ACTIVE);
+      }
+      propMap.put(ZkStateReader.COLLECTION_PROP, collection);
+      ZkNodeProps m = new ZkNodeProps(propMap);
+      inQueue.offer(ZkStateReader.toJSON(m));
+
+      return true;
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      log.error("Error executing split operation for collection: " + collection + " parent shard: " + slice, e);
+      throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+    }
+  }
+
+  private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
+    ShardRequest sreq = new ShardRequest();
+    params.set("qt", adminPath);
+    sreq.purpose = 1;
+    String replica = zkStateReader.getZkClient().getBaseUrlForNodeName(nodeName);
+    if (replica.startsWith("http://")) replica = replica.substring(7);
+    sreq.shards = new String[]{replica};
+    sreq.actualShards = sreq.shards;
+    sreq.params = params;
+
+    shardHandler.submit(sreq, replica, sreq.params);
+  }
+  
   private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
     String collectionName = message.getStr("name");
     if (clusterState.getCollections().contains(collectionName)) {
@@ -384,7 +650,6 @@ public class OverseerCollectionProcessor
           params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
           
           ShardRequest sreq = new ShardRequest();
-          sreq.nodeName = nodeName;
           params.set("qt", adminPath);
           sreq.purpose = 1;
           String replica = zkStateReader.getZkClient()
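
For reference, the sub-shard hash ranges used by splitShard above come from PlainIdRouter.partitionRange(2, range), which splits the parent slice's range (or the full hash range when the parent has none) into two contiguous halves. A small standalone sketch, assuming only solr-solrj on the classpath and not part of this commit, shows the computation:

    import java.util.List;

    import org.apache.solr.common.cloud.DocRouter;
    import org.apache.solr.common.cloud.PlainIdRouter;

    public class PartitionRangeExample {
      public static void main(String[] args) {
        // The complete hash range (Integer.MIN_VALUE .. Integer.MAX_VALUE), used when
        // the parent slice has no explicit range assigned.
        DocRouter.Range full = new PlainIdRouter().fullRange();
        // Two contiguous sub-ranges, one per sub-shard.
        List<DocRouter.Range> halves = new PlainIdRouter().partitionRange(2, full);
        for (DocRouter.Range subRange : halves) {
          System.out.println(subRange);  // one hash range per new sub-slice
        }
      }
    }

Each sub-slice is created in the CONSTRUCTION state with one of these ranges and is only switched to ACTIVE (with the parent marked INACTIVE) via the "updateshardstate" message after the index split, buffered-update replay, and replica creation have all completed.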

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Apr  8 15:19:41 2013
@@ -792,6 +792,13 @@ public final class ZkController {
 
       UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
       if (!core.isReloaded() && ulog != null) {
+        // disable recovery in case shard is in construction state (for shard splits)
+        Slice slice = getClusterState().getSlice(collection, shardId);
+        if (Slice.CONSTRUCTION.equals(slice.getState())) {
+          core.getUpdateHandler().getUpdateLog().bufferUpdates();
+          publish(desc, ZkStateReader.ACTIVE);
+
+        } else  {
         Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
             .getUpdateLog().recoverFromLog();
         if (recoveryFuture != null) {
@@ -802,12 +809,13 @@ public final class ZkController {
         } else {
           log.info("No LogReplay needed for core="+core.getName() + " baseURL=" + baseUrl);
         }
-      }      
       boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
           collection, coreZkNodeName, shardId, leaderProps, core, cc);
       if (!didRecovery) {
         publish(desc, ZkStateReader.ACTIVE);
       }
+        }
+      }
     } finally {
       if (core != null) {
         core.close();
@@ -817,7 +825,6 @@ public final class ZkController {
     
     // make sure we have an update cluster state right away
     zkStateReader.updateClusterState(true);
-
     return shardId;
   }
 
@@ -993,6 +1000,8 @@ public final class ZkController {
         ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles(),
         ZkStateReader.NODE_NAME_PROP, getNodeName(),
         ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
+        ZkStateReader.SHARD_RANGE_PROP, cd.getCloudDescriptor().getShardRange(),
+        ZkStateReader.SHARD_STATE_PROP, cd.getCloudDescriptor().getShardState(),
         ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor()
             .getCollectionName(),
         ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
@@ -1274,6 +1283,9 @@ public final class ZkController {
     // before becoming available, make sure we are not live and active
     // this also gets us our assigned shard id if it was not specified
     publish(cd, ZkStateReader.DOWN, false);
+    // shardState and shardRange are for one-time use only, thereafter the actual values in the Slice should be used
+    cd.getCloudDescriptor().setShardState(null);
+    cd.getCloudDescriptor().setShardRange(null);
     String coreNodeName = getCoreNodeName(cd);
     
     // make sure the node name is set on the descriptor

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Apr  8 15:19:41 2013
@@ -132,9 +132,13 @@ public class CollectionsHandler extends 
         this.handleDeleteAliasAction(req, rsp);
         break;
       }
+        case SPLITSHARD:  {
+          this.handleSplitShardAction(req, rsp);
+          break;
+        }
+
       default: {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: "
-            + action);
+          throw new RuntimeException("Unknown action: " + action);
       }
     }
 
@@ -277,6 +281,26 @@ public class CollectionsHandler extends 
     handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
   }
 
+  private void handleSplitShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    log.info("Splitting shard : " + req.getParamString());
+    String name = req.getParams().required().get("collection");
+    // TODO : add support for multiple shards
+    String shard = req.getParams().required().get("shard");
+    // TODO : add support for shard range
+
+    Map<String,Object> props = new HashMap<String,Object>();
+    props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.SPLITSHARD);
+    props.put("collection", name);
+    props.put(ZkStateReader.SHARD_ID_PROP, shard);
+
+    ZkNodeProps m = new ZkNodeProps(props);
+
+    // todo remove this hack
+    DEFAULT_ZK_TIMEOUT *= 5;
+    handleResponse(OverseerCollectionProcessor.SPLITSHARD, m, rsp);
+    DEFAULT_ZK_TIMEOUT /= 5;
+  }
+
   public static ModifiableSolrParams params(String... params) {
     ModifiableSolrParams msp = new ModifiableSolrParams();
     for (int i=0; i<params.length; i+=2) {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Apr  8 15:19:41 2013
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -40,6 +41,7 @@ import org.apache.solr.cloud.ZkControlle
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -64,6 +66,7 @@ import org.apache.solr.response.SolrQuer
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.MergeIndexesCommand;
 import org.apache.solr.update.SplitIndexCommand;
+import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
 import org.apache.solr.util.NumberUtils;
@@ -198,6 +201,12 @@ public class CoreAdminHandler extends Re
           break;
         }
         
+        // todo : Can this be done by the regular RecoveryStrategy route?
+        case REQUESTAPPLYUPDATES: {
+          this.handleRequestApplyUpdatesAction(req, rsp);
+          break;
+        }
+        
         default: {
           doPersist = this.handleCustomAction(req, rsp);
           break;
@@ -214,6 +223,12 @@ public class CoreAdminHandler extends Re
     rsp.setHttpCaching(false);
   }
 
+  
+  /**
+   * Handle the core admin SPLIT action.
+   * @return true if a modification has resulted that requires persistence 
+   *         of the CoreContainer configuration.
+   */
   protected boolean handleSplitAction(SolrQueryRequest adminReq, SolrQueryResponse rsp) throws IOException {
     SolrParams params = adminReq.getParams();
      // partitions=N    (split into N partitions, leaving it up to solr what the ranges are and where to put them)
@@ -228,6 +243,7 @@ public class CoreAdminHandler extends Re
     String[] newCoreNames = params.getParams("targetCore");
 
     String cname = params.get(CoreAdminParams.CORE, "");
+    log.info("Invoked split action for core: " + cname);
     SolrCore core = coreContainer.getCore(cname);
     SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
     List<SolrCore> newCores = null;
@@ -237,14 +253,17 @@ public class CoreAdminHandler extends Re
       List<String> paths = null;
       int partitions = pathsArr != null ? pathsArr.length : params.getInt("partitions", 2);
 
-
       // TODO: if we don't know the real range of the current core, we should just
       //  split on every other doc rather than hash.
+      ClusterState clusterState = coreContainer.getZkController().getClusterState();
+      String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
+      DocCollection collection = clusterState.getCollection(collectionName);
+      String sliceName = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
+      Slice slice = clusterState.getSlice(collectionName, sliceName);
+      DocRouter.Range currentRange = slice.getRange() == null ?
+          new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE) : slice.getRange();
 
-      // TODO (cloud): get from the current core
-      DocRouter.Range currentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
-
-      DocRouter hp = DocRouter.DEFAULT;  // TODO: get actual doc router for collection if available
+      DocRouter hp = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
       ranges = hp.partitionRange(partitions, currentRange);
 
       if (pathsArr == null) {
@@ -257,9 +276,6 @@ public class CoreAdminHandler extends Re
             throw new SolrException(ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist.");
           }
         }
-        // TODO (cloud): cores should be registered, should be in recovery / buffering-updates mode, and the shard
-        // leader should be forwarding updates to the new shards *before* we split the current shard
-        // into the new shards.
       } else {
         paths = Arrays.asList(pathsArr);
       }
@@ -462,6 +478,14 @@ public class CoreAdminHandler extends Re
         if (opts != null)
           cd.setShardId(opts);
         
+        opts = params.get(CoreAdminParams.SHARD_RANGE);
+        if (opts != null)
+          cd.setShardRange(opts);
+
+        opts = params.get(CoreAdminParams.SHARD_STATE);
+        if (opts != null)
+          cd.setShardState(opts);
+        
         opts = params.get(CoreAdminParams.ROLES);
         if (opts != null)
           cd.setRoles(opts);
@@ -848,6 +872,9 @@ public class CoreAdminHandler extends Re
     Boolean checkLive = params.getBool("checkLive");
     Boolean onlyIfLeader = params.getBool("onlyIfLeader");
 
+    log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
+        + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader);
+
     String state = null;
     boolean live = false;
     int retry = 0;
@@ -942,6 +969,47 @@ public class CoreAdminHandler extends Re
       Thread.sleep(1000);
     }
 
+    log.info("Waited coreNodeName: " + coreNodeName + ", state: " + waitForState
+        + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader + " for: " + retry + " seconds.");
+  }
+
+  private void handleRequestApplyUpdatesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    SolrParams params = req.getParams();
+    String cname = params.get(CoreAdminParams.NAME, "");
+    SolrCore core = coreContainer.getCore(cname);
+    try {
+      UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
+      if (updateLog.getState() != UpdateLog.State.BUFFERING)  {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Core " + cname + " not in buffering state");
+      }
+      Future<UpdateLog.RecoveryInfo> future = updateLog.applyBufferedUpdates();
+      if (future == null) {
+        log.info("No buffered updates available. core=" + cname);
+        rsp.add("core", cname);
+        rsp.add("status", "EMPTY_BUFFER");
+        return;
+      }
+      UpdateLog.RecoveryInfo report = future.get();
+      if (report.failed) {
+        SolrException.log(log, "Replay failed");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
+      }
+      coreContainer.getZkController().publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+      rsp.add("core", cname);
+      rsp.add("status", "BUFFER_APPLIED");
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      log.warn("Recovery was interrupted", e);
+    } catch (Throwable e) {
+      if (e instanceof SolrException)
+        throw (SolrException)e;
+      else
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Could not apply buffered updates", e);
+    } finally {
+      if (req != null) req.close();
+      if (core != null)
+        core.close();
+    }
     
   }
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Mon Apr  8 15:19:41 2013
@@ -486,7 +486,7 @@ public class SolrDispatchFilter implemen
   
   private String getRemotCoreUrl(CoreContainer cores, String collectionName, String origCorename) {
     ClusterState clusterState = cores.getZkController().getClusterState();
-    Collection<Slice> slices = clusterState.getSlices(collectionName);
+    Collection<Slice> slices = clusterState.getActiveSlices(collectionName);
     boolean byCoreName = false;
     if (slices == null) {
       // look by core name
@@ -494,7 +494,7 @@ public class SolrDispatchFilter implemen
       Set<String> collections = clusterState.getCollections();
       for (String collection : collections) {
         slices = new ArrayList<Slice>();
-        slices.addAll(clusterState.getSlices(collection));
+        slices.addAll(clusterState.getActiveSlices(collection));
       }
     }
     
@@ -541,7 +541,7 @@ public class SolrDispatchFilter implemen
     ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();
     
     ClusterState clusterState = zkStateReader.getClusterState();
-    Map<String,Slice> slices = clusterState.getSlicesMap(collection);
+    Map<String,Slice> slices = clusterState.getActiveSlicesMap(collection);
     if (slices == null) {
       return null;
     }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Mon Apr  8 15:19:41 2013
@@ -744,7 +744,7 @@ public class DirectUpdateHandler2 extend
 
   @Override
   public void split(SplitIndexCommand cmd) throws IOException {
-    // TODO: do a commit first?
+    commit(new CommitUpdateCommand(cmd.req, false));
     SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
     splitter.split();
   }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java Mon Apr  8 15:19:41 2013
@@ -28,6 +28,7 @@ import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CharsRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.OpenBitSet;
 import org.apache.solr.common.cloud.DocRouter;
@@ -149,8 +150,10 @@ public class SolrIndexSplitter {
 
       // figure out the hash for the term
       // TODO: hook in custom hashes (or store hashes)
-      int hash = Hash.murmurhash3_x86_32(term.bytes, term.offset, term.length, 0);
-
+      // TODO: performance implications of using indexedToReadable?
+      CharsRef ref = new CharsRef(term.length);
+      ref = field.getType().indexedToReadable(term, ref);
+      int hash = Hash.murmurhash3_x86_32(ref, ref.offset, ref.length, 0);
       docsEnum = termsEnum.docs(liveDocs, docsEnum, DocsEnum.FLAG_NONE);
       for (;;) {
         int doc = docsEnum.nextDoc();

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Apr  8 15:19:41 2013
@@ -39,6 +39,7 @@ import org.apache.solr.common.SolrInputD
 import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -137,6 +138,7 @@ public class DistributedUpdateProcessor 
   // method in this update processor
   private boolean isLeader = true;
   private boolean forwardToLeader = false;
+  private boolean forwardToSubShard = false;
   private List<Node> nodes;
 
   private int numNodes;
@@ -239,8 +241,12 @@ public class DistributedUpdateProcessor 
           List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
               .getReplicaProps(collection, shardId, coreNodeName,
                   coreName, null, ZkStateReader.DOWN);
+
+          nodes = addSubShardLeaders(coll, shardId, id, doc, nodes);
           if (replicaProps != null) {
+            if (nodes == null)  {
             nodes = new ArrayList<Node>(replicaProps.size());
+            }
             // check for test param that lets us miss replicas
             String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
             Set<String> skipListSet = null;
@@ -280,17 +286,55 @@ public class DistributedUpdateProcessor 
     return nodes;
   }
 
+  private List<Node> addSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc, List<Node> nodes) {
+    Collection<Slice> allSlices = coll.getSlices();
+    for (Slice aslice : allSlices) {
+      if (Slice.CONSTRUCTION.equals(aslice.getState()))  {
+        DocRouter.Range myRange = coll.getSlice(shardId).getRange();
+        if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+        boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
+        if (isSubset &&
+            (docId == null // in case of deletes
+            || (docId != null && coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll)))) {
+          Replica sliceLeader = aslice.getLeader();
+          // slice leader can be null because node/shard is created zk before leader election
+          if (sliceLeader != null)  {
+            if (nodes == null) nodes = new ArrayList<Node>();
+            ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
+            nodes.add(new StdNode(nodeProps));
+            forwardToSubShard = true;
+          }
+        }
+      }
+    }
+    return nodes;
+  }
 
   private void doDefensiveChecks(DistribPhase phase) {
     boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
     if (isReplayOrPeersync) return;
 
     String from = req.getParams().get("distrib.from");
-    boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
+    ClusterState clusterState = zkController.getClusterState();
+    CloudDescriptor cloudDescriptor = req.getCore().getCoreDescriptor().getCloudDescriptor();
+    Slice mySlice = clusterState.getSlice(collection, cloudDescriptor.getShardId());
+    boolean localIsLeader = cloudDescriptor.isLeader();
     if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
+      String fromShard = req.getParams().get("distrib.from.parent");
+      if (fromShard != null)  {
+        // shard splitting case -- check ranges to see if we are a sub-shard
+        Slice fromSlice = zkController.getClusterState().getCollection(collection).getSlice(fromShard);
+        DocRouter.Range parentRange = fromSlice.getRange();
+        if (parentRange == null) parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+        if (mySlice.getRange() != null && !mySlice.getRange().isSubsetOf(parentRange)) {
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+              "Request says it is coming from parent shard leader but parent hash range is not superset of my range");
+        }
+      } else  {
       log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
       throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
     }
+    }
     
     if (isLeader && !localIsLeader) {
       log.error("ClusterState says we are the leader, but locally we don't think so");
@@ -324,6 +368,8 @@ public class DistributedUpdateProcessor 
           nodes.add(new StdNode(props));
         }
       }
+
+      nodes = addSubShardLeaders(zkController.getClusterState().getCollection(collection), shardId, null, null, nodes);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
@@ -367,6 +413,9 @@ public class DistributedUpdateProcessor 
         params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
             zkController.getBaseUrl(), req.getCore().getName()));
       }
+      if (forwardToSubShard)  {
+        params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
+      }
 
       params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
           zkController.getBaseUrl(), req.getCore().getName()));

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java Mon Apr  8 15:19:41 2013
@@ -171,7 +171,7 @@ public class SliceStateUpdateTest extend
     Map<String, Slice> slices = null;
     for (int i = 75; i > 0; i--) {
       clusterState2 = zkController2.getClusterState();
-      slices = clusterState2.getAllSlicesMap("collection1");
+      slices = clusterState2.getSlicesMap("collection1");
       if (slices != null && slices.containsKey("shard1")
           && slices.get("shard1").getState().equals("inactive")) {
         break;

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Mon Apr  8 15:19:41 2013
@@ -228,7 +228,7 @@ public class CloudSolrServer extends Sol
       // add it to the Map of slices.
       Map<String,Slice> slices = new HashMap<String,Slice>();
       for (String collectionName : collectionsList) {
-        Collection<Slice> colSlices = clusterState.getSlices(collectionName);
+        Collection<Slice> colSlices = clusterState.getActiveSlices(collectionName);
         if (colSlices == null) {
           throw new SolrServerException("Could not find collection:" + collectionName);
         }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Mon Apr  8 15:19:41 2013
@@ -115,10 +115,10 @@ public class ClusterState implements JSO
     return coll.getSlicesMap();
   }
   
-  public Map<String, Slice> getAllSlicesMap(String collection) {
+  public Map<String, Slice> getActiveSlicesMap(String collection) {
     DocCollection coll = collectionStates.get(collection);
     if (coll == null) return null;
-    return coll.getAllSlicesMap();
+    return coll.getActiveSlicesMap();
   }
 
   public Collection<Slice> getSlices(String collection) {
@@ -127,6 +127,12 @@ public class ClusterState implements JSO
     return coll.getSlices();
   }
 
+  public Collection<Slice> getActiveSlices(String collection) {
+    DocCollection coll = collectionStates.get(collection);
+    if (coll == null) return null;
+    return coll.getActiveSlices();
+  }
+
   /**
    * Get the named DocCollection object, or throw an exception if it doesn't exist.
    */

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java Mon Apr  8 15:19:41 2013
@@ -91,7 +91,7 @@ public class CompositeIdRouter extends H
     if (shardKey == null) {
       // search across whole collection
       // TODO: this may need modification in the future when shard splitting could cause an overlap
-      return collection.getSlices();
+      return collection.getActiveSlices();
     }
     String id = shardKey;
 
@@ -132,7 +132,7 @@ public class CompositeIdRouter extends H
     Range completeRange = new Range(lowerBound, upperBound);
 
     List<Slice> targetSlices = new ArrayList<Slice>(1);
-    for (Slice slice : collection.getSlices()) {
+    for (Slice slice : collection.getActiveSlices()) {
       Range range = slice.getRange();
       if (range != null && range.overlaps(completeRange)) {
         targetSlices.add(slice);

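The composite-id router likewise restricts shard-key routing to active slices. A hedged sketch of asking the router which slices serve a given shard key, assuming a ClusterState obtained from ZkStateReader and the request's SolrParams are in scope; the collection and key names are placeholders:

    DocCollection coll = clusterState.getCollection("collection1");
    DocRouter router = coll.getRouter();

    // Only active slices whose hash range overlaps the "tenantA" prefix are
    // returned; sub-shards still in the construction state are skipped.
    Collection<Slice> targets = router.getSearchSlicesSingle("tenantA!", params, coll);
    for (Slice slice : targets) {
      System.out.println("query slice: " + slice.getName());
    }
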
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Mon Apr  8 15:19:41 2013
@@ -36,7 +36,7 @@ public class DocCollection extends ZkNod
 
   private final String name;
   private final Map<String, Slice> slices;
-  private final Map<String, Slice> allSlices;
+  private final Map<String, Slice> activeSlices;
   private final DocRouter router;
 
   /**
@@ -48,15 +48,15 @@ public class DocCollection extends ZkNod
     super( props==null ? Collections.<String,Object>emptyMap() : props);
     this.name = name;
 
-    this.allSlices = slices;
-    this.slices = new HashMap<String, Slice>();
+    this.slices = slices;
+    this.activeSlices = new HashMap<String, Slice>();
 
     Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
 
     while (iter.hasNext()) {
       Map.Entry<String, Slice> slice = iter.next();
       if (slice.getValue().getState().equals(Slice.ACTIVE))
-        this.slices.put(slice.getKey(), slice.getValue());
+        this.activeSlices.put(slice.getKey(), slice.getValue());
     }
     this.router = router;
 
@@ -72,11 +72,11 @@ public class DocCollection extends ZkNod
   }
 
   public Slice getSlice(String sliceName) {
-    return allSlices.get(sliceName);
+    return slices.get(sliceName);
   }
 
   /**
-   * Gets the list of active slices for this collection.
+   * Gets the list of all slices for this collection.
    */
   public Collection<Slice> getSlices() {
     return slices.values();
@@ -84,24 +84,24 @@ public class DocCollection extends ZkNod
 
 
   /**
-   * Return the list of all slices for this collection.
+   * Return the list of active slices for this collection.
    */
-  public Collection<Slice> getAllSlices() {
-    return allSlices.values();
+  public Collection<Slice> getActiveSlices() {
+    return activeSlices.values();
   }
 
   /**
-   * Get the map of active slices (sliceName->Slice) for this collection.
+   * Get the map of all slices (sliceName->Slice) for this collection.
    */
   public Map<String, Slice> getSlicesMap() {
     return slices;
   }
 
   /**
-   * Get the map of all slices (sliceName->Slice) for this collection.
+   * Get the map of active slices (sliceName->Slice) for this collection.
    */
-  public Map<String, Slice> getAllSlicesMap() {
-    return allSlices;
+  public Map<String, Slice> getActiveSlicesMap() {
+    return activeSlices;
   }
 
   public DocRouter getRouter() {
@@ -115,9 +115,9 @@ public class DocCollection extends ZkNod
 
   @Override
   public void write(JSONWriter jsonWriter) {
-    LinkedHashMap<String, Object> all = new LinkedHashMap<String, Object>(allSlices.size() + 1);
+    LinkedHashMap<String, Object> all = new LinkedHashMap<String, Object>(slices.size() + 1);
     all.putAll(propMap);
-    all.put(SHARDS, allSlices);
+    all.put(SHARDS, slices);
     jsonWriter.write(all);
   }
 }

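After this rename, getSlices()/getSlicesMap() return every slice of the collection, including sub-shards that a split is still building, while getActiveSlices()/getActiveSlicesMap() return only slices whose state is Slice.ACTIVE. A small sketch, assuming a ClusterState in scope; the shard names are illustrative:

    DocCollection coll = clusterState.getCollection("collection1");

    // All slices, e.g. shard1 plus shard1_0/shard1_1 while a split is in progress.
    for (Slice slice : coll.getSlices()) {
      System.out.println(slice.getName() + " state=" + slice.getState());
    }

    // Only active slices -- what the routers and CloudSolrServer consult.
    Map<String, Slice> active = coll.getActiveSlicesMap();
    System.out.println("routable shards: " + active.keySet());
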
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java Mon Apr  8 15:19:41 2013
@@ -164,6 +164,8 @@ public abstract class DocRouter {
    **/
   public abstract Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection);
 
+  public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection);
+
 
   /** This method is consulted to determine what slices should be queried for a request when
    *  an explicit shards parameter was not used.

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java Mon Apr  8 15:19:41 2013
@@ -34,6 +34,14 @@ public abstract class HashBasedRouter ex
     return hashToSlice(hash, collection);
   }
 
+  @Override
+  public boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection) {
+    if (id == null) id = getId(sdoc, params);
+    int hash = sliceHash(id, sdoc, params);
+    Range range = collection.getSlice(shardId).getRange();
+    return range != null && range.includes(hash);
+  }
+
   protected int sliceHash(String id, SolrInputDocument sdoc, SolrParams params) {
     return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
   }
@@ -45,11 +53,11 @@ public abstract class HashBasedRouter ex
   }
 
   protected Slice hashToSlice(int hash, DocCollection collection) {
-    for (Slice slice : collection.getSlices()) {
+    for (Slice slice : collection.getActiveSlices()) {
       Range range = slice.getRange();
       if (range != null && range.includes(hash)) return slice;
     }
-    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
+    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
   }
 
 
@@ -58,7 +66,7 @@ public abstract class HashBasedRouter ex
     if (shardKey == null) {
       // search across whole collection
       // TODO: this may need modification in the future when shard splitting could cause an overlap
-      return collection.getSlices();
+      return collection.getActiveSlices();
     }
 
     // use the shardKey as an id for plain hashing

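isTargetSlice gives the update path a way to test whether a document hashes into a particular shard's range, which is what a parent shard leader needs when deciding whether to forward an update to a sub-shard during a split. A hedged sketch; the shard id and collection name are placeholders and params is assumed to be the request's SolrParams:

    SolrInputDocument sdoc = new SolrInputDocument();
    sdoc.addField("id", "doc-42");

    DocCollection coll = clusterState.getCollection("collection1");
    DocRouter router = coll.getRouter();

    // True if doc-42's hash falls inside shard1_0's hash range; the id is
    // pulled from the document because null is passed explicitly.
    if (router.isTargetSlice(null, sdoc, params, "shard1_0", coll)) {
      // e.g. also forward the update to the shard1_0 leader
    }
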
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java Mon Apr  8 15:19:41 2013
@@ -55,9 +55,15 @@ public class ImplicitDocRouter extends D
   }
 
   @Override
+  public boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection) {
+    // todo : how to handle this?
+    return false;
+  }
+
+  @Override
   public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
     if (shardKey == null) {
-      return collection.getSlices();
+      return collection.getActiveSlices();
     }
 
     // assume the shardKey is just a slice name

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java Mon Apr  8 15:19:41 2013
@@ -34,6 +34,8 @@ public class Slice extends ZkNodeProps {
   public static String STATE = "state";
   public static String LEADER = "leader";       // FUTURE: do we want to record the leader as a slice property in the JSON (as opposed to isLeader as a replica property?)
   public static String ACTIVE = "active";
+  public static String INACTIVE = "inactive";
+  public static String CONSTRUCTION = "construction";
 
   private final String name;
   private final DocRouter.Range range;
@@ -52,10 +54,10 @@ public class Slice extends ZkNodeProps {
     this.name = name;
 
     Object rangeObj = propMap.get(RANGE);
-    if (propMap.containsKey(STATE))
-      state = (String) propMap.get(STATE);
+    if (propMap.containsKey(STATE) && propMap.get(STATE) != null)
+      this.state = (String) propMap.get(STATE);
     else {
-      state = ACTIVE;                         //Default to ACTIVE
+      this.state = ACTIVE;                         //Default to ACTIVE
       propMap.put(STATE, this.state);
     }
     DocRouter.Range tmpRange = null;

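With INACTIVE and CONSTRUCTION alongside ACTIVE, callers can reason about a slice's lifecycle during a split: sub-shards start out in "construction" and the parent is switched to "inactive" once they take over. A small sketch using the accessors shown in this commit; the sub-shard name is illustrative:

    DocCollection coll = clusterState.getCollection("collection1");
    Slice parent = coll.getSlice("shard1");
    Slice sub = coll.getSlice("shard1_0");

    if (sub != null && Slice.CONSTRUCTION.equals(sub.getState())) {
      // still being built: receives forwarded updates but is not queried
    }
    if (Slice.INACTIVE.equals(parent.getState())) {
      // the split has completed and the parent shard has been retired
    }
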
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Mon Apr  8 15:19:41 2013
@@ -57,6 +57,8 @@ public class ZkStateReader {
   public static final String CORE_NAME_PROP = "core";
   public static final String COLLECTION_PROP = "collection";
   public static final String SHARD_ID_PROP = "shard";
+  public static final String SHARD_RANGE_PROP = "shard_range";
+  public static final String SHARD_STATE_PROP = "shard_state";
   public static final String NUM_SHARDS_PROP = "numShards";
   public static final String LEADER_PROP = "leader";
   

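SHARD_RANGE_PROP and SHARD_STATE_PROP let a core include its shard's hash range and state in the state messages it publishes for the Overseer. A hypothetical sketch of such a properties map (the real message is assembled inside ZkController; the values below are illustrative):

    Map<String, Object> props = new HashMap<String, Object>();
    props.put(ZkStateReader.COLLECTION_PROP, "collection1");
    props.put(ZkStateReader.SHARD_ID_PROP, "shard1_0");
    props.put(ZkStateReader.CORE_NAME_PROP, "collection1_shard1_0_replica1");
    props.put(ZkStateReader.SHARD_RANGE_PROP, "80000000-bfffffff"); // illustrative hex range
    props.put(ZkStateReader.SHARD_STATE_PROP, Slice.CONSTRUCTION);
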
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Mon Apr  8 15:19:41 2013
@@ -28,7 +28,7 @@ public interface CollectionParams 
 
 
   public enum CollectionAction {
-    CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS;
+    CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD;
     
     public static CollectionAction get( String p )
     {

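SPLITSHARD is the new collections API action this issue adds. A minimal SolrJ sketch of invoking it; the collection and shard names are placeholders, and server can be any SolrServer pointed at the cluster (the same call can be issued directly as an HTTP request to /admin/collections):

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
    params.set("collection", "collection1");
    params.set("shard", "shard1");

    QueryRequest request = new QueryRequest(params);
    request.setPath("/admin/collections");
    server.request(request);
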
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Mon Apr  8 15:19:41 2013
@@ -74,6 +74,16 @@ public interface CoreAdminParams 
   /** The shard id in solr cloud */
   public final static String SHARD = "shard";
   
+  /** The shard range in solr cloud */
+  public final static String SHARD_RANGE = "shard.range";
+
+  /** The shard state in solr cloud */
+  public final static String SHARD_STATE = "shard.state";
+
+  /** The target core to which a split index should be written.
+   * Multiple target cores can be specified by passing the targetCore parameter multiple times. */
+  public final static String TARGET_CORE = "targetCore";
+  
   public static final String ROLES = "roles";
   
   public static final String CORE_NODE_NAME = "coreNodeName";
@@ -108,6 +118,7 @@ public interface CoreAdminParams 
     REQUESTSYNCSHARD,
     CREATEALIAS,
     DELETEALIAS,
+    REQUESTAPPLYUPDATES,
     LOAD_ON_STARTUP,
     TRANSIENT;
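
The targetCore parameter is repeatable, so a single core admin split request can name every core that should receive a partition of the index. A hedged sketch of such a request; the SPLIT action name and the core names are assumptions for illustration, since this hunk only shows the parameters:

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, "SPLIT");                      // assumed action name
    params.set(CoreAdminParams.CORE, "collection1_shard1_replica1");  // core to split
    params.add(CoreAdminParams.TARGET_CORE, "collection1_shard1_0_replica1");
    params.add(CoreAdminParams.TARGET_CORE, "collection1_shard1_1_replica1");

    QueryRequest request = new QueryRequest(params);
    request.setPath("/admin/cores");
    server.request(request);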