You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/05/04 20:24:53 UTC

svn commit: r1677648 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/cloud/rule/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/handler/a...

Author: noble
Date: Mon May  4 18:24:52 2015
New Revision: 1677648

URL: http://svn.apache.org/r1677648
Log:
SOLR-6220: Rule Based Replica Assignment during collection creation

Added:
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/
      - copied from r1677607, lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/rule/
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/package-info.java
      - copied unchanged from r1677642, lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/rule/package-info.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/rule/
      - copied from r1677607, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/rule/
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java   (contents, props changed)
      - copied, changed from r1677607, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/response/SimpleSolrResponse.java   (props changed)
      - copied unchanged from r1677607, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/response/SimpleSolrResponse.java
Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/ImplicitSnitch.java   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/RemoteCallback.java   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/RequestParams.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/rule/RulesTest.java   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Mon May  4 18:24:52 2015
@@ -106,6 +106,8 @@ New Features
 * SOLR-7231: DIH-TikaEntityprocessor, create lat-lon field from Metadata
   (Tim Allison via Noble Paul)
 
+* SOLR-6220: Rule Based Replica Assignment during collection creation (Noble Paul)
+
 
 Bug Fixes
 ----------------------

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon May  4 18:24:52 2015
@@ -811,7 +811,7 @@ public class Overseer implements Closeab
 
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
-    overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
+    overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this);
     ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id);
     ccThread.setDaemon(true);
     
@@ -829,6 +829,10 @@ public class Overseer implements Closeab
   public Stats getStats() {
     return stats;
   }
+
+  ZkController getZkController(){
+    return zkController;
+  }
   
   /**
    * For tests.

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon May  4 18:24:52 2015
@@ -21,6 +21,7 @@ import static org.apache.solr.cloud.Assi
 import static org.apache.solr.common.cloud.ZkStateReader.*;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonParams.*;
+import static org.apache.solr.common.util.StrUtils.formatString;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -53,8 +54,12 @@ import org.apache.solr.client.solrj.resp
 import org.apache.solr.cloud.Assign.Node;
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
+import org.apache.solr.cloud.rule.Rule;
+import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.cloud.rule.SnitchContext;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
@@ -103,13 +108,13 @@ import com.google.common.collect.Immutab
 
 
 public class OverseerCollectionProcessor implements Runnable, Closeable {
-  
+
   public static final String NUM_SLICES = "numShards";
-  
+
   static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
   public static final String CREATE_NODE_SET_SHUFFLE = "createNodeSet.shuffle";
   public static final String CREATE_NODE_SET = "createNodeSet";
-  
+
   public static final String ROUTER = "router";
 
   public static final String SHARDS_PROP = "shards";
@@ -134,7 +139,9 @@ public class OverseerCollectionProcessor
       ROUTER, DocRouter.DEFAULT_NAME,
       ZkStateReader.REPLICATION_FACTOR, "1",
       ZkStateReader.MAX_SHARDS_PER_NODE, "1",
-      ZkStateReader.AUTO_ADD_REPLICAS, "false");
+      ZkStateReader.AUTO_ADD_REPLICAS, "false",
+      "rule", null,
+      "snitch",null);
 
   static final Random RANDOM;
   static {
@@ -149,10 +156,10 @@ public class OverseerCollectionProcessor
   }
 
   public ExecutorService tpe ;
-  
+
   private static Logger log = LoggerFactory
       .getLogger(OverseerCollectionProcessor.class);
-  
+
   private DistributedQueue workQueue;
   private DistributedMap runningMap;
   private DistributedMap completedMap;
@@ -185,13 +192,15 @@ public class OverseerCollectionProcessor
   // deleted from the work-queue as that is a batched operation.
   final private Set<String> runningZKTasks;
   private final Object waitLock = new Object();
+  private Overseer overseer;
 
   public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
                                      final ShardHandler shardHandler,
-                                     String adminPath, Overseer.Stats stats) {
+                                     String adminPath, Overseer.Stats stats, Overseer overseer) {
     this(zkStateReader, myId, shardHandler.getShardHandlerFactory(), adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
         Overseer.getRunningMap(zkStateReader.getZkClient()),
         Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
+    this.overseer = overseer;
   }
 
   protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId,
@@ -216,7 +225,7 @@ public class OverseerCollectionProcessor
     this.collectionWip = new HashSet();
     this.completedTasks = new HashMap<>();
   }
-  
+
   @Override
   public void run() {
     log.info("Process current queue of collection creations");
@@ -1113,12 +1122,12 @@ public class OverseerCollectionProcessor
       params.set(CoreAdminParams.DELETE_DATA_DIR, true);
       collectionCmd(zkStateReader.getClusterState(), message, params, results,
           null);
-      
+
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
           DELETE.toLower(), NAME, collection);
       Overseer.getInQueue(zkStateReader.getZkClient()).offer(
           ZkStateReader.toJSON(m));
-      
+
       // wait for a while until we don't see the collection
       long now = System.nanoTime();
       long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
@@ -1136,9 +1145,9 @@ public class OverseerCollectionProcessor
         throw new SolrException(ErrorCode.SERVER_ERROR,
             "Could not fully remove collection: " + collection);
       }
-      
+
     } finally {
-      
+
       try {
         if (zkStateReader.getZkClient().exists(
             ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
@@ -1196,7 +1205,7 @@ public class OverseerCollectionProcessor
     }
 
   }
-  
+
   private void checkForAlias(String name, String value) {
 
     long now = System.nanoTime();
@@ -1215,7 +1224,7 @@ public class OverseerCollectionProcessor
       log.warn("Timeout waiting to be notified of Alias change...");
     }
   }
-  
+
   private void checkForAliasAbsence(String name) {
 
     long now = System.nanoTime();
@@ -1265,7 +1274,7 @@ public class OverseerCollectionProcessor
     } finally {
       MDCUtils.cleanupMDC(previousMDCContext);
     }
-    
+
   }
 
   private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results)
@@ -1773,21 +1782,21 @@ public class OverseerCollectionProcessor
       return ureq.process(client);
     }
   }
-  
+
   private String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
     int retryCount = 320;
     while (retryCount-- > 0) {
       Map<String,Slice> slicesMap = zkStateReader.getClusterState()
           .getSlicesMap(collectionName);
       if (slicesMap != null) {
-        
+
         for (Slice slice : slicesMap.values()) {
           for (Replica replica : slice.getReplicas()) {
             // TODO: for really large clusters, we could 'index' on this
-            
+
             String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
             String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-            
+
             if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
               return replica.getName();
             }
@@ -2263,10 +2272,10 @@ public class OverseerCollectionProcessor
       nodeList = new ArrayList<>(liveNodes);
       Collections.shuffle(nodeList, random);
     }
-    
-    return nodeList;    
+
+    return nodeList;
   }
-  
+
   private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
     String collectionName = message.getStr(NAME);
     if (clusterState.hasCollection(collectionName)) {
@@ -2279,11 +2288,11 @@ public class OverseerCollectionProcessor
     } else if (!validateConfig(configName)) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
     }
-    
+
     try {
       // look at the replication factor and see if it matches reality
       // if it does not, find best nodes to create more cores
-      
+
       int repFactor = message.getInt(ZkStateReader.REPLICATION_FACTOR, 1);
 
       ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
@@ -2304,21 +2313,21 @@ public class OverseerCollectionProcessor
       }
 
       int maxShardsPerNode = message.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, 1);
-      
+
       if (repFactor <= 0) {
         throw new SolrException(ErrorCode.BAD_REQUEST, ZkStateReader.REPLICATION_FACTOR + " must be greater than 0");
       }
-      
+
       if (numSlices <= 0) {
         throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
       }
-      
+
       // we need to look at every node and see how many cores it serves
       // add our new cores to existing nodes serving the least number of cores
       // but (for now) require that each core goes on a distinct node.
-      
+
       final List<String> nodeList = getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
-      
+
       if (repFactor > nodeList.size()) {
         log.warn("Specified "
             + ZkStateReader.REPLICATION_FACTOR
@@ -2330,7 +2339,7 @@ public class OverseerCollectionProcessor
             + nodeList.size()
             + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
       }
-      
+
       int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
       int requestedShardsToCreate = numSlices * repFactor;
       if (maxShardsAllowedToCreate < requestedShardsToCreate) {
@@ -2343,6 +2352,8 @@ public class OverseerCollectionProcessor
             + ". This requires " + requestedShardsToCreate
             + " shards to be created (higher than the allowed number)");
       }
+
+      Map<Position, String> positionVsNodes = identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
       boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader.getClusterProps());
 
       createConfNode(configName, collectionName, isLegacyCloud);
@@ -2363,62 +2374,60 @@ public class OverseerCollectionProcessor
       // For tracking async calls.
       HashMap<String, String> requestMap = new HashMap<String, String>();
 
-      log.info("Creating SolrCores for new collection {}, shardNames {} , replicationFactor : {}",
-          collectionName, shardNames, repFactor);
+
+      log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
+          collectionName, shardNames, repFactor));
       Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
-      for (int i = 1; i <= shardNames.size(); i++) {
-        String sliceName = shardNames.get(i-1);
-        for (int j = 1; j <= repFactor; j++) {
-          String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size());
-          String coreName = collectionName + "_" + sliceName + "_replica" + j;
-          log.info("Creating shard " + coreName + " as part of slice "
-              + sliceName + " of collection " + collectionName + " on "
-              + nodeName);
+      for (Map.Entry<Position, String> e : positionVsNodes.entrySet()) {
+        Position position = e.getKey();
+        String nodeName = e.getValue();
+        String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
+        log.info(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
+            , coreName, position.shard, collectionName, nodeName));
+
+
+        String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
+        //in the new mode, create the replica in clusterstate prior to creating the core.
+        // Otherwise the core creation fails
+        if (!isLegacyCloud) {
+          ZkNodeProps props = new ZkNodeProps(
+              Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+              ZkStateReader.COLLECTION_PROP, collectionName,
+              ZkStateReader.SHARD_ID_PROP, position.shard,
+              ZkStateReader.CORE_NAME_PROP, coreName,
+              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+              ZkStateReader.BASE_URL_PROP, baseUrl);
+          Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
+        }
 
+        // Need to create new params for each request
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
 
-          String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
-          //in the new mode, create the replica in clusterstate prior to creating the core.
-          // Otherwise the core creation fails
-          if(!isLegacyCloud){
-            ZkNodeProps props = new ZkNodeProps(
-                Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
-                ZkStateReader.COLLECTION_PROP, collectionName,
-                ZkStateReader.SHARD_ID_PROP, sliceName,
-                ZkStateReader.CORE_NAME_PROP, coreName,
-                ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-                ZkStateReader.BASE_URL_PROP,baseUrl);
-                Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(props));
-          }
+        params.set(CoreAdminParams.NAME, coreName);
+        params.set(COLL_CONF, configName);
+        params.set(CoreAdminParams.COLLECTION, collectionName);
+        params.set(CoreAdminParams.SHARD, position.shard);
+        params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
 
-          // Need to create new params for each request
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+        if (async != null) {
+          String coreAdminAsyncId = async + Math.abs(System.nanoTime());
+          params.add(ASYNC, coreAdminAsyncId);
+          requestMap.put(nodeName, coreAdminAsyncId);
+        }
+        addPropertyParams(message, params);
 
-          params.set(CoreAdminParams.NAME, coreName);
-          params.set(COLL_CONF, configName);
-          params.set(CoreAdminParams.COLLECTION, collectionName);
-          params.set(CoreAdminParams.SHARD, sliceName);
-          params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
-
-          if (async != null)  {
-            String coreAdminAsyncId = async + Math.abs(System.nanoTime());
-            params.add(ASYNC, coreAdminAsyncId);
-            requestMap.put(nodeName, coreAdminAsyncId);
-          }
-          addPropertyParams(message, params);
-
-          ShardRequest sreq = new ShardRequest();
-          params.set("qt", adminPath);
-          sreq.purpose = 1;
-          sreq.shards = new String[] {baseUrl};
-          sreq.actualShards = sreq.shards;
-          sreq.params = params;
-
-          if(isLegacyCloud) {
-            shardHandler.submit(sreq, sreq.shards[0], sreq.params);
-          } else {
-            coresToCreate.put(coreName, sreq);
-          }
+        ShardRequest sreq = new ShardRequest();
+        params.set("qt", adminPath);
+        sreq.purpose = 1;
+        sreq.shards = new String[]{baseUrl};
+        sreq.actualShards = sreq.shards;
+        sreq.params = params;
+
+        if (isLegacyCloud) {
+          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+        } else {
+          coresToCreate.put(coreName, sreq);
         }
       }
 
@@ -2446,6 +2455,57 @@ public class OverseerCollectionProcessor
     }
   }
 
+  private Map<Position, String> identifyNodes(ClusterState clusterState,
+                                              List<String> nodeList,
+                                              ZkNodeProps message,
+                                              List<String> shardNames,
+                                              int repFactor) throws IOException {
+    List<Map> maps = (List) message.get("rule");
+    if (maps == null) {
+      int i = 0;
+      Map<Position, String> result = new HashMap<>();
+      for (String aShard : shardNames) {
+        for (int j = 0; j < repFactor; j++){
+          result.put(new Position(aShard, j), nodeList.get(i % nodeList.size()));
+          i++;
+        }
+      }
+      return result;
+    }
+
+    List<Rule> rules = new ArrayList<>();
+    for (Object map : maps) rules.add(new Rule((Map) map));
+
+    Map<String, Integer> sharVsReplicaCount = new HashMap<>();
+
+    for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
+    maps = (List<Map>) message.get("snitch");
+    List snitchList = maps == null? Collections.emptyList(): maps;
+    ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
+        sharVsReplicaCount,
+        snitchList,
+        new HashMap<String, Set<String>>(),//this is a new collection. So, there are no nodes in any shard
+        nodeList,
+        overseer.getZkController().getCoreContainer(),
+        clusterState);
+
+    Map<Position, String> nodeMappings = replicaAssigner.getNodeMappings();
+    if(nodeMappings == null){
+      String msg = "Could not identify nodes matching the rules " + rules ;
+      if(!replicaAssigner.failedNodes.isEmpty()){
+        Map<String, String> failedNodes = new HashMap<>();
+        for (Map.Entry<String, SnitchContext> e : replicaAssigner.failedNodes.entrySet()) {
+          failedNodes.put(e.getKey(), e.getValue().getErrMsg());
+        }
+        msg+=" Some nodes where excluded from assigning replicas because tags could not be obtained from them "+ failedNodes;
+      }
+      msg+= ZkStateReader.toJSONString(replicaAssigner.getNodeVsTags());
+
+      throw new SolrException(ErrorCode.BAD_REQUEST, msg);
+    }
+    return nodeMappings;
+  }
+
   private Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
     Map<String, Replica> result = new HashMap<>();
     long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java?rev=1677648&r1=1677607&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java Mon May  4 18:24:52 2015
@@ -187,11 +187,11 @@ public class ReplicaAssigner {
   }
 
 
-  private Map<Position, String> tryAPermutationOfRules(int[] rulePermutation, List<Position> positions, boolean fuzzyPhase) {
-    Map<String, Map<String, Object>> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2);
+  private Map<Position, String> tryAPermutationOfRules(final int[] rulePermutation, List<Position> positions, boolean fuzzyPhase) {
+    final Map<String, Map<String, Object>> nodeVsTagsCopy = getDeepCopy(nodeVsTags, 2);
     Map<Position, String> result = new LinkedHashMap<>();
     int startPosition = 0;
-    Map<String, Set<String>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
+    final Map<String, Set<String>> copyOfCurrentState = getDeepCopy(shardVsNodes, 2);
     List<String> sortedLiveNodes = new ArrayList<>(this.liveNodes);
     Collections.sort(sortedLiveNodes, new Comparator<String>() {
       @Override
@@ -300,6 +300,9 @@ public class ReplicaAssigner {
       public int[] next() {
         return next;
       }
+
+      @Override
+      public void remove() { }
     };
 
   }

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java?rev=1677648&r1=1677607&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Rule.java Mon May  4 18:24:52 2015
@@ -113,10 +113,10 @@ public class Rule {
    * without violating this rule
    *
    * @param testNode       The node in question
-   * @param shardVsNodeSet
+   * @param shardVsNodeSet Set of nodes for every shard 
    * @param nodeVsTags     The pre-fetched tags for all the nodes
    * @param shardName      The shard to which this node should be attempted
-   * @return
+   * @return MatchStatus
    */
   MatchStatus tryAssignNodeToShard(String testNode,
                                    Map<String, Set<String>> shardVsNodeSet,

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java?rev=1677648&r1=1677607&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/Snitch.java Mon May  4 18:24:52 2015
@@ -25,7 +25,7 @@ import com.google.common.collect.Immutab
  *
  */
 public abstract class Snitch {
-  static Set<Class> WELL_KNOWN_SNITCHES = ImmutableSet.of(ImplicitSnitch.class);
+  static Set<Class> WELL_KNOWN_SNITCHES = ImmutableSet.<Class>of(ImplicitSnitch.class);
 
 
   public abstract void getTags(String solrNode, Set<String> requestedTags, SnitchContext ctx);

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java?rev=1677648&r1=1677607&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java Mon May  4 18:24:52 2015
@@ -107,9 +107,11 @@ public class SnitchContext implements Re
   public SimpleSolrResponse invoke(UpdateShardHandler shardHandler,  final String url, String path, SolrParams params)
       throws IOException, SolrServerException {
     GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
-    NamedList<Object> rsp = new HttpSolrClient(url, shardHandler.getHttpClient(), new BinaryResponseParser()).request(request);
-    request.response.nl = rsp;
-    return request.response;
+    try (HttpSolrClient client = new HttpSolrClient(url, shardHandler.getHttpClient(), new BinaryResponseParser())) {
+      NamedList<Object> rsp = client.request(request);
+      request.response.nl = rsp;
+      return request.response;
+    }
   }
 
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/RequestParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/RequestParams.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/RequestParams.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/RequestParams.java Mon May  4 18:24:52 2015
@@ -204,6 +204,10 @@ public class RequestParams implements Ma
       Object v = e.getValue();
       if (v instanceof Map && maxDepth > 0) {
         v = getDeepCopy((Map) v, maxDepth - 1);
+      } else if (v instanceof Set) {
+        v = new HashSet((Set) v);
+      } else if (v instanceof List) {
+        v = new ArrayList((List) v);
       }
       copy.put(e.getKey(), v);
     }

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon May  4 18:24:52 2015
@@ -48,6 +48,7 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerCollectionProcessor;
 import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.cloud.rule.Rule;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -803,6 +804,8 @@ public class CollectionsHandler extends
     if(props.get(DocCollection.STATE_FORMAT) == null){
       props.put(DocCollection.STATE_FORMAT,"2");
     }
+    addRuleMap(req.getParams(), props, "rule");
+    addRuleMap(req.getParams(), props, "snitch");
 
     if(SYSTEM_COLL.equals(name)){
       //We must always create asystem collection with only a single shard
@@ -817,6 +820,15 @@ public class CollectionsHandler extends
     handleResponse(CREATE.toLower(), m, rsp);
   }
 
+  private void addRuleMap(SolrParams params, Map<String, Object> props, String key) {
+    String[] rules = params.getParams(key);
+    if(rules!= null && rules.length >0){
+      ArrayList<Map> l = new ArrayList<>();
+      for (String rule : rules) l.add(Rule.parseRule(rule));
+      props.put(key, l);
+    }
+  }
+
   private void createSysConfigSet() throws KeeperException, InterruptedException {
     SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
     createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE, null);

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon May  4 18:24:52 2015
@@ -67,6 +67,7 @@ import org.apache.solr.core.CoreDescript
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
@@ -311,11 +312,40 @@ public class CoreAdminHandler extends Re
             log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken.");
           }
           break;
+        case INVOKE:
+          handleInvoke(req, rsp);
+          break;
       }
     }
     rsp.setHttpCaching(false);
   }
 
+  public void handleInvoke(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    // Each value of the "class" request param names one class to invoke; at least one is mandatory.
+    final String[] classNames = req.getParams().getParams("class");
+    if (classNames == null || classNames.length == 0) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "class is a required param");
+    }
+    // The result of every invocation is added to the response under the class name it came from.
+    for (String className : classNames) {
+      rsp.add(className, invokeAClass(req, className));
+    }
+  }
+
+  private Map<String, Object> invokeAClass(SolrQueryRequest req, String c) {
+    // Instantiates class c as an Invocable, runs it against req, logs the result and returns it.
+    SolrResourceLoader loader = null;
+    if (req.getCore() != null) loader = req.getCore().getResourceLoader();
+    else if (req.getContext().get(CoreContainer.class.getName()) != null) {
+      loader = ((CoreContainer) req.getContext().get(CoreContainer.class.getName())).getResourceLoader();
+    }
+    // Neither a core nor a CoreContainer was reachable: report that explicitly instead of a bare NullPointerException.
+    if (loader == null) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No resource loader available to invoke class " + c);
+    Map<String, Object> result = loader.newInstance(c, Invocable.class).invoke(req);
+    log.info("Invocable_invoked {}", result);
+    return result;
+  }
+
 
   /**
    * Handle the core admin SPLIT action.
@@ -1315,4 +1345,11 @@ public class CoreAdminHandler extends Re
     if (parallelExecutor != null && !parallelExecutor.isShutdown())
       ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor);
   }
+
+  /**
+   * Contract for classes run by the INVOKE action of the core admin handler; the map returned by {@code invoke} is added to the response.
+   */
+  public interface Invocable {
+    Map<String, Object> invoke(SolrQueryRequest req);
+  }
 }

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Mon May  4 18:24:52 2015
@@ -286,6 +286,7 @@ public class SolrDispatchFilter extends
         handler = cores.getRequestHandler(path);
         if (handler != null) {
           solrReq = SolrRequestParsers.DEFAULT.parse(null, path, req);
+          solrReq.getContext().put(CoreContainer.class.getName(), cores);
           handleAdminRequest(req, response, handler, solrReq);
           return;
         }

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java?rev=1677648&r1=1677607&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java Mon May  4 18:24:52 2015
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.rule;
  */
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -60,7 +61,7 @@ public class RuleEngineTest extends Solr
         "    'node':'127.0.0.1:49958_'," +
         "    'disk':992," +
         "    'cores':1}}";
-    MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes());
+    MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes(StandardCharsets.UTF_8));
     Map shardVsReplicaCount = makeMap("shard1", 2, "shard2", 2);
 
     List<Rule> rules = parseRules("[{'cores':'<4'}, {" +
@@ -105,7 +106,7 @@ public class RuleEngineTest extends Solr
         "    'node':'127.0.0.1:49958_'," +
         "    'disk':970," +
         "    'cores':1}}";
-    MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes());
+    MockSnitch.nodeVsTags = (Map) ZkStateReader.fromJSON(s.getBytes(StandardCharsets.UTF_8));
     //test not
     List<Rule> rules = parseRules(
          "[{cores:'<4'}, " +
@@ -182,7 +183,7 @@ public class RuleEngineTest extends Solr
 
   private List<Rule> parseRules(String s) {
 
-    List maps = (List) ZkStateReader.fromJSON(s.getBytes());
+    List maps = (List) ZkStateReader.fromJSON(s.getBytes(StandardCharsets.UTF_8));
 
     List<Rule> rules = new ArrayList<>();
     for (Object map : maps) rules.add(new Rule((Map) map));

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Mon May  4 18:24:52 2015
@@ -190,7 +190,7 @@ public class CollectionAdminRequest exte
     protected Boolean autoAddReplicas;
     protected Integer stateFormat;
     protected String asyncId;
-
+    private String[] rule , snitch;
     public Create() {
       action = CollectionAction.CREATE;
     }
@@ -208,6 +208,8 @@ public class CollectionAdminRequest exte
     public void setAsyncId(String asyncId) {
       this.asyncId = asyncId;
     }
+    public void setRule(String... s){ this.rule = s; } // values are sent as the "rule" request param when non-null
+    public void setSnitch(String... s){ this.snitch = s; } // values are sent as the "snitch" request param when non-null
 
     public String getConfigName()  { return configName; }
     public String getCreateNodeSet() { return createNodeSet; }
@@ -260,6 +262,8 @@ public class CollectionAdminRequest exte
       if (stateFormat != null) {
         params.set(DocCollection.STATE_FORMAT, stateFormat);
       }
+      if(rule != null) params.set("rule", rule);
+      if(snitch != null) params.set("snitch", snitch);
       return params;
     }
     

Copied: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java (from r1677607, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java)
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java?p2=lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java&p1=lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java&r1=1677607&r2=1677648&rev=1677648&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/GenericSolrRequest.java Mon May  4 18:24:52 2015
@@ -48,7 +48,7 @@ public class GenericSolrRequest extends
 
   @Override
   public Collection<ContentStream> getContentStreams() throws IOException {
-    return null;
+    return contentStreams;
   }
 
   @Override

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Mon May  4 18:24:52 2015
@@ -37,6 +37,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -131,6 +132,10 @@ public class ZkStateReader implements Cl
     return toUTF8(out);
   }
 
+  public static String toJSONString(Object o) {
+    return new String(toJSON(o), StandardCharsets.UTF_8); // decode the bytes returned by toJSON(o) as UTF-8 text
+  }
+
   public static byte[] toUTF8(CharArr out) {
     byte[] arr = new byte[out.size() << 2]; // is 4x the real worst-case upper-bound?
     int nBytes = ByteUtils.UTF16toUTF8(out, 0, out.size(), arr, 0);

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Mon May  4 18:24:52 2015
@@ -138,8 +138,9 @@ public abstract class CoreAdminParams
     TRANSIENT,
     OVERSEEROP,
     REQUESTSTATUS,
-    REJOINLEADERELECTION;
-    
+    REJOINLEADERELECTION,
+    INVOKE;
+
     public static CoreAdminAction get( String p )
     {
       if( p != null ) {

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java?rev=1677648&r1=1677647&r2=1677648&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/ModifiableSolrParams.java Mon May  4 18:24:52 2015
@@ -57,6 +57,10 @@ public class ModifiableSolrParams extend
     }
   }
 
+  public int size() {
+    return vals != null ? vals.size() : 0; // entry count of the backing map; a missing map counts as empty
+  }
+
   public Map<String,String[]> getMap() {
     return vals;
   }