Posted to commits@lucene.apache.org by cm...@apache.org on 2013/08/11 14:19:39 UTC

svn commit: r1512909 [24/38] - in /lucene/dev/branches/lucene4956: ./ dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/libraries/ dev-tools/idea/lucene/suggest/ dev-tools/idea/solr/contrib/dataimporthandler/ dev-tools/idea/solr/core/src/test/ dev-too...

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Sun Aug 11 12:19:13 2013
@@ -17,15 +17,13 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -34,15 +32,18 @@ import org.apache.solr.common.cloud.Clos
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.PlainIdRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
@@ -53,6 +54,21 @@ import org.apache.zookeeper.KeeperExcept
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.solr.cloud.Assign.Node;
+import static org.apache.solr.cloud.Assign.getNodesForNewShard;
+import static org.apache.solr.common.cloud.DocRouter.ROUTE_FIELD;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+
+
 public class OverseerCollectionProcessor implements Runnable, ClosableThread {
   
   public static final String NUM_SLICES = "numShards";
@@ -75,6 +91,24 @@ public class OverseerCollectionProcessor
   
   public static final String SPLITSHARD = "splitshard";
 
+  public static final String DELETESHARD = "deleteshard";
+
+  public static final String ROUTER = "router";
+
+  public static final String SHARDS_PROP = "shards";
+
+  public static final String CREATESHARD = "createshard";
+
+  public static final String COLL_CONF = "collection.configName";
+
+
+  public static final Map<String,Object> COLL_PROPS = asMap(
+      ROUTER,DocRouter.DEFAULT_NAME,
+      REPLICATION_FACTOR, "1",
+      MAX_SHARDS_PER_NODE,"1",
+      ROUTE_FIELD,null);
+
+
   // TODO: use from Overseer?
   private static final String QUEUE_OPERATION = "operation";
   
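COLL_PROPS captures the per-collection defaults (router type, replication factor, max shards per node, route field) as a map built with the varargs asMap helper defined at the bottom of this class. A minimal sketch of how such defaults can be overlaid with message-supplied values, assuming a ZkNodeProps "message" in scope (the overlay loop is illustrative, not part of the patch):

    // Start from the defaults, then let message-supplied values win.
    Map<String,Object> collProps = new HashMap<String,Object>(COLL_PROPS);
    for (Map.Entry<String,Object> entry : collProps.entrySet()) {
      String override = message.getStr(entry.getKey());
      if (override != null) entry.setValue(override);
    }
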
@@ -158,37 +192,14 @@ public class OverseerCollectionProcessor
   
   
   protected SolrResponse processMessage(ZkNodeProps message, String operation) {
-    
+    log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());
+
     NamedList results = new NamedList();
     try {
       if (CREATECOLLECTION.equals(operation)) {
         createCollection(zkStateReader.getClusterState(), message, results);
       } else if (DELETECOLLECTION.equals(operation)) {
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
-        params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
-        params.set(CoreAdminParams.DELETE_DATA_DIR, true);
-        collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
-        
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-            Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
-          Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
-          
-        // wait for a while until we don't see the collection
-        long now = System.currentTimeMillis();
-        long timeout = now + 30000;
-        boolean removed = false;
-        while (System.currentTimeMillis() < timeout) {
-          Thread.sleep(100);
-          removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
-          if (removed) {
-            Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
-            break;
-          }
-        }
-        if (!removed) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
-        }
+        deleteCollection(message, results);
       } else if (RELOADCOLLECTION.equals(operation)) {
         ModifiableSolrParams params = new ModifiableSolrParams();
         params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
@@ -199,6 +210,10 @@ public class OverseerCollectionProcessor
         deleteAlias(zkStateReader.getAliases(), message);
       } else if (SPLITSHARD.equals(operation))  {
         splitShard(zkStateReader.getClusterState(), message, results);
+      } else if (CREATESHARD.equals(operation))  {
+        createShard(zkStateReader.getClusterState(), message, results);
+      } else if (DELETESHARD.equals(operation)) {
+        deleteShard(zkStateReader.getClusterState(), message, results);
       } else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
@@ -217,6 +232,34 @@ public class OverseerCollectionProcessor
     return new OverseerSolrResponse(results);
   }
 
+  private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+    params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+    params.set(CoreAdminParams.DELETE_DATA_DIR, true);
+    collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
+
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+        Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
+    Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
+
+    // wait for a while until we don't see the collection
+    long now = System.currentTimeMillis();
+    long timeout = now + 30000;
+    boolean removed = false;
+    while (System.currentTimeMillis() < timeout) {
+      Thread.sleep(100);
+      removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
+      if (removed) {
+        Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+        break;
+      }
+    }
+    if (!removed) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
+    }
+  }
+
   private void createAlias(Aliases aliases, ZkNodeProps message) {
     String aliasName = message.getStr("name");
     String collections = message.getStr("collections");
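The extracted deleteCollection first unloads every core of the collection (UNLOAD with deleteInstanceDir/deleteDataDir), then queues a REMOVECOLLECTION message for the Overseer and polls cluster state for up to 30 seconds until the collection disappears. The same poll-until pattern recurs in createShard, createCollection and deleteShard below; a generic sketch of it, with a hypothetical Condition callback, might look like:

    // Illustrative helper only; the patch inlines this loop at each call site.
    interface Condition { boolean satisfied() throws Exception; }

    static boolean waitFor(Condition condition, long timeoutMs) throws Exception {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline) {
        Thread.sleep(100);             // same 100ms poll interval the patch uses
        if (condition.satisfied()) return true;
      }
      return false;
    }

deleteCollection would then reduce to checking waitFor(collectionGone, 30000) and throwing SERVER_ERROR on false.
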
@@ -317,7 +360,84 @@ public class OverseerCollectionProcessor
     }
     
   }
-  
+
+  private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+    log.info("create shard invoked");
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String shard = message.getStr(SHARD_ID_PROP);
+    if(collectionName == null || shard ==null)
+      throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters" );
+    int numSlices = 1;
+
+    DocCollection collection = clusterState.getCollection(collectionName);
+    int maxShardsPerNode = collection.getInt(MAX_SHARDS_PER_NODE, 1);
+    int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+//    int minReplicas = message.getInt("minReplicas",repFactor);
+    String createNodeSetStr =message.getStr(CREATE_NODE_SET);
+
+    ArrayList<Node> sortedNodeList = getNodesForNewShard(clusterState, collectionName, numSlices, maxShardsPerNode, repFactor, createNodeSetStr);
+
+    Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
+    // wait for a while until we see the new shard
+    long waitUntil = System.currentTimeMillis() + 30000;
+    boolean created = false;
+    while (System.currentTimeMillis() < waitUntil) {
+      Thread.sleep(100);
+      created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(shard) !=null;
+      if (created) break;
+    }
+    if (!created)
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + shard);
+
+
+    String configName = message.getStr(COLL_CONF);
+    String sliceName = shard;
+    for (int j = 1; j <= repFactor; j++) {
+      String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
+      String shardName = collectionName + "_" + sliceName + "_replica" + j;
+      log.info("Creating shard " + shardName + " as part of slice "
+          + sliceName + " of collection " + collectionName + " on "
+          + nodeName);
+
+      // Need to create new params for each request
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+      params.set(CoreAdminParams.NAME, shardName);
+      params.set(COLL_CONF, configName);
+      params.set(CoreAdminParams.COLLECTION, collectionName);
+      params.set(CoreAdminParams.SHARD, sliceName);
+      params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+      ShardRequest sreq = new ShardRequest();
+      params.set("qt", adminPath);
+      sreq.purpose = 1;
+      String replica = zkStateReader.getZkClient()
+          .getBaseUrlForNodeName(nodeName);
+      if (replica.startsWith("http://")) replica = replica.substring(7);
+      sreq.shards = new String[]{replica};
+      sreq.actualShards = sreq.shards;
+      sreq.params = params;
+
+      shardHandler.submit(sreq, replica, sreq.params);
+
+    }
+
+    ShardResponse srsp;
+    do {
+      srsp = shardHandler.takeCompletedOrError();
+      if (srsp != null) {
+        processResponse(results, srsp);
+      }
+    } while (srsp != null);
+
+    log.info("Finished create command on all shards for collection: "
+        + collectionName);
+
+    return true;
+  }
+
+
   private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
     log.info("Split shard invoked");
     String collectionName = message.getStr("collection");
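createShard expects 'collection' and 'shard' in the message, asks Assign.getNodesForNewShard for a sorted node list, publishes the new slice through the Overseer state queue, waits for it to appear, and finally issues one CoreAdmin CREATE per replica. A sketch of the message processMessage() dispatches to it (collection and shard names are hypothetical):

    // Hedged sketch of a CREATESHARD message.
    ZkNodeProps m = new ZkNodeProps(
        QUEUE_OPERATION, CREATESHARD,      // "operation" -> "createshard"
        COLLECTION_PROP, "collection1",    // required
        SHARD_ID_PROP, "customShardA",     // required
        REPLICATION_FACTOR, "2");          // optional

Creating shards this way is naturally paired with the implicit router (note the ImplicitDocRouter import added above), since a slice created here carries no hash range.
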
@@ -360,13 +480,17 @@ public class OverseerCollectionProcessor
             throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
           } else if (Slice.CONSTRUCTION.equals(oSlice.getState()))  {
             for (Replica replica : oSlice.getReplicas()) {
-              String core = replica.getStr("core");
-              log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
-              ModifiableSolrParams params = new ModifiableSolrParams();
-              params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
-              params.set(CoreAdminParams.CORE, core);
-              params.set(CoreAdminParams.DELETE_INDEX, "true");
-              sendShardRequest(replica.getNodeName(), params);
+              if (clusterState.liveNodesContain(replica.getNodeName())) {
+                String core = replica.getStr("core");
+                log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
+                ModifiableSolrParams params = new ModifiableSolrParams();
+                params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+                params.set(CoreAdminParams.CORE, core);
+                params.set(CoreAdminParams.DELETE_INDEX, "true");
+                sendShardRequest(replica.getNodeName(), params);
+              } else  {
+                log.warn("Replica {} exists in shard {} but is not live and cannot be unloaded", replica, oSlice);
+              }
             }
           }
         }
@@ -397,13 +521,19 @@ public class OverseerCollectionProcessor
         //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); todo: is it necessary, we're not creating collections?
 
         sendShardRequest(nodeName, params);
+      }
 
+      collectShardResponses(results, true,
+          "SPLTSHARD failed to create subshard leaders");
+
+      for (String subShardName : subShardNames) {
         // wait for parent leader to acknowledge the sub-shard core
         log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+        String coreNodeName = waitForCoreNodeName(collection, zkStateReader.getZkClient().getBaseUrlForNodeName(nodeName), subShardName);
         CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
         cmd.setCoreName(subShardName);
         cmd.setNodeName(nodeName);
-        cmd.setCoreNodeName(nodeName + "_" + subShardName);
+        cmd.setCoreNodeName(coreNodeName);
         cmd.setState(ZkStateReader.ACTIVE);
         cmd.setCheckLive(true);
         cmd.setOnlyIfLeader(true);
@@ -411,7 +541,7 @@ public class OverseerCollectionProcessor
       }
 
       collectShardResponses(results, true,
-          "SPLTSHARD failed to create subshard leaders or timed out waiting for them to come up");
+          "SPLTSHARD timed out waiting for subshard leaders to come up");
       
       log.info("Successfully created all sub-shards for collection "
           + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
@@ -506,12 +636,13 @@ public class OverseerCollectionProcessor
 
           sendShardRequest(subShardNodeName, params);
 
+          String coreNodeName = waitForCoreNodeName(collection, zkStateReader.getZkClient().getBaseUrlForNodeName(subShardNodeName), shardName);
           // wait for the replicas to be seen as active on sub shard leader
           log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
           CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
           cmd.setCoreName(subShardNames.get(i-1));
           cmd.setNodeName(subShardNodeName);
-          cmd.setCoreNodeName(subShardNodeName + "_" + shardName);
+          cmd.setCoreNodeName(coreNodeName);
           cmd.setState(ZkStateReader.ACTIVE);
           cmd.setCheckLive(true);
           cmd.setOnlyIfLeader(true);
@@ -522,6 +653,18 @@ public class OverseerCollectionProcessor
       collectShardResponses(results, true,
           "SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
 
+      String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
+      // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
+      // and we force open a searcher so that we have documents to show upon switching states
+      UpdateResponse updateResponse = null;
+      try {
+        updateResponse = commit(coreUrl, true);
+        processResponse(results, null, coreUrl, updateResponse, slice);
+      } catch (Exception e) {
+        processResponse(results, e, coreUrl, updateResponse, slice);
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib commit on: " + coreUrl, e);
+      }
+
       log.info("Successfully created all replica shards for all sub-slices "
           + subSlices);
 
@@ -546,19 +689,140 @@ public class OverseerCollectionProcessor
     }
   }
 
+  static UpdateResponse commit(String url, boolean openSearcher) throws SolrServerException, IOException {
+    HttpSolrServer server = null;
+    try {
+      server = new HttpSolrServer(url);
+      server.setConnectionTimeout(30000);
+      server.setSoTimeout(60000);
+      UpdateRequest ureq = new UpdateRequest();
+      ureq.setParams(new ModifiableSolrParams());
+      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, openSearcher);
+      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true);
+      return ureq.process(server);
+    } finally {
+      if (server != null) {
+        server.shutdown();
+      }
+    }
+  }
+  
+  private String waitForCoreNodeName(DocCollection collection, String msgBaseUrl, String msgCore) {
+    int retryCount = 320;
+    while (retryCount-- > 0) {
+      Map<String,Slice> slicesMap = zkStateReader.getClusterState()
+          .getSlicesMap(collection.getName());
+      if (slicesMap != null) {
+        
+        for (Slice slice : slicesMap.values()) {
+          for (Replica replica : slice.getReplicas()) {
+            // TODO: for really large clusters, we could 'index' on this
+            
+            String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+            
+            if (baseUrl.equals(msgBaseUrl) && core.equals(msgCore)) {
+              return replica.getName();
+            }
+          }
+        }
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
+  }
+
   private void collectShardResponses(NamedList results, boolean abortOnError, String msgOnError) {
     ShardResponse srsp;
     do {
       srsp = shardHandler.takeCompletedOrError();
       if (srsp != null) {
         processResponse(results, srsp);
-        if (abortOnError && srsp.getException() != null)  {
-          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, srsp.getException());
+        Throwable exception = srsp.getException();
+        if (abortOnError && exception != null)  {
+          // drain pending requests
+          while (srsp != null)  {
+            srsp = shardHandler.takeCompletedOrError();
+          }
+          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
         }
       }
     } while (srsp != null);
   }
 
+  
+  private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+    log.info("Delete shard invoked");
+    String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+
+    String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+    Slice slice = clusterState.getSlice(collection, sliceId);
+    
+    if (slice == null) {
+      if(clusterState.getCollections().contains(collection)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "No shard with the specified name exists: " + slice);
+      } else {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "No collection with the specified name exists: " + collection);
+      }      
+    }
+    // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+    // TODO: Add check for range gaps on Slice deletion
+    if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE))) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "The slice: " + slice.getName() + " is currently "
+          + slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted.");
+    }
+
+    try {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+      params.set(CoreAdminParams.DELETE_INDEX, "true");
+      sliceCmd(clusterState, params, null, slice);
+
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          processResponse(results, srsp);
+        }
+      } while (srsp != null);
+
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+          Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
+          ZkStateReader.SHARD_ID_PROP, sliceId);
+      Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
+
+      // wait for a while until we don't see the shard
+      long now = System.currentTimeMillis();
+      long timeout = now + 30000;
+      boolean removed = false;
+      while (System.currentTimeMillis() < timeout) {
+        Thread.sleep(100);
+        removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null;
+        if (removed) {
+          Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+          break;
+        }
+      }
+      if (!removed) {
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Could not fully remove collection: " + collection + " shard: " + sliceId);
+      }
+
+      log.info("Successfully deleted collection " + collection + ", shard: " + sliceId);
+
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e);
+    }
+  }
+
   private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
     ShardRequest sreq = new ShardRequest();
     params.set("qt", adminPath);
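The static commit(url, openSearcher) helper above exists because HttpShardHandler is hard-coded to send QueryRequests, so the forced hard commit on the parent shard leader goes through a dedicated HttpSolrServer instead; openSearcher=true makes the parent's documents visible before slice states switch. A minimal usage sketch from the same package (URL assumed):

    String coreUrl = "http://127.0.0.1:8983/solr/collection1_shard1_replica1"; // assumed
    try {
      UpdateResponse rsp = OverseerCollectionProcessor.commit(coreUrl, true);
      log.info("commit returned status " + rsp.getStatus());
    } catch (Exception e) {
      // surface as SERVER_ERROR, as splitShard does above
    }
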
@@ -571,8 +835,8 @@ public class OverseerCollectionProcessor
 
     shardHandler.submit(sreq, replica, sreq.params);
   }
-  
-  private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+
+  private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
     String collectionName = message.getStr("name");
     if (clusterState.getCollections().contains(collectionName)) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
@@ -582,14 +846,22 @@ public class OverseerCollectionProcessor
       // look at the replication factor and see if it matches reality
       // if it does not, find best nodes to create more cores
       
-      int repFactor = msgStrToInt(message, REPLICATION_FACTOR, 1);
-      Integer numSlices = msgStrToInt(message, NUM_SLICES, null);
-      
-      if (numSlices == null) {
+      int repFactor = message.getInt( REPLICATION_FACTOR, 1);
+      Integer numSlices = message.getInt(NUM_SLICES, null);
+      String router = message.getStr(ROUTER, DocRouter.DEFAULT_NAME);
+      List<String> shardNames = new ArrayList<>();
+      if(ImplicitDocRouter.NAME.equals(router)){
+        Overseer.getShardNames(shardNames, message.getStr("shards",null));
+        numSlices = shardNames.size();
+      } else {
+        Overseer.getShardNames(numSlices,shardNames);
+      }
+
+      if (numSlices == null ) {
         throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param");
       }
-      
-      int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1);
+
+      int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
       String createNodeSetStr; 
       List<String> createNodeList = ((createNodeSetStr = message.getStr(CREATE_NODE_SET)) == null)?null:StrUtils.splitSmart(createNodeSetStr, ",", true);
       
@@ -601,8 +873,6 @@ public class OverseerCollectionProcessor
         throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
       }
       
-      String configName = message.getStr("collection.configName");
-      
       // we need to look at every node and see how many cores it serves
       // add our new cores to existing nodes serving the least number of cores
       // but (for now) require that each core goes on a distinct node.
@@ -645,26 +915,44 @@ public class OverseerCollectionProcessor
             + ". This requires " + requestedShardsToCreate
             + " shards to be created (higher than the allowed number)");
       }
-      
-      for (int i = 1; i <= numSlices; i++) {
+
+//      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+//          Overseer.CREATECOLLECTION, "name", message.getStr("name"));
+      Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(message));
+
+      // wait for a while until we see the new collection
+      long waitUntil = System.currentTimeMillis() + 30000;
+      boolean created = false;
+      while (System.currentTimeMillis() < waitUntil) {
+        Thread.sleep(100);
+        created = zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
+        if(created) break;
+      }
+      if (!created)
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create collection: " + message.getStr("name"));
+
+
+      String configName = message.getStr(COLL_CONF);
+      log.info("going to create cores replicas shardNames {} , repFactor : {}", shardNames, repFactor);
+      for (int i = 1; i <= shardNames.size(); i++) {
+        String sliceName = shardNames.get(i-1);
         for (int j = 1; j <= repFactor; j++) {
           String nodeName = nodeList.get((repFactor * (i - 1) + (j - 1)) % nodeList.size());
-          String sliceName = "shard" + i;
           String shardName = collectionName + "_" + sliceName + "_replica" + j;
           log.info("Creating shard " + shardName + " as part of slice "
               + sliceName + " of collection " + collectionName + " on "
               + nodeName);
-          
+
           // Need to create new params for each request
           ModifiableSolrParams params = new ModifiableSolrParams();
           params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-          
+
           params.set(CoreAdminParams.NAME, shardName);
-          params.set("collection.configName", configName);
+          params.set(COLL_CONF, configName);
           params.set(CoreAdminParams.COLLECTION, collectionName);
           params.set(CoreAdminParams.SHARD, sliceName);
           params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
-          
+
           ShardRequest sreq = new ShardRequest();
           params.set("qt", adminPath);
           sreq.purpose = 1;
@@ -674,12 +962,12 @@ public class OverseerCollectionProcessor
           sreq.shards = new String[] {replica};
           sreq.actualShards = sreq.shards;
           sreq.params = params;
-          
+
           shardHandler.submit(sreq, replica, sreq.params);
-          
+
         }
       }
-      
+
       ShardResponse srsp;
       do {
         srsp = shardHandler.takeCompletedOrError();
@@ -697,7 +985,7 @@ public class OverseerCollectionProcessor
       throw new SolrException(ErrorCode.SERVER_ERROR, null, ex);
     }
   }
-  
+
   private void collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params, NamedList results, String stateMatcher) {
     log.info("Executing Collection Cmd : " + params);
     String collectionName = message.getStr("name");
@@ -711,33 +999,7 @@ public class OverseerCollectionProcessor
     
     for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
       Slice slice = entry.getValue();
-      Map<String,Replica> shards = slice.getReplicasMap();
-      Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
-      for (Map.Entry<String,Replica> shardEntry : shardEntries) {
-        final ZkNodeProps node = shardEntry.getValue();
-        if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
-          // For thread safety, only simple clone the ModifiableSolrParams
-          ModifiableSolrParams cloneParams = new ModifiableSolrParams();
-          cloneParams.add(params);
-          cloneParams.set(CoreAdminParams.CORE,
-              node.getStr(ZkStateReader.CORE_NAME_PROP));
-          
-          String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
-          ShardRequest sreq = new ShardRequest();
-          sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
-          // yes, they must use same admin handler path everywhere...
-          cloneParams.set("qt", adminPath);
-          sreq.purpose = 1;
-          // TODO: this sucks
-          if (replica.startsWith("http://")) replica = replica.substring(7);
-          sreq.shards = new String[] {replica};
-          sreq.actualShards = sreq.shards;
-          sreq.params = cloneParams;
-          log.info("Collection Admin sending CoreAdmin cmd to " + replica
-              + " params:" + sreq.params);
-          shardHandler.submit(sreq, replica, sreq.params);
-        }
-      }
+      sliceCmd(clusterState, params, stateMatcher, slice);
     }
     
     ShardResponse srsp;
@@ -750,39 +1012,66 @@ public class OverseerCollectionProcessor
 
   }
 
+  private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, String stateMatcher, Slice slice) {
+    Map<String,Replica> shards = slice.getReplicasMap();
+    Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
+    for (Map.Entry<String,Replica> shardEntry : shardEntries) {
+      final ZkNodeProps node = shardEntry.getValue();
+      if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
+        // For thread safety, only simple clone the ModifiableSolrParams
+        ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+        cloneParams.add(params);
+        cloneParams.set(CoreAdminParams.CORE,
+            node.getStr(ZkStateReader.CORE_NAME_PROP));
+
+        String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
+        ShardRequest sreq = new ShardRequest();
+        sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
+        // yes, they must use same admin handler path everywhere...
+        cloneParams.set("qt", adminPath);
+        sreq.purpose = 1;
+        // TODO: this sucks
+        if (replica.startsWith("http://")) replica = replica.substring(7);
+        sreq.shards = new String[] {replica};
+        sreq.actualShards = sreq.shards;
+        sreq.params = cloneParams;
+        log.info("Collection Admin sending CoreAdmin cmd to " + replica
+            + " params:" + sreq.params);
+        shardHandler.submit(sreq, replica, sreq.params);
+      }
+    }
+  }
+
   private void processResponse(NamedList results, ShardResponse srsp) {
     Throwable e = srsp.getException();
+    String nodeName = srsp.getNodeName();
+    SolrResponse solrResponse = srsp.getSolrResponse();
+    String shard = srsp.getShard();
+
+    processResponse(results, e, nodeName, solrResponse, shard);
+  }
+
+  private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard) {
     if (e != null) {
-      log.error("Error from shard: " + srsp.getShard(), e);
-      
+      log.error("Error from shard: " + shard, e);
+
       SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
       if (failure == null) {
         failure = new SimpleOrderedMap();
         results.add("failure", failure);
       }
 
-      failure.add(srsp.getNodeName(), e.getClass().getName() + ":" + e.getMessage());
-      
+      failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
+
     } else {
-      
+
       SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
       if (success == null) {
         success = new SimpleOrderedMap();
         results.add("success", success);
       }
-      
-      success.add(srsp.getNodeName(), srsp.getSolrResponse().getResponse());
-    }
-  }
-  
-  private Integer msgStrToInt(ZkNodeProps message, String key, Integer def)
-      throws Exception {
-    String str = message.getStr(key);
-    try {
-      return str == null ? def : Integer.parseInt(str);
-    } catch (Exception ex) {
-      SolrException.log(log, "Could not parse " + key, ex);
-      throw ex;
+
+      success.add(nodeName, solrResponse.getResponse());
     }
   }
 
@@ -790,4 +1079,12 @@ public class OverseerCollectionProcessor
   public boolean isClosed() {
     return isClosed;
   }
+
+  public static Map<String, Object> asMap(Object... vals) {
+    HashMap<String, Object> m = new HashMap<String, Object>();
+    for(int i=0; i<vals.length; i+=2) {
+      m.put(String.valueOf(vals[i]), vals[i+1]);
+    }
+    return m;
+  }
 }
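asMap is a small convenience for building a String-keyed map from alternating key/value varargs; COLL_PROPS above is declared with it. For example:

    Map<String,Object> props = asMap("router", "implicit", "shards", "a,b,c");
    // props.get("router") -> "implicit"
    // Callers must pass an even number of arguments: an odd count fails
    // with an ArrayIndexOutOfBoundsException on vals[i+1].
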

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Sun Aug 11 12:19:13 2013
@@ -91,7 +91,7 @@ public class RecoveryStrategy extends Th
     zkController = cc.getZkController();
     zkStateReader = zkController.getZkStateReader();
     baseUrl = zkController.getBaseUrl();
-    coreZkNodeName = zkController.getCoreNodeName(cd);
+    coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
   }
 
   public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
@@ -118,44 +118,40 @@ public class RecoveryStrategy extends Th
     }
   }
   
-  private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops, String baseUrl)
+  private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
       throws SolrServerException, IOException {
-   
-    String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+
     ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
     String leaderUrl = leaderCNodeProps.getCoreUrl();
     
     log.info("Attempting to replicate from " + leaderUrl + ". core=" + coreName);
     
-    // if we are the leader, either we are trying to recover faster
-    // then our ephemeral timed out or we are the only node
-    if (!leaderBaseUrl.equals(baseUrl)) {
-      
-      // send commit
-      commitOnLeader(leaderUrl);
-      
-      // use rep handler directly, so we can do this sync rather than async
-      SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
-      if (handler instanceof LazyRequestHandlerWrapper) {
-        handler = ((LazyRequestHandlerWrapper)handler).getWrappedHandler();
-      }
-      ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-      
-      if (replicationHandler == null) {
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-            "Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
-      }
-      
-      ModifiableSolrParams solrParams = new ModifiableSolrParams();
-      solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
-      
-      if (isClosed()) retries = INTERRUPTED;
-      boolean success = replicationHandler.doFetch(solrParams, false);
+    // send commit
+    commitOnLeader(leaderUrl);
+    
+    // use rep handler directly, so we can do this sync rather than async
+    SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
+    if (handler instanceof LazyRequestHandlerWrapper) {
+      handler = ((LazyRequestHandlerWrapper) handler).getWrappedHandler();
+    }
+    ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+    
+    if (replicationHandler == null) {
+      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+          "Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
+    }
+    
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
+    
+    if (isClosed()) retries = INTERRUPTED;
+    boolean success = replicationHandler.doFetch(solrParams, false);
+    
+    if (!success) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Replication for recovery failed.");
+    }
 
-      if (!success) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
-      }
-      
       // solrcloud_debug
 //      try {
 //        RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
@@ -169,7 +165,7 @@ public class RecoveryStrategy extends Th
 //      } catch (Exception e) {
 //        
 //      }
-    }
+    
   }
 
   private void commitOnLeader(String leaderUrl) throws SolrServerException, IOException {
@@ -329,10 +325,10 @@ public class RecoveryStrategy extends Th
         String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
 
         boolean isLeader = leaderUrl.equals(ourUrl);
-        if (isLeader && !cloudDesc.isLeader) {
+        if (isLeader && !cloudDesc.isLeader()) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
         }
-        if (cloudDesc.isLeader) {
+        if (cloudDesc.isLeader()) {
           // we are now the leader - no one else must have been suitable
           log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
           log.info("Finished recovery process. core=" + coreName);
@@ -406,8 +402,7 @@ public class RecoveryStrategy extends Th
         
         try {
 
-          replicate(zkController.getNodeName(), core,
-              leaderprops, leaderUrl);
+          replicate(zkController.getNodeName(), core, leaderprops);
 
           replay(ulog);
           replayed = true;
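Throughout this commit, direct access to the CloudDescriptor.isLeader field is replaced with accessors (isLeader()/setLeader(...) here, in SyncStrategy and in ZkController), and the core node name is now read from the CloudDescriptor rather than computed by ZkController. A sketch of the accessor pair this implies on CloudDescriptor (the real class holds more state):

    // Hedged sketch of the encapsulated leader flag.
    private volatile boolean isLeader = false;

    public boolean isLeader() { return isLeader; }

    public void setLeader(boolean isLeader) { this.isLeader = isLeader; }
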

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java Sun Aug 11 12:19:13 2013
@@ -179,7 +179,7 @@ class SolrZkServerProps extends QuorumPe
       Properties cfg = new Properties();
       FileInputStream in = new FileInputStream(configFile);
       try {
-        cfg.load(in);
+        cfg.load(new InputStreamReader(in, IOUtils.CHARSET_UTF_8));
       } finally {
         in.close();
       }
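Properties.load(InputStream) always decodes ISO-8859-1, so wrapping the stream in an InputStreamReader makes non-ASCII values in the ZooKeeper config file load correctly. A self-contained sketch of the same pattern (the patch uses Lucene's IOUtils.CHARSET_UTF_8; StandardCharsets.UTF_8 below is the JDK 7 equivalent):

    static Properties loadUtf8Properties(File configFile) throws IOException {
      Properties cfg = new Properties();
      Reader in = new InputStreamReader(new FileInputStream(configFile),
          StandardCharsets.UTF_8);
      try {
        cfg.load(in);        // load(Reader) honors the reader's charset
      } finally {
        in.close();
      }
      return cfg;
    }
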

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sun Aug 11 12:19:13 2013
@@ -62,12 +62,12 @@ public class SyncStrategy {
 
   private volatile boolean isClosed;
   
-  private final static HttpClient client;
-  static {
+  private final HttpClient client;
+  {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
     params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
-    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
+    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
     params.set(HttpClientUtil.PROP_SO_TIMEOUT, 30000);
     params.set(HttpClientUtil.PROP_USE_RETRY, false);
     client = HttpClientUtil.createClient(params);
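The shared static HttpClient becomes a per-instance field (and the connection timeout drops from 30s to 15s), so each SyncStrategy now owns its client and must release it; the close() change later in this diff shuts the connection manager down. A sketch of that lifecycle:

    // Create, use via HttpSolrServer, then shut down, mirroring this class.
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 15000);
    params.set(HttpClientUtil.PROP_SO_TIMEOUT, 30000);
    HttpClient client = HttpClientUtil.createClient(params);
    try {
      HttpSolrServer server = new HttpSolrServer("http://127.0.0.1:8983/solr", client); // assumed URL
      // ... issue requests ...
    } finally {
      client.getConnectionManager().shutdown();  // same call close() makes
    }
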
@@ -87,6 +87,10 @@ public class SyncStrategy {
     if (SKIP_AUTO_RECOVERY) {
       return true;
     }
+    if (isClosed) {
+      log.warn("Closed, skipping sync up.");
+      return false;
+    }
     log.info("Sync replicas to " + ZkCoreNodeProps.getCoreUrl(leaderProps));
     // TODO: look at our state usage of sync
     // zkController.publish(core, ZkStateReader.SYNC);
@@ -112,20 +116,6 @@ public class SyncStrategy {
       log.info("We have been closed, won't sync with replicas");
       return false;
     }
-    // if no one that is up is active, we are willing to wait...
-    // we don't want a recovering node to become leader and then
-    // a better candidate pops up a second later.
-//    int tries = 20;
-//    while (!areAnyReplicasActive(zkController, collection, shardId)) {
-//      if (tries-- == 0) {
-//        break;
-//      }
-//      try {
-//        Thread.sleep(500);
-//      } catch (InterruptedException e) {
-//        Thread.currentThread().interrupt();
-//      }
-//    }
     
     // first sync ourselves - we are the potential leader after all
     try {
@@ -146,7 +136,7 @@ public class SyncStrategy {
         syncToMe(zkController, collection, shardId, leaderProps, core.getCoreDescriptor());
         
       } else {
-        log.info("Leader's attempt to sync with shard failed, moving to the next canidate");
+        log.info("Leader's attempt to sync with shard failed, moving to the next candidate");
         // lets see who seems ahead...
       }
       
@@ -160,8 +150,7 @@ public class SyncStrategy {
   private boolean syncWithReplicas(ZkController zkController, SolrCore core,
       ZkNodeProps props, String collection, String shardId) {
     List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
-        .getReplicaProps(collection, shardId,
-            zkController.getCoreNodeName(core.getCoreDescriptor()),
+        .getReplicaProps(collection, shardId,core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName(),
             props.getStr(ZkStateReader.CORE_NAME_PROP));
     
     if (nodes == null) {
@@ -189,7 +178,7 @@ public class SyncStrategy {
     List<ZkCoreNodeProps> nodes = zkController
         .getZkStateReader()
         .getReplicaProps(collection, shardId,
-            zkController.getCoreNodeName(cd),
+            cd.getCloudDescriptor().getCoreNodeName(),
             leaderProps.getStr(ZkStateReader.CORE_NAME_PROP));
     if (nodes == null) {
       log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + " has no replicas");
@@ -273,6 +262,11 @@ public class SyncStrategy {
   public void close() {
     this.isClosed = true;
     try {
+      client.getConnectionManager().shutdown();
+    } catch (Throwable e) {
+      SolrException.log(log, e);
+    }
+    try {
       ExecutorUtil.shutdownNowAndAwaitTermination(recoveryCmdExecutor);
     } catch (Throwable e) {
       SolrException.log(log, e);
@@ -280,7 +274,6 @@ public class SyncStrategy {
   }
   
   private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl, final String coreName) throws SolrServerException, IOException {
-    // TODO: do this in background threads
     Thread thread = new Thread() {
       {
         setDaemon(true);
@@ -291,9 +284,9 @@ public class SyncStrategy {
         recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
         recoverRequestCmd.setCoreName(coreName);
         
-        HttpSolrServer server = new HttpSolrServer(baseUrl);
-        server.setConnectionTimeout(45000);
-        server.setSoTimeout(45000);
+        HttpSolrServer server = new HttpSolrServer(baseUrl, client);
+        server.setConnectionTimeout(15000);
+        server.setSoTimeout(30000);
         try {
           server.request(recoverRequestCmd);
         } catch (Throwable t) {

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkCLI.java Sun Aug 11 12:19:13 2013
@@ -1,14 +1,5 @@
 package org.apache.solr.cloud;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import javax.xml.parsers.ParserConfigurationException;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -17,18 +8,21 @@ import org.apache.commons.cli.OptionBuil
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.IOUtils;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.Config;
-import org.apache.solr.core.ConfigSolr;
-import org.apache.solr.core.ConfigSolrXml;
-import org.apache.solr.core.ConfigSolrXmlOld;
-import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.xml.sax.InputSource;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.xml.sax.SAXException;
 
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -49,6 +43,7 @@ import org.xml.sax.SAXException;
 public class ZkCLI {
   
   private static final String MAKEPATH = "makepath";
+  private static final String PUT = "put";
   private static final String DOWNCONFIG = "downconfig";
   private static final String ZK_CLI_NAME = "ZkCLI";
   private static final String HELP = "help";
@@ -92,7 +87,7 @@ public class ZkCLI {
         .hasArg(true)
         .withDescription(
             "cmd to run: " + BOOTSTRAP + ", " + UPCONFIG + ", " + DOWNCONFIG
-                + ", " + LINKCONFIG + ", " + MAKEPATH + ", "+ LIST + ", " +CLEAR).create(CMD));
+                + ", " + LINKCONFIG + ", " + MAKEPATH + ", "+ PUT + ", "+ LIST + ", " + CLEAR).create(CMD));
 
     Option zkHostOption = new Option("z", ZKHOST, true,
         "ZooKeeper host address");
@@ -135,6 +130,7 @@ public class ZkCLI {
         System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + DOWNCONFIG + " -" + CONFDIR + " /opt/solr/collection1/conf" + " -" + CONFNAME + " myconf");
         System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + LINKCONFIG + " -" + COLLECTION + " collection1" + " -" + CONFNAME + " myconf");
         System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + MAKEPATH + " /apache/solr");
+        System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + PUT + " /solr.conf 'conf data'");
         System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + CLEAR + " /solr");
         System.out.println("zkcli.sh -zkhost localhost:9983 -cmd " + LIST);
         return;
@@ -175,35 +171,18 @@ public class ZkCLI {
                 + " is required for " + BOOTSTRAP);
             System.exit(1);
           }
-          SolrResourceLoader loader = new SolrResourceLoader(solrHome);
-          solrHome = loader.getInstanceDir();
-
-          File configFile = new File(solrHome, SOLR_XML);
-          InputStream is = new FileInputStream(configFile);
-
-          ConfigSolr cfg;
-
-          try {
-            Config config = new Config(loader, null, new InputSource(is), null, false);
-            
-            boolean oldStyle = (config.getNode("solr/cores", false) != null);
-
-             if (oldStyle) {
-               cfg = new ConfigSolrXmlOld(config, null);
-             } else {
-               cfg = new ConfigSolrXml(config, null);
-             }
-          } finally {
-            IOUtils.closeQuietly(is);
-          }
 
+          CoreContainer cc = new CoreContainer(solrHome);
 
           if(!ZkController.checkChrootPath(zkServerAddress, true)) {
             System.out.println("A chroot was specified in zkHost but the znode doesn't exist. ");
             System.exit(1);
           }
 
-          ZkController.bootstrapConf(zkClient, cfg, solrHome);
+          ZkController.bootstrapConf(zkClient, cc, solrHome);
+
+          // No need to shutdown the CoreContainer, as it wasn't started
+          // up in the first place...
           
         } else if (line.getOptionValue(CMD).equals(UPCONFIG)) {
           if (!line.hasOption(CONFDIR) || !line.hasOption(CONFNAME)) {
@@ -256,6 +235,15 @@ public class ZkCLI {
             System.exit(1);
           }
           zkClient.makePath(arglist.get(0).toString(), true);
+        } else if (line.getOptionValue(CMD).equals(PUT)) {
+          List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+          List arglist = line.getArgList();
+          if (arglist.size() != 2) {
+            System.out.println("-" + PUT + " requires two args - the path to create and the data string");
+            System.exit(1);
+          }
+          zkClient.create(arglist.get(0).toString(), arglist.get(1).toString().getBytes("UTF-8"),
+                          acl, CreateMode.PERSISTENT, true);
         }
       } finally {
         if (solrPort != null) {
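The new put command writes a string value to a persistent znode with the open ACL. An equivalent programmatic call via SolrZkClient, using the address, path and data from the usage example above (timeout assumed):

    static void put() throws Exception {
      SolrZkClient zkClient = new SolrZkClient("localhost:9983", 30000); // assumed timeout
      try {
        zkClient.create("/solr.conf", "conf data".getBytes("UTF-8"),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);   // true = retry on conn loss
      } finally {
        zkClient.close();
      }
    }
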

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Aug 11 12:19:13 2013
@@ -17,29 +17,8 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import java.io.File;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.URLEncoder;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.common.SolrException;
@@ -58,14 +37,12 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.core.ConfigSolr;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateShardHandler;
-import org.apache.solr.util.PropertiesUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -74,6 +51,28 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.URLEncoder;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Handle ZooKeeper interactions.
  * 
@@ -131,7 +130,9 @@ public final class ZkController {
 
   protected volatile Overseer overseer;
 
-  private String leaderVoteWait;
+  private int leaderVoteWait;
+  
+  private boolean genericCoreNodeNames;
 
   private int clientTimeout;
 
@@ -140,11 +141,11 @@ public final class ZkController {
   private UpdateShardHandler updateShardHandler;
 
   public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
-      String localHostContext, String leaderVoteWait, int distribUpdateConnTimeout, int distribUpdateSoTimeout, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
+      String localHostContext, int leaderVoteWait, boolean genericCoreNodeNames, int distribUpdateConnTimeout, int distribUpdateSoTimeout, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
       TimeoutException, IOException {
     if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
     this.cc = cc;
-
+    this.genericCoreNodeNames = genericCoreNodeNames;
     // be forgiving and strip this off leading/trailing slashes
     // this allows us to support users specifying hostContext="/" in 
     // solr.xml to indicate the root context, instead of hostContext="" 
@@ -241,7 +242,7 @@ public final class ZkController {
     init(registerOnReconnect);
   }
 
-  public String getLeaderVoteWait() {
+  public int getLeaderVoteWait() {
     return leaderVoteWait;
   }
 
@@ -254,9 +255,9 @@ public final class ZkController {
       // before registering as live, make sure everyone is in a
       // down state
       for (CoreDescriptor descriptor : descriptors) {
-        final String coreZkNodeName = getCoreNodeName(descriptor);
+        final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
         try {
-          descriptor.getCloudDescriptor().isLeader = false;
+          descriptor.getCloudDescriptor().setLeader(false);
           publish(descriptor, ZkStateReader.DOWN, updateLastPublished);
         } catch (Exception e) {
           if (isClosed) {
@@ -323,7 +324,7 @@ public final class ZkController {
         .getCurrentDescriptors();
     if (descriptors != null) {
       for (CoreDescriptor descriptor : descriptors) {
-        descriptor.getCloudDescriptor().isLeader = false;
+        descriptor.getCloudDescriptor().setLeader(false);
       }
     }
   }
@@ -544,7 +545,6 @@ public final class ZkController {
           if (replica.getNodeName().equals(getNodeName())
               && !(replica.getStr(ZkStateReader.STATE_PROP)
                   .equals(ZkStateReader.DOWN))) {
-            assert replica.getStr(ZkStateReader.SHARD_ID_PROP) != null;
             ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
                 ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
                 ZkStateReader.BASE_URL_PROP, getBaseUrl(),
@@ -555,8 +555,7 @@ public final class ZkController {
                 ZkStateReader.NODE_NAME_PROP, getNodeName(),
                 ZkStateReader.SHARD_ID_PROP,
                 replica.getStr(ZkStateReader.SHARD_ID_PROP),
-                ZkStateReader.COLLECTION_PROP,
-                replica.getStr(ZkStateReader.COLLECTION_PROP),
+                ZkStateReader.COLLECTION_PROP, collectionName,
                 ZkStateReader.CORE_NODE_NAME_PROP, replica.getName());
             updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
             overseerJobQueue.offer(ZkStateReader.toJSON(m));
@@ -735,7 +734,8 @@ public final class ZkController {
     final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
     final String collection = cloudDesc.getCollectionName();
 
-    final String coreZkNodeName = getCoreNodeName(desc);
+    final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
+    assert coreZkNodeName != null : "we should have a coreNodeName by now";
     
     String shardId = cloudDesc.getShardId();
 
@@ -769,7 +769,7 @@ public final class ZkController {
     // in this case, we want to wait for the leader as long as the leader might 
     // wait for a vote, at least - but also long enough that a large cluster has
     // time to get its act together
-    String leaderUrl = getLeader(cloudDesc, Integer.parseInt(leaderVoteWait) + 600000);
+    String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000);
     
     String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     log.info("We are " + ourUrl + " and leader is " + leaderUrl);
@@ -794,9 +794,7 @@ public final class ZkController {
       if (!core.isReloaded() && ulog != null) {
         // disable recovery in case shard is in construction state (for shard splits)
         Slice slice = getClusterState().getSlice(collection, shardId);
-        if (Slice.CONSTRUCTION.equals(slice.getState())) {
-          publish(desc, ZkStateReader.ACTIVE);
-        } else {
+        if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) {
           Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
               .getUpdateLog().recoverFromLog();
           if (recoveryFuture != null) {
@@ -807,11 +805,11 @@ public final class ZkController {
           } else {
             log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl);
           }
-          boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
-              collection, coreZkNodeName, shardId, leaderProps, core, cc);
-          if (!didRecovery) {
-            publish(desc, ZkStateReader.ACTIVE);
-          }
+        }
+        boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+            collection, coreZkNodeName, shardId, leaderProps, core, cc);
+        if (!didRecovery) {
+          publish(desc, ZkStateReader.ACTIVE);
         }
       }
     } finally {
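
In the restructured block above, publishing ACTIVE always flows through checkRecovery() now, and log replay is skipped only when this core leads a shard still in CONSTRUCTION (a shard-split target). A condensed sketch of the resulting control flow, using the names from the hunk:

    boolean constructionLeader = Slice.CONSTRUCTION.equals(slice.getState()) && isLeader;
    if (!constructionLeader) {
      core.getUpdateHandler().getUpdateLog().recoverFromLog(); // replay tlog if needed
    }
    boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader,
        cloudDesc, collection, coreZkNodeName, shardId, leaderProps, core, cc);
    if (!didRecovery) {
      publish(desc, ZkStateReader.ACTIVE); // only mark ACTIVE when no recovery was started
    }
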
@@ -923,16 +921,16 @@ public final class ZkController {
     props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
     props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
     
-    final String coreZkNodeName = getCoreNodeName(cd);
+    final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     ZkNodeProps ourProps = new ZkNodeProps(props);
     String collection = cd.getCloudDescriptor()
         .getCollectionName();
     
     ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
-        collection, coreZkNodeName, ourProps, this, cc);
+        collection, coreNodeName, ourProps, this, cc);
 
     leaderElector.setup(context);
-    electionContexts.put(coreZkNodeName, context);
+    electionContexts.put(coreNodeName, context);
     leaderElector.joinElection(context, false);
   }
 
@@ -989,6 +987,9 @@ public final class ZkController {
       numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
     }
     
+    assert cd.getCloudDescriptor().getCollectionName() != null && cd.getCloudDescriptor()
+        .getCollectionName().length() > 0;
+    
     String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     //assert cd.getCloudDescriptor().getShardId() != null;
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state", 
@@ -1017,7 +1018,7 @@ public final class ZkController {
 
     final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
     
-    final String shardId = state.getShardId(coreNodeName);
+    final String shardId = state.getShardId(getBaseUrl(), desc.getName());
 
     if (shardId != null) {
       cloudDesc.setShardId(shardId);
@@ -1028,16 +1029,21 @@ public final class ZkController {
 
   public void unregister(String coreName, CoreDescriptor cd)
       throws InterruptedException, KeeperException {
-    final String zkNodeName = getCoreNodeName(cd);
-    ElectionContext context = electionContexts.remove(zkNodeName);
+    final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+    ElectionContext context = electionContexts.remove(coreNodeName);
+    
+    assert context != null : coreNodeName;
+    
     if (context != null) {
       context.cancelElection();
     }
+    CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
     
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        "deletecore", ZkStateReader.CORE_NAME_PROP, coreName,
+        Overseer.DELETECORE, ZkStateReader.CORE_NAME_PROP, coreName,
         ZkStateReader.NODE_NAME_PROP, getNodeName(),
-        ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor().getCollectionName());
+        ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
+        ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
     overseerJobQueue.offer(ZkStateReader.toJSON(m));
   }
   
@@ -1206,13 +1212,60 @@ public final class ZkController {
     return zkStateReader;
   }
 
-  private String doGetShardIdProcess(String coreName, CoreDescriptor descriptor) {
-    final String coreNodeName = getCoreNodeName(descriptor);
+  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
+    final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
+    if (coreNodeName != null) {
+      waitForShardId(cd);
+    } else {
+      // if no explicit coreNodeName, we want to match by base url and core name
+      waitForCoreNodeName(cd);
+      waitForShardId(cd);
+    }
+  }
+
+  private void waitForCoreNodeName(CoreDescriptor descriptor) {
     int retryCount = 320;
+    log.info("look for our core node name");
     while (retryCount-- > 0) {
-      final String shardId = zkStateReader.getClusterState().getShardId(coreNodeName);
+      Map<String,Slice> slicesMap = zkStateReader.getClusterState()
+          .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
+      if (slicesMap != null) {
+        
+        for (Slice slice : slicesMap.values()) {
+          for (Replica replica : slice.getReplicas()) {
+            // TODO: for really large clusters, we could 'index' on this
+            
+            String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+            
+            String msgBaseUrl = getBaseUrl();
+            String msgCore = descriptor.getName();
+
+            if (baseUrl.equals(msgBaseUrl) && core.equals(msgCore)) {
+              descriptor.getCloudDescriptor()
+                  .setCoreNodeName(replica.getName());
+              return;
+            }
+          }
+        }
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private void waitForShardId(CoreDescriptor cd) {
+    log.info("waiting to find shard id in clusterstate for " + cd.getName());
+    int retryCount = 320;
+    while (retryCount-- > 0) {
+      final String shardId = zkStateReader.getClusterState().getShardId(getBaseUrl(), cd.getName());
       if (shardId != null) {
-        return shardId;
+        cd.getCloudDescriptor().setShardId(shardId);
+        return;
       }
       try {
         Thread.sleep(1000);
@@ -1222,7 +1275,7 @@ public final class ZkController {
     }
     
     throw new SolrException(ErrorCode.SERVER_ERROR,
-        "Could not get shard_id for core: " + coreName + " coreNodeName:" + coreNodeName);
+        "Could not get shard id for core: " + cd.getName());
   }
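
Both wait loops above share the same retry budget; a quick worked calculation:

    int  retryCount = 320;                  // attempts, one per loop iteration
    long sleepMs    = 1000;                 // Thread.sleep(1000) between attempts
    long budgetMs   = retryCount * sleepMs; // 320000 ms, about 5 min 20 s, before SERVER_ERROR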
   
   public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
@@ -1261,7 +1314,7 @@ public final class ZkController {
   
   public String getCoreNodeName(CoreDescriptor descriptor){
     String coreNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
-    if (coreNodeName == null) {
+    if (coreNodeName == null && !genericCoreNodeNames) {
       // it's the default
       return getNodeName() + "_" + descriptor.getName();
     }
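
The new genericCoreNodeNames flag changes what the fallback above produces; a compact restatement of the rule (example names hypothetical):

    String name = descriptor.getCloudDescriptor().getCoreNodeName();
    if (name == null && !genericCoreNodeNames) {
      name = getNodeName() + "_" + descriptor.getName(); // legacy: "127.0.0.1:8983_solr_collection1"
    }
    // otherwise name may stay null, and an Overseer-assigned generic name such as
    // "core_node1" is picked up later via waitForCoreNodeName(...)
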
@@ -1277,34 +1330,33 @@ public final class ZkController {
     downloadFromZK(zkClient, ZkController.CONFIGS_ZKNODE + "/" + configName, dir);
   }
 
-  public void preRegister(SolrCore core) throws KeeperException, InterruptedException {
-    CoreDescriptor cd = core.getCoreDescriptor();
-    if (Slice.CONSTRUCTION.equals(cd.getCloudDescriptor().getShardState())) {
-      // set update log to buffer before publishing the core
-      core.getUpdateHandler().getUpdateLog().bufferUpdates();
-    }
-    // before becoming available, make sure we are not live and active
-    // this also gets us our assigned shard id if it was not specified
-    publish(cd, ZkStateReader.DOWN, false);
-    // shardState and shardRange are for one-time use only, thereafter the actual values in the Slice should be used
-    if (Slice.CONSTRUCTION.equals(cd.getCloudDescriptor().getShardState())) {
-      cd.getCloudDescriptor().setShardState(null);
-      cd.getCloudDescriptor().setShardRange(null);
-    }
-    String coreNodeName = getCoreNodeName(cd);
+  public void preRegister(CoreDescriptor cd ) {
     
+    String coreNodeName = getCoreNodeName(cd);
+
     // make sure the node name is set on the descriptor
     if (cd.getCloudDescriptor().getCoreNodeName() == null) {
       cd.getCloudDescriptor().setCoreNodeName(coreNodeName);
     }
+
+    // before becoming available, make sure we are not live and active
+    // this also gets us our assigned shard id if it was not specified
+    try {
+      publish(cd, ZkStateReader.DOWN, false);
+    } catch (KeeperException e) {
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    }
     
     if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getClusterState(), coreNodeName)) {
-      String shardId;
-      shardId = doGetShardIdProcess(cd.getName(), cd);
-      cd.getCloudDescriptor().setShardId(shardId);
+      doGetShardIdAndNodeNameProcess(cd);
     } else {
       // still wait till we see us in local state
-      doGetShardIdProcess(cd.getName(), cd);
+      doGetShardIdAndNodeNameProcess(cd);
     }
 
   }
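
preRegister() now operates on a bare CoreDescriptor, so ZooKeeper pre-registration can happen before any SolrCore exists. A hypothetical call-site sketch (createAndRegister is illustrative, not part of this commit):

    SolrCore createAndRegister(CoreContainer cc, ZkController zk, CoreDescriptor cd) {
      zk.preRegister(cd);   // publishes DOWN; resolves coreNodeName and, via the
                            // wait loops above, the shard id
      return cc.create(cd); // only now is the core itself constructed
    }
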
@@ -1339,7 +1391,7 @@ public final class ZkController {
         }
       }
     }
-    
+
     String leaderBaseUrl = leaderProps.getBaseUrl();
     String leaderCoreName = leaderProps.getCoreName();
     
@@ -1438,27 +1490,22 @@ public final class ZkController {
   /**
    * If in SolrCloud mode, upload config sets for each SolrCore in solr.xml.
    */
-  public static void bootstrapConf(SolrZkClient zkClient, ConfigSolr cfg, String solrHome) throws IOException,
+  public static void bootstrapConf(SolrZkClient zkClient, CoreContainer cc, String solrHome) throws IOException,
       KeeperException, InterruptedException {
 
-    List<String> allCoreNames = cfg.getAllCoreNames();
+    //List<String> allCoreNames = cfg.getAllCoreNames();
+    List<CoreDescriptor> cds = cc.getCoresLocator().discover(cc);
     
-    log.info("bootstraping config for " + allCoreNames.size() + " cores into ZooKeeper using solr.xml from " + solrHome);
+    log.info("bootstrapping config for " + cds.size() + " cores into ZooKeeper using solr.xml from " + solrHome);
 
-    for (String coreName : allCoreNames) {
-      String rawName = PropertiesUtil.substituteProperty(cfg.getProperty(coreName, "name", null), new Properties());
-      String instanceDir = cfg.getProperty(coreName, "instanceDir", null);
-      File idir = new File(instanceDir);
-      System.out.println("idir:" + idir);
-      if (!idir.isAbsolute()) {
-        idir = new File(solrHome, instanceDir);
-      }
-      String confName = PropertiesUtil.substituteProperty(cfg.getProperty(coreName, "collection", null), new Properties());
-      if (confName == null) {
-        confName = rawName;
-      }
-      File udir = new File(idir, "conf");
-      log.info("Uploading directory " + udir + " with name " + confName + " for SolrCore " + rawName);
+    for (CoreDescriptor cd : cds) {
+      String coreName = cd.getName();
+      String confName = cd.getCollectionName();
+      if (StringUtils.isEmpty(confName))
+        confName = coreName;
+      String instanceDir = cd.getInstanceDir();
+      File udir = new File(instanceDir, "conf");
+      log.info("Uploading directory " + udir + " with name " + confName + " for SolrCore " + coreName);
       ZkController.uploadConfigDir(zkClient, udir, confName);
     }
   }
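
With the new signature, callers hand bootstrapConf() the CoreContainer and let CoresLocator.discover() enumerate the cores. A minimal usage sketch (zk address and timeout are illustrative):

    SolrZkClient zkClient = new SolrZkClient("localhost:2181", 30000);
    try {
      ZkController.bootstrapConf(zkClient, coreContainer, solrHome);
    } finally {
      zkClient.close();
    }
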
@@ -1501,11 +1548,11 @@ public final class ZkController {
   }
   
   /**
-   * utilitiy method fro trimming and leading and/or trailing slashes from 
+   * Utility method for trimming any leading and/or trailing slashes from 
   * its input.  May return the empty string.  May return null if and only 
    * if the input is null.
    */
-  private static String trimLeadingAndTrailingSlashes(final String in) {
+  public static String trimLeadingAndTrailingSlashes(final String in) {
     if (null == in) return in;
     
     String out = in;

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java Sun Aug 11 12:19:13 2013
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext.Context;
+import org.apache.lucene.store.NRTCachingDirectory;
 import org.apache.lucene.store.NativeFSLockFactory;
 import org.apache.lucene.store.NoLockFactory;
 import org.apache.lucene.store.RateLimitedDirectoryWrapper;
@@ -40,6 +41,9 @@ import org.apache.lucene.store.SingleIns
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.store.blockcache.BlockDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+import org.apache.solr.store.hdfs.HdfsLockFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -498,6 +502,24 @@ public abstract class CachingDirectoryFa
     } else if ("single".equals(lockType)) {
       if (!(dir.getLockFactory() instanceof SingleInstanceLockFactory)) dir
           .setLockFactory(new SingleInstanceLockFactory());
+    } else if ("hdfs".equals(lockType)) {
+      Directory del = dir;
+      
+      if (dir instanceof NRTCachingDirectory) {
+        del = ((NRTCachingDirectory) del).getDelegate();
+      }
+      
+      if (del instanceof BlockDirectory) {
+        del = ((BlockDirectory) del).getDirectory();
+      }
+      
+      if (!(del instanceof HdfsDirectory)) {
+        throw new SolrException(ErrorCode.FORBIDDEN, "Directory: "
+            + del.getClass().getName()
+            + ", but hdfs lock factory can only be used with HdfsDirectory");
+      }
+
+      dir.setLockFactory(new HdfsLockFactory(((HdfsDirectory)del).getHdfsDirPath(), ((HdfsDirectory)del).getConfiguration()));
     } else if ("none".equals(lockType)) {
       // Recipe for disaster
       log.error("CONFIGURATION WARNING: locks are disabled on " + dir);
@@ -519,7 +541,7 @@ public abstract class CachingDirectoryFa
     return path;
   }
   
-  private String stripTrailingSlash(String path) {
+  protected String stripTrailingSlash(String path) {
     if (path.endsWith("/")) {
       path = path.substring(0, path.length() - 1);
     }
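
For context, the unwrap-then-check in the hdfs branch above can be exercised end to end. A minimal sketch, assuming HdfsDirectory exposes a (Path, Configuration) constructor matching the getters used in that hunk:

    Configuration conf = new Configuration();
    Directory hdfs = new HdfsDirectory(new Path("hdfs://namenode:8020/solr/core1/data"), conf);
    Directory dir  = new NRTCachingDirectory(hdfs, 48, 4); // maxMergeSizeMB, maxCachedMB

    Directory del = dir;
    if (del instanceof NRTCachingDirectory) del = ((NRTCachingDirectory) del).getDelegate();
    // (a BlockDirectory unwrap would go here when block caching is enabled)
    dir.setLockFactory(new HdfsLockFactory(
        ((HdfsDirectory) del).getHdfsDirPath(),
        ((HdfsDirectory) del).getConfiguration()));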

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/Config.java Sun Aug 11 12:19:13 2013
@@ -68,6 +68,7 @@ public class Config {
   static final XPathFactory xpathFactory = XPathFactory.newInstance();
 
   private final Document doc;
+  private final Document origDoc; // with unsubstituted properties
   private final String prefix;
   private final String name;
   private final SolrResourceLoader loader;
@@ -131,6 +132,7 @@ public class Config {
       db.setErrorHandler(xmllog);
       try {
         doc = db.parse(is);
+        origDoc = copyDoc(doc);
       } finally {
         // some XML parsers are broken and don't close the byte stream (but they should according to spec)
         IOUtils.closeQuietly(is.getByteStream());
@@ -140,19 +142,24 @@ public class Config {
       }
     } catch (ParserConfigurationException e)  {
       SolrException.log(log, "Exception during parsing file: " + name, e);
-      throw e;
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     } catch (SAXException e)  {
       SolrException.log(log, "Exception during parsing file: " + name, e);
-      throw e;
-    } catch( SolrException e ){
-      SolrException.log(log,"Error in "+name,e);
-      throw e;
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } catch (TransformerException e) {
+      SolrException.log(log, "Exception during parsing file: " + name, e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
   }
   
   public Config(SolrResourceLoader loader, String name, Document doc) {
     this.prefix = null;
     this.doc = doc;
+    try {
+      this.origDoc = copyDoc(doc);
+    } catch (TransformerException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
     this.name = name;
     this.loader = loader;
   }
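
Keeping origDoc alongside doc means ${...} property expressions survive parsing. A small illustration via getUnsubstitutedNode(), added below (the path and property are hypothetical):

    // doc has system properties substituted; origDoc keeps the literal text, so
    // persistence code can write "${solr.port:8983}" back out unchanged.
    Node substituted = config.getNode("solr/solrcloud/int[@name='hostPort']", true);
    Node raw = config.getUnsubstitutedNode("solr/solrcloud/int[@name='hostPort']", true);
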
@@ -217,15 +224,22 @@ public class Config {
     }
   }
 
-  public Node getNode(String path, boolean errIfMissing) {
-   XPath xpath = xpathFactory.newXPath();
-   Node nd = null;
-   String xstr = normalize(path);
+  public Node getNode(String path, boolean errIfMissing) {
+    return getNode(path, doc, errIfMissing);
+  }
 
-    try {
-      nd = (Node)xpath.evaluate(xstr, doc, XPathConstants.NODE);
+  public Node getUnsubstitutedNode(String path, boolean errIfMissing) {
+    return getNode(path, origDoc, errIfMissing);
+  }
 
-      if (nd==null) {
+  public Node getNode(String path, Document doc, boolean errIfMissing) {
+    XPath xpath = xpathFactory.newXPath();
+    String xstr = normalize(path);
+
+    try {
+      NodeList nodes = (NodeList)xpath.evaluate(xstr, doc, 
+                                                XPathConstants.NODESET);
+      if (nodes == null || nodes.getLength() == 0) {
         if (errIfMissing) {
           throw new RuntimeException(name + " missing "+path);
         } else {
@@ -233,7 +247,11 @@ public class Config {
           return null;
         }
       }
-
+      if ( 1 < nodes.getLength() ) {
+        throw new SolrException( SolrException.ErrorCode.SERVER_ERROR,
+                                 name + " contains more than one value for config path: " + path);
+      }
+      Node nd = nodes.item(0);
       log.trace(name + ":" + path + "=" + nd);
       return nd;
 
@@ -441,4 +459,9 @@ public class Config {
     
     return version;
   }
+
+  public Config getOriginalConfig() {
+    return new Config(loader, null, origDoc);
+  }
+
 }
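
The switch to NODESET in getNode() above turns silently-ignored duplicate config paths into a hard error. A minimal sketch of the new behavior (XML and name are illustrative; error handling elided):

    String xml = "<solr><str name='a'>1</str><str name='a'>2</str></solr>";
    Config config = new Config(loader, "dup-test",
        new InputSource(new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8))), null);
    config.getNode("solr/str", true);
    // throws SolrException: "dup-test contains more than one value for config path: solr/str"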

Modified: lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java?rev=1512909&r1=1512908&r2=1512909&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java (original)
+++ lucene/dev/branches/lucene4956/solr/core/src/java/org/apache/solr/core/ConfigSolr.java Sun Aug 11 12:19:13 2013
@@ -17,31 +17,190 @@ package org.apache.solr.core;
  * limitations under the License.
  */
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.io.IOUtils;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.logging.LogWatcherConfig;
 import org.apache.solr.util.DOMUtil;
 import org.apache.solr.util.PropertiesUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
 
 
 public abstract class ConfigSolr {
   protected static Logger log = LoggerFactory.getLogger(ConfigSolr.class);
   
   public final static String SOLR_XML_FILE = "solr.xml";
+
+  public static ConfigSolr fromFile(SolrResourceLoader loader, File configFile) {
+    log.info("Loading container configuration from {}", configFile.getAbsolutePath());
+
+    InputStream inputStream = null;
+
+    try {
+      if (!configFile.exists()) {
+        log.info("{} does not exist, using default configuration", configFile.getAbsolutePath());
+        inputStream = new ByteArrayInputStream(ConfigSolrXmlOld.DEF_SOLR_XML.getBytes(Charsets.UTF_8));
+      }
+      else {
+        inputStream = new FileInputStream(configFile);
+      }
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ByteStreams.copy(inputStream, baos);
+      String originalXml = IOUtils.toString(new ByteArrayInputStream(baos.toByteArray()), "UTF-8");
+      return fromInputStream(loader, new ByteArrayInputStream(baos.toByteArray()), configFile, originalXml);
+    }
+    catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Could not load SOLR configuration", e);
+    }
+    finally {
+      IOUtils.closeQuietly(inputStream);
+    }
+  }
+
+  public static ConfigSolr fromString(String xml) {
+    return fromInputStream(null, new ByteArrayInputStream(xml.getBytes(Charsets.UTF_8)), null, xml);
+  }
+
+  public static ConfigSolr fromInputStream(SolrResourceLoader loader, InputStream is, File file, String originalXml) {
+    try {
+      Config config = new Config(loader, null, new InputSource(is), null, false);
+      return fromConfig(config, file, originalXml);
+    }
+    catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  public static ConfigSolr fromSolrHome(SolrResourceLoader loader, String solrHome) {
+    return fromFile(loader, new File(solrHome, SOLR_XML_FILE));
+  }
+
+  public static ConfigSolr fromConfig(Config config, File file, String originalXml) {
+    boolean oldStyle = (config.getNode("solr/cores", false) != null);
+    return oldStyle ? new ConfigSolrXmlOld(config, file, originalXml)
+                    : new ConfigSolrXml(config);
+  }
   
+  public abstract CoresLocator getCoresLocator();
+
+  public PluginInfo getShardHandlerFactoryPluginInfo() {
+    Node node = config.getNode(getShardHandlerFactoryConfigPath(), false);
+    return (node == null) ? null : new PluginInfo(node, "shardHandlerFactory", false, true);
+  }
+
+  protected abstract String getShardHandlerFactoryConfigPath();
+
+  public String getZkHost() {
+    String sysZkHost = System.getProperty("zkHost");
+    if (sysZkHost != null)
+      return sysZkHost;
+    return get(CfgProp.SOLR_ZKHOST, null);
+  }
+
+  public int getZkClientTimeout() {
+    String sysProp = System.getProperty("zkClientTimeout");
+    if (sysProp != null)
+      return Integer.parseInt(sysProp);
+    return getInt(CfgProp.SOLR_ZKCLIENTTIMEOUT, DEFAULT_ZK_CLIENT_TIMEOUT);
+  }
+
+  private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 15000;
+  private static final int DEFAULT_LEADER_VOTE_WAIT = 180000;  // 3 minutes
+  private static final int DEFAULT_CORE_LOAD_THREADS = 3;
+
+  protected static final String DEFAULT_CORE_ADMIN_PATH = "/admin/cores";
+
+  public String getZkHostPort() {
+    return get(CfgProp.SOLR_HOSTPORT, null);
+  }
+
+  public String getZkHostContext() {
+    return get(CfgProp.SOLR_HOSTCONTEXT, null);
+  }
+
+  public String getHost() {
+    return get(CfgProp.SOLR_HOST, null);
+  }
+
+  public int getLeaderVoteWait() {
+    return getInt(CfgProp.SOLR_LEADERVOTEWAIT, DEFAULT_LEADER_VOTE_WAIT);
+  }
+
+  public boolean getGenericCoreNodeNames() {
+    return getBool(CfgProp.SOLR_GENERICCORENODENAMES, false);
+  }
+
+  public int getDistributedConnectionTimeout() {
+    return getInt(CfgProp.SOLR_DISTRIBUPDATECONNTIMEOUT, 0);
+  }
+
+  public int getDistributedSocketTimeout() {
+    return getInt(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, 0);
+  }
+
+  public int getCoreLoadThreadCount() {
+    return getInt(ConfigSolr.CfgProp.SOLR_CORELOADTHREADS, DEFAULT_CORE_LOAD_THREADS);
+  }
+
+  public String getSharedLibDirectory() {
+    return get(ConfigSolr.CfgProp.SOLR_SHAREDLIB , null);
+  }
+
+  public String getDefaultCoreName() {
+    return get(CfgProp.SOLR_CORES_DEFAULT_CORE_NAME, null);
+  }
+
+  public abstract boolean isPersistent();
+
+  public String getAdminPath() {
+    return get(CfgProp.SOLR_ADMINPATH, DEFAULT_CORE_ADMIN_PATH);
+  }
+
+  public String getCoreAdminHandlerClass() {
+    return get(CfgProp.SOLR_ADMINHANDLER, "org.apache.solr.handler.admin.CoreAdminHandler");
+  }
+
+  public boolean hasSchemaCache() {
+    return getBool(ConfigSolr.CfgProp.SOLR_SHARESCHEMA, false);
+  }
+
+  public String getManagementPath() {
+    return get(CfgProp.SOLR_MANAGEMENTPATH, null);
+  }
+
+  public LogWatcherConfig getLogWatcherConfig() {
+    return new LogWatcherConfig(
+        getBool(CfgProp.SOLR_LOGGING_ENABLED, false),
+        get(CfgProp.SOLR_LOGGING_CLASS, null),
+        get(CfgProp.SOLR_LOGGING_WATCHER_THRESHOLD, null),
+        getInt(CfgProp.SOLR_LOGGING_WATCHER_SIZE, 50)
+    );
+  }
+
+  public int getTransientCacheSize() {
+    return getInt(CfgProp.SOLR_TRANSIENTCACHESIZE, Integer.MAX_VALUE);
+  }
+
   // Ugly for now, but we'll at least be able to centralize all of the differences between 4x and 5x.
-  public static enum CfgProp {
+  protected static enum CfgProp {
     SOLR_ADMINHANDLER,
     SOLR_CORELOADTHREADS,
     SOLR_COREROOTDIRECTORY,
@@ -57,12 +216,9 @@ public abstract class ConfigSolr {
     SOLR_LOGGING_WATCHER_THRESHOLD,
     SOLR_MANAGEMENTPATH,
     SOLR_SHAREDLIB,
-    SOLR_SHARDHANDLERFACTORY_CLASS,
-    SOLR_SHARDHANDLERFACTORY_CONNTIMEOUT,
-    SOLR_SHARDHANDLERFACTORY_NAME,
-    SOLR_SHARDHANDLERFACTORY_SOCKETTIMEOUT,
     SOLR_SHARESCHEMA,
     SOLR_TRANSIENTCACHESIZE,
+    SOLR_GENERICCORENODENAMES,
     SOLR_ZKCLIENTTIMEOUT,
     SOLR_ZKHOST,
 
@@ -77,6 +233,12 @@ public abstract class ConfigSolr {
 
   public ConfigSolr(Config config) {
     this.config = config;
+  }
+
+  // For extension and testing.
+  protected ConfigSolr() {
+  }
   
   public Config getConfig() {
@@ -101,12 +263,6 @@ public abstract class ConfigSolr {
     return (val == null) ? def : val;
   }
 
-  // For saving the original property, ${} syntax and all.
-  public String getOrigProp(CfgProp prop, String def) {
-    String val = propMap.get(prop);
-    return (val == null) ? def : val;
-  }
-
   public Properties getSolrProperties(String path) {
     try {
       return readProperties(((NodeList) config.evaluate(
@@ -124,20 +280,11 @@ public abstract class ConfigSolr {
     Properties properties = new Properties();
     for (int i = 0; i < props.getLength(); i++) {
       Node prop = props.item(i);
-      properties.setProperty(DOMUtil.getAttr(prop, "name"), DOMUtil.getAttr(prop, "value"));
+      properties.setProperty(DOMUtil.getAttr(prop, "name"),
+          PropertiesUtil.substituteProperty(DOMUtil.getAttr(prop, "value"), null));
     }
     return properties;
   }
 
-  public abstract void substituteProperties();
-
-  public abstract List<String> getAllCoreNames();
-
-  public abstract String getProperty(String coreName, String property, String defaultVal);
-
-  public abstract Properties readCoreProperties(String coreName);
-
-  public abstract Map<String, String> readCoreAttributes(String coreName);
-
 }
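
Taken together, the new static factories replace hand-rolled solr.xml handling at startup. A minimal usage sketch (the solr home path is illustrative):

    String solrHome = "/var/solr";
    SolrResourceLoader loader = new SolrResourceLoader(solrHome);
    ConfigSolr cfg = ConfigSolr.fromSolrHome(loader, solrHome);

    int voteWait    = cfg.getLeaderVoteWait();       // 180000 ms unless configured
    boolean generic = cfg.getGenericCoreNodeNames(); // false unless configured
    CoresLocator locator = cfg.getCoresLocator();    // old- or new-style, chosen in fromConfig()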