Posted to commits@lucene.apache.org by rm...@apache.org on 2012/12/10 17:37:54 UTC

svn commit: r1419570 [10/14] - in /lucene/dev/branches/lucene4547: ./ dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/libraries/ dev-tools/idea/lucene/analysis/icu/ dev-tools/idea/solr/contrib/dataimporthandler/ dev-tools/maven/ dev-tools/maven/luce...

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/SolrLogFormatter.java Mon Dec 10 16:36:47 2012
@@ -263,7 +263,7 @@ sb.append("(group_name=").append(tg.getN
 
   private Map<String,Object> getCoreProps(ZkController zkController, SolrCore core) {
     final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    Replica props = zkController.getClusterState().getShardProps(collection,  ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
+    Replica props = zkController.getClusterState().getReplica(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
     if(props!=null) {
       return props.getProperties(); 
     }
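
For reference, the renamed method resolves a single Replica straight from ClusterState instead of going through shard properties. A minimal sketch of the new call path, assuming a zkController and core in scope as in getCoreProps() above:

    // Sketch: look up one replica's properties via ClusterState.getReplica().
    String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
    String coreNodeName = ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName());
    Replica replica = zkController.getClusterState().getReplica(collection, coreNodeName);
    if (replica != null) {
      Map<String,Object> props = replica.getProperties(); // a null replica means the core is not registered yet
    }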

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Mon Dec 10 16:36:47 2012
@@ -39,7 +39,10 @@ public class AssignShard {
       numShards = 1;
     }
     String returnShardId = null;
-    Map<String, Slice> sliceMap = state.getSlices(collection);
+    Map<String, Slice> sliceMap = state.getSlicesMap(collection);
+
+
+    // TODO: now that we create shards ahead of time, is this code needed?  Esp since hash ranges aren't assigned when creating via this method?
 
     if (sliceMap == null) {
       return "shard1";
@@ -51,6 +54,8 @@ public class AssignShard {
       return "shard" + (shardIdNames.size() + 1);
     }
 
+    // TODO: don't need to sort to find shard with fewest replicas!
+
     // else figure out which shard needs more replicas
     final Map<String, Integer> map = new HashMap<String, Integer>();
     for (String shardId : shardIdNames) {

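The second TODO above is right that sorting is unnecessary: a single linear scan finds the shard with the fewest replicas. A runnable sketch of that scan over plain java.util types (names and data are illustrative):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FewestReplicas {
      public static void main(String[] args) {
        Map<String,List<String>> replicasByShard = new HashMap<String,List<String>>();
        replicasByShard.put("shard1", Arrays.asList("replica1", "replica2"));
        replicasByShard.put("shard2", Arrays.asList("replica1"));

        // one pass, no sort: track the smallest replica count seen so far
        String best = null;
        int fewest = Integer.MAX_VALUE;
        for (Map.Entry<String,List<String>> e : replicasByShard.entrySet()) {
          if (e.getValue().size() < fewest) {
            fewest = e.getValue().size();
            best = e.getKey();
          }
        }
        System.out.println(best + " has the fewest replicas (" + fewest + ")");
      }
    }
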
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Mon Dec 10 16:36:47 2012
@@ -21,7 +21,6 @@ package org.apache.solr.cloud;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
 
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.zookeeper.CreateMode;
@@ -40,6 +39,8 @@ public class DistributedQueue {
   private static final Logger LOG = LoggerFactory
       .getLogger(DistributedQueue.class);
   
+  private static final long DEFAULT_TIMEOUT = 5 * 60 * 1000;
+  
   private final String dir;
   
   private SolrZkClient zookeeper;
@@ -163,20 +164,22 @@ public class DistributedQueue {
   
   private class LatchChildWatcher implements Watcher {
     
-    CountDownLatch latch;
+    Object lock = new Object();
     
-    public LatchChildWatcher() {
-      latch = new CountDownLatch(1);
-    }
+    public LatchChildWatcher() {}
     
     public void process(WatchedEvent event) {
-      LOG.debug("Watcher fired on path: " + event.getPath() + " state: "
+      LOG.info("Watcher fired on path: " + event.getPath() + " state: "
           + event.getState() + " type " + event.getType());
-      latch.countDown();
+      synchronized (lock) {
+        lock.notifyAll();
+      }
     }
     
-    public void await() throws InterruptedException {
-      latch.await();
+    public void await(long timeout) throws InterruptedException {
+      synchronized (lock) {
+        lock.wait(timeout);
+      }
     }
   }
   
@@ -197,7 +200,7 @@ public class DistributedQueue {
         continue;
       }
       if (orderedChildren.size() == 0) {
-        childWatcher.await();
+        childWatcher.await(DEFAULT_TIMEOUT);
         continue;
       }
       
@@ -274,7 +277,7 @@ public class DistributedQueue {
         continue;
       }
       if (orderedChildren.size() == 0) {
-        childWatcher.await();
+        childWatcher.await(DEFAULT_TIMEOUT);
         continue;
       }
       

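Replacing the CountDownLatch with a bare monitor lets the watcher be awaited repeatedly and with a timeout, at the cost that Object.wait() can also return spuriously - safe here only because both polling loops above re-read orderedChildren after await() returns and loop again. A compilable sketch of the pattern in isolation:

    public class TimedLatch {
      private final Object lock = new Object();

      // cf. LatchChildWatcher.process(WatchedEvent): wake all waiters
      public void signal() {
        synchronized (lock) {
          lock.notifyAll();
        }
      }

      // returns after a signal, after timeoutMs, or on a spurious wakeup;
      // callers must re-check their condition afterwards
      public void await(long timeoutMs) throws InterruptedException {
        synchronized (lock) {
          lock.wait(timeoutMs);
        }
      }
    }
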
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Dec 10 16:36:47 2012
@@ -163,7 +163,7 @@ final class ShardLeaderElectionContext e
       }
       
       // should I be leader?
-      if (weAreReplacement && !shouldIBeLeader(leaderProps, core)) {
+      if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
         rejoinLeaderElection(leaderSeqPath, core);
         return;
       }
@@ -249,7 +249,7 @@ final class ShardLeaderElectionContext e
         core.getCoreDescriptor().getCloudDescriptor().isLeader = false;
         
         // we could not publish ourselves as leader - rejoin election
-        rejoinLeaderElection(coreName, core);
+        rejoinLeaderElection(leaderSeqPath, core);
       } finally {
         if (core != null) {
           core.close();
@@ -263,7 +263,7 @@ final class ShardLeaderElectionContext e
       ZkNodeProps leaderProps, String collection, String shardId) {
     ClusterState clusterState = zkController.getZkStateReader()
         .getClusterState();
-    Map<String,Slice> slices = clusterState.getSlices(collection);
+    Map<String,Slice> slices = clusterState.getSlicesMap(collection);
     Slice slice = slices.get(shardId);
     Map<String,Replica> replicasMap = slice.getReplicasMap();
     for (Map.Entry<String,Replica> shard : replicasMap.entrySet()) {
@@ -293,7 +293,7 @@ final class ShardLeaderElectionContext e
     final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
     
     Slice slices = zkController.getClusterState().getSlice(collection, shardId);
-    
+    int cnt = 0;
     while (true && !isClosed) {
       // wait for everyone to be up
       if (slices != null) {
@@ -310,9 +310,11 @@ final class ShardLeaderElectionContext e
           log.info("Enough replicas found to continue.");
           return;
         } else {
-          log.info("Waiting until we see more replicas up: total="
+          if (cnt % 40 == 0) {
+            log.info("Waiting until we see more replicas up: total="
               + slices.getReplicasMap().size() + " found=" + found
               + " timeoutin=" + (timeoutAt - System.currentTimeMillis()));
+          }
         }
         
         if (System.currentTimeMillis() > timeoutAt) {
@@ -323,6 +325,8 @@ final class ShardLeaderElectionContext e
       
       Thread.sleep(500);
       slices = zkController.getClusterState().getSlice(collection, shardId);
+      // System.out.println("###### waitForReplicasToComeUp  : slices=" + slices + " all=" + zkController.getClusterState().getCollectionStates() );
+      cnt++;
     }
   }
 
@@ -347,7 +351,7 @@ final class ShardLeaderElectionContext e
     leaderElector.joinElection(this, true);
   }
 
-  private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
+  private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
     log.info("Checking if I should try and be the leader.");
     
     if (isClosed) {
@@ -355,6 +359,12 @@ final class ShardLeaderElectionContext e
       return false;
     }
     
+    if (!weAreReplacement) {
+      // we are the first node starting in the shard - the configurable leaderVoteWait
+      // gives others a chance to participate in sync and leader election, so we can lead
+      return true;
+    }
+    
     if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
         .equals(ZkStateReader.ACTIVE)) {
       log.info("My last published State was Active, it's okay to be the leader.");

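With the new weAreReplacement argument, the leadership check reduces to: a first starter may lead immediately (the leaderVoteWait has already given peers a chance to join), while a replacement must have last published ACTIVE. A condensed sketch of the visible logic; the method's tail is cut off in the hunk above, so the final fall-through is assumed:

    // Sketch (condensed): the leadership decision after this change.
    private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
      if (isClosed) {
        return false; // shutting down - never volunteer to lead
      }
      if (!weAreReplacement) {
        return true; // first node starting in the shard
      }
      // a replacement should only lead if its last published state was ACTIVE
      return core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
          .equals(ZkStateReader.ACTIVE); // assumed fall-through; tail not shown above
    }
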
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Dec 10 16:36:47 2012
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
  * the License.
  */
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -25,7 +26,9 @@ import java.util.Map;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ClosableThread;
-import org.apache.solr.common.cloud.HashPartitioner;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -221,14 +225,19 @@ public class Overseer {
           String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
           //get shardId from ClusterState
           sliceName = getAssignedId(state, nodeName, message);
+          if (sliceName != null) {
+            log.info("shard=" + sliceName + " is already registered");
+          }
         }
         if(sliceName == null) {
           //request new shardId 
           if (collectionExists) {
             // use existing numShards
-            numShards = state.getCollectionStates().get(collection).size();
+            numShards = state.getCollectionStates().get(collection).getSlices().size();
+            log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
           }
           sliceName = AssignShard.assignShard(collection, state, numShards);
+          log.info("Assigning new node to shard shard=" + sliceName);
         }
 
         Slice slice = state.getSlice(collection, sliceName);
@@ -269,16 +278,23 @@ public class Overseer {
           return newClusterState;
       }
 
+    private  Map<String,Object> defaultCollectionProps() {
+      HashMap<String,Object> props = new HashMap<String, Object>(2);
+      props.put(DocCollection.DOC_ROUTER, DocRouter.DEFAULT_NAME);
+      return props;
+    }
+
       private ClusterState createCollection(ClusterState state, String collectionName, int numShards) {
         log.info("Create collection {} with numShards {}", collectionName, numShards);
-        
-        HashPartitioner hp = new HashPartitioner();
-        List<HashPartitioner.Range> ranges = hp.partitionRange(numShards, hp.fullRange());
+
+        DocRouter router = DocRouter.DEFAULT;
+        List<DocRouter.Range> ranges = router.partitionRange(numShards, router.fullRange());
+
+        Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>();
 
 
-        Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String, Slice>>();
         Map<String, Slice> newSlices = new LinkedHashMap<String,Slice>();
-        newStates.putAll(state.getCollectionStates());
+        newCollections.putAll(state.getCollectionStates());
         for (int i = 0; i < numShards; i++) {
           final String sliceName = "shard" + (i+1);
 
@@ -287,20 +303,26 @@ public class Overseer {
 
           newSlices.put(sliceName, new Slice(sliceName, null, sliceProps));
         }
-        newStates.put(collectionName, newSlices);
-        ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newStates);
+
+        // TODO: fill in with collection properties read from the /collections/<collectionName> node
+        Map<String,Object> collectionProps = defaultCollectionProps();
+
+        DocCollection newCollection = new DocCollection(collectionName, newSlices, collectionProps, router);
+
+        newCollections.put(collectionName, newCollection);
+        ClusterState newClusterState = new ClusterState(state.getLiveNodes(), newCollections);
         return newClusterState;
       }
-      
+
       /*
        * Return an already assigned id or null if not assigned
        */
       private String getAssignedId(final ClusterState state, final String nodeName,
           final ZkNodeProps coreState) {
         final String key = coreState.getStr(ZkStateReader.NODE_NAME_PROP) + "_" +  coreState.getStr(ZkStateReader.CORE_NAME_PROP);
-        Map<String, Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
+        Collection<Slice> slices = state.getSlices(coreState.getStr(ZkStateReader.COLLECTION_PROP));
         if (slices != null) {
-          for (Slice slice : slices.values()) {
+          for (Slice slice : slices) {
             if (slice.getReplicasMap().get(key) != null) {
               return slice.getName();
             }
@@ -309,40 +331,49 @@ public class Overseer {
         return null;
       }
       
-      private ClusterState updateSlice(ClusterState state, String collection, Slice slice) {
+      private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
         // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
         // System.out.println("Updating slice:" + slice);
 
-        Map<String, Map<String, Slice>> newCollections = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates());  // make a shallow copy
-        Map<String, Slice> slices = newCollections.get(collection);
-        if (slices == null) {
+        Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(state.getCollectionStates());  // make a shallow copy
+        DocCollection coll = newCollections.get(collectionName);
+        Map<String,Slice> slices;
+        Map<String,Object> props;
+        DocRouter router;
+
+        if (coll == null) {
+          // updateSlice is currently called on a nonexistent collection only when a core publishes
+          // itself without explicitly creating the collection; we then assume custom sharding with an "implicit" router.
           slices = new HashMap<String, Slice>(1);
+          props = new HashMap<String,Object>(1);
+          props.put(DocCollection.DOC_ROUTER, ImplicitDocRouter.NAME);
+          router = new ImplicitDocRouter();
         } else {
-          slices = new LinkedHashMap<String, Slice>(slices); // make a shallow copy
-        }
-        slices.put(slice.getName(),  slice);
-        newCollections.put(collection, slices);
+          props = coll.getProperties();
+          router = coll.getRouter();
+          slices = new LinkedHashMap<String, Slice>(coll.getSlicesMap()); // make a shallow copy
+        }
+        slices.put(slice.getName(), slice);
+        DocCollection newCollection = new DocCollection(collectionName, slices, props, router);
+        newCollections.put(collectionName, newCollection);
 
         // System.out.println("###!!!### NEW CLUSTERSTATE: " + JSONUtil.toJSON(newCollections));
 
         return new ClusterState(state.getLiveNodes(), newCollections);
       }
       
-      private ClusterState setShardLeader(ClusterState state, String collection, String sliceName, String leaderUrl) {
-
-        final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>(state.getCollectionStates());
+      private ClusterState setShardLeader(ClusterState state, String collectionName, String sliceName, String leaderUrl) {
 
-        Map<String, Slice> slices = newStates.get(collection);
-
-        if(slices==null) {
-          log.error("Could not mark shard leader for non existing collection:" + collection);
+        final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(state.getCollectionStates());
+        DocCollection coll = newCollections.get(collectionName);
+        if(coll == null) {
+          log.error("Could not mark shard leader for non existing collection:" + collectionName);
           return state;
         }
 
+        Map<String, Slice> slices = coll.getSlicesMap();
         // make a shallow copy and add it to the new collection
         slices = new LinkedHashMap<String,Slice>(slices);
-        newStates.put(collection, slices);
-
 
         Slice slice = slices.get(sliceName);
         if (slice == null) {
@@ -378,7 +409,11 @@ public class Overseer {
           Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
           slices.put(newSlice.getName(), newSlice);
         }
-        return new ClusterState(state.getLiveNodes(), newStates);
+
+
+        DocCollection newCollection = new DocCollection(coll.getName(), slices, coll.getProperties(), coll.getRouter());
+        newCollections.put(collectionName, newCollection);
+        return new ClusterState(state.getLiveNodes(), newCollections);
       }
 
       /*
@@ -389,48 +424,57 @@ public class Overseer {
         final String coreNodeName = message.getStr(ZkStateReader.NODE_NAME_PROP) + "_" + message.getStr(ZkStateReader.CORE_NAME_PROP);
         final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
 
-        final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
-        for(String collectionName: clusterState.getCollections()) {
-          if(collection.equals(collectionName)) {
-            Map<String, Slice> slices = clusterState.getSlices(collection);
-            LinkedHashMap<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
-            for(Slice slice: slices.values()) {
-              if(slice.getReplicasMap().containsKey(coreNodeName)) {
-                Map<String, Replica> newReplicas = slice.getReplicasCopy();
-                newReplicas.remove(coreNodeName);
-                Slice newSlice = new Slice(slice.getName(), newReplicas, slice.getProperties());
-                newSlices.put(slice.getName(), newSlice);
-              } else {
-                newSlices.put(slice.getName(), slice);
-              }
-            }
-            int cnt = 0;
-            for (Slice slice : newSlices.values()) {
-              cnt+=slice.getReplicasMap().size();
-            }
-            // TODO: if no nodes are left after this unload
-            // remove from zk - do we have a race where Overseer
-            // see's registered nodes and publishes though?
-            if (cnt > 0) {
-              newStates.put(collectionName, newSlices);
+        final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
+        DocCollection coll = newCollections.get(collection);
+        if (coll == null) {
+          // TODO: log/error that we didn't find it?
+          return clusterState;
+        }
+
+        Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>();
+        for (Slice slice : coll.getSlices()) {
+          Replica replica = slice.getReplica(coreNodeName);
+          if (replica != null) {
+            Map<String, Replica> newReplicas = slice.getReplicasCopy();
+            newReplicas.remove(coreNodeName);
+            // TODO TODO TODO!!! if there are no replicas left for the slice, and the slice has no hash range, remove it
+            // if (newReplicas.size() == 0 && slice.getRange() == null) {
+            // if there are no replicas left for the slice remove it
+            if (newReplicas.size() == 0) {
+              slice = null;
             } else {
-              // TODO: it might be better logically to have this in ZkController
-              // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
-              // ZkController out of the Overseer.
-              try {
-                zkClient.clean("/collections/" + collectionName);
-              } catch (InterruptedException e) {
-                SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collectionName, e);
-                Thread.currentThread().interrupt();
-              } catch (KeeperException e) {
-                SolrException.log(log, "Problem cleaning up collection in zk:" + collectionName, e);
-              }
+              slice = new Slice(slice.getName(), newReplicas, slice.getProperties());
             }
-          } else {
-            newStates.put(collectionName, clusterState.getSlices(collectionName));
           }
+
+          if (slice != null) {
+            newSlices.put(slice.getName(), slice);
+          }
+        }
+
+        // if there are no slices left in the collection, remove it?
+        if (newSlices.size() == 0) {
+          newCollections.remove(coll.getName());
+
+          // TODO: it might be better logically to have this in ZkController
+          // but for tests (it's easier) it seems better for the moment to leave CoreContainer and/or
+          // ZkController out of the Overseer.
+          try {
+            zkClient.clean("/collections/" + collection);
+          } catch (InterruptedException e) {
+            SolrException.log(log, "Cleaning up collection in zk was interrupted:" + collection, e);
+            Thread.currentThread().interrupt();
+          } catch (KeeperException e) {
+            SolrException.log(log, "Problem cleaning up collection in zk:" + collection, e);
+          }
+
+
+        } else {
+          DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
+          newCollections.put(newCollection.getName(), newCollection);
         }
-        ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newStates);
+
+        ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
         return newState;
      }
 

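Each of these Overseer mutations follows the same copy-on-write shape: shallow-copy the collection map, rebuild one immutable DocCollection around a modified slice map, and wrap everything in a fresh ClusterState. A sketch of that shape, assuming state, collectionName, and an updatedSlice in scope:

    // Sketch: copy-on-write update of one slice in an otherwise immutable ClusterState.
    Map<String,DocCollection> newCollections =
        new LinkedHashMap<String,DocCollection>(state.getCollectionStates()); // shallow copy
    DocCollection coll = newCollections.get(collectionName);
    Map<String,Slice> newSlices = new LinkedHashMap<String,Slice>(coll.getSlicesMap()); // shallow copy
    newSlices.put(updatedSlice.getName(), updatedSlice); // swap in the modified slice
    newCollections.put(collectionName,
        new DocCollection(collectionName, newSlices, coll.getProperties(), coll.getRouter()));
    ClusterState newState = new ClusterState(state.getLiveNodes(), newCollections);
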
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Dec 10 16:36:47 2012
@@ -26,6 +26,7 @@ import java.util.Set;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -42,8 +43,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class OverseerCollectionProcessor implements Runnable {
+  
+  public static final String NUM_SLICES = "numShards";
+  
   public static final String REPLICATION_FACTOR = "replicationFactor";
-
+  
+  public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
+  
   public static final String DELETECOLLECTION = "deletecollection";
 
   public static final String CREATECOLLECTION = "createcollection";
@@ -158,124 +164,178 @@ public class OverseerCollectionProcessor
   }
 
   private boolean createCollection(ClusterState clusterState, ZkNodeProps message) {
-    
-    // look at the replication factor and see if it matches reality
-    // if it does not, find best nodes to create more cores
-    
-    String numReplicasString = message.getStr(REPLICATION_FACTOR);
-    int numReplicas;
-    try {
-      numReplicas = numReplicasString == null ? 0 : Integer.parseInt(numReplicasString);
-    } catch (Exception ex) {
-      SolrException.log(log, "Could not parse " + REPLICATION_FACTOR, ex);
-      return false;
-    }
-    String numShardsString = message.getStr("numShards");
-    int numShards;
-    try {
-      numShards = numShardsString == null ? 0 : Integer.parseInt(numShardsString);
-    } catch (Exception ex) {
-      SolrException.log(log, "Could not parse numShards", ex);
+    String collectionName = message.getStr("name");
+    if (clusterState.getCollections().contains(collectionName)) {
+      SolrException.log(log, "collection already exists: " + collectionName);
       return false;
     }
     
-    String name = message.getStr("name");
-    String configName = message.getStr("collection.configName");
-    
-    // we need to look at every node and see how many cores it serves
-    // add our new cores to existing nodes serving the least number of cores
-    // but (for now) require that each core goes on a distinct node.
-    
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-    
-    
-    // TODO: add smarter options that look at the current number of cores per node?
-    // for now we just go random
-    Set<String> nodes = clusterState.getLiveNodes();
-    List<String> nodeList = new ArrayList<String>(nodes.size());
-    nodeList.addAll(nodes);
-    Collections.shuffle(nodeList);
-    
-    int numNodes = numShards * (numReplicas + 1);
-    List<String> createOnNodes = nodeList.subList(0, Math.min(nodeList.size(), numNodes));
-    
-    log.info("Create collection " + name + " on " + createOnNodes);
-    
-    for (String replica : createOnNodes) {
-      // TODO: this does not work if original url had _ in it
-      // We should have a master list
-      replica = replica.replaceAll("_", "/");
-      params.set(CoreAdminParams.NAME, name);
-      params.set("collection.configName", configName);
-      params.set("numShards", numShards);
-      ShardRequest sreq = new ShardRequest();
-      params.set("qt", adminPath);
-      sreq.purpose = 1;
-      // TODO: this sucks
-      if (replica.startsWith("http://")) replica = replica.substring(7);
-      sreq.shards = new String[] {replica};
-      sreq.actualShards = sreq.shards;
-      sreq.params = params;
+    try {
+      // look at the replication factor and see if it matches reality
+      // if it does not, find best nodes to create more cores
       
-      shardHandler.submit(sreq, replica, sreq.params);
-    }
-    
-    int failed = 0;
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        Throwable e = srsp.getException();
-        if (e != null) {
-          // should we retry?
-          // TODO: we should return errors to the client
-          // TODO: what if one fails and others succeed?
-          failed++;
-          log.error("Error talking to shard: " + srsp.getShard(), e);
+      int numReplica = msgStrToInt(message, REPLICATION_FACTOR, 0);
+      int numSlices = msgStrToInt(message, NUM_SLICES, 0);
+      int maxShardsPerNode = msgStrToInt(message, MAX_SHARDS_PER_NODE, 1);
+      
+      if (numReplica < 0) {
+        SolrException.log(log, REPLICATION_FACTOR + " must be >= 0");
+        return false;
+      }
+      
+      if (numSlices < 0) {
+        SolrException.log(log, NUM_SLICES + " must be >= 0");
+        return false;
+      }
+      
+      String configName = message.getStr("collection.configName");
+      
+      // we need to look at every node and see how many cores it serves
+      // add our new cores to existing nodes serving the least number of cores
+      // but (for now) require that each core goes on a distinct node.
+      
+      // TODO: add smarter options that look at the current number of cores per
+      // node?
+      // for now we just go random
+      Set<String> nodes = clusterState.getLiveNodes();
+      List<String> nodeList = new ArrayList<String>(nodes.size());
+      nodeList.addAll(nodes);
+      Collections.shuffle(nodeList);
+      
+      if (nodeList.size() <= 0) {
+        log.error("Cannot create collection " + collectionName
+            + ". No live Solr-instaces");
+        return false;
+      }
+      
+      int numShardsPerSlice = numReplica + 1;
+      if (numShardsPerSlice > nodeList.size()) {
+        log.warn("Specified "
+            + REPLICATION_FACTOR
+            + " of "
+            + numReplica
+            + " on collection "
+            + collectionName
+            + " is higher than or equal to the number of Solr instances currently live ("
+            + nodeList.size()
+            + "). Its unusual to run two replica of the same slice on the same Solr-instance.");
+      }
+      
+      int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
+      int requestedShardsToCreate = numSlices * numShardsPerSlice;
+      if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+        log.error("Cannot create collection " + collectionName + ". Value of "
+            + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+            + ", and the number of live nodes is " + nodeList.size()
+            + ". This allows a maximum of " + maxShardsAllowedToCreate
+            + " to be created. Value of " + NUM_SLICES + " is " + numSlices
+            + " and value of " + REPLICATION_FACTOR + " is " + numReplica
+            + ". This requires " + requestedShardsToCreate
+            + " shards to be created (higher than the allowed number)");
+        return false;
+      }
+      
+      for (int i = 1; i <= numSlices; i++) {
+        for (int j = 1; j <= numShardsPerSlice; j++) {
+          String nodeName = nodeList.get(((i - 1) + (j - 1)) % nodeList.size());
+          String sliceName = "shard" + i;
+          String shardName = collectionName + "_" + sliceName + "_replica" + j;
+          log.info("Creating shard " + shardName + " as part of slice "
+              + sliceName + " of collection " + collectionName + " on "
+              + nodeName);
+          
+          // Need to create new params for each request
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+          
+          params.set(CoreAdminParams.NAME, shardName);
+          params.set("collection.configName", configName);
+          params.set(CoreAdminParams.COLLECTION, collectionName);
+          params.set(CoreAdminParams.SHARD, sliceName);
+          params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+          
+          ShardRequest sreq = new ShardRequest();
+          params.set("qt", adminPath);
+          sreq.purpose = 1;
+          // TODO: this does not work if original url had _ in it
+          // We should have a master list
+          String replica = nodeName.replaceAll("_", "/");
+          if (replica.startsWith("http://")) replica = replica.substring(7);
+          sreq.shards = new String[] {replica};
+          sreq.actualShards = sreq.shards;
+          sreq.params = params;
+          
+          shardHandler.submit(sreq, replica, sreq.params);
+          
         }
       }
-    } while (srsp != null);
-
-    
-    // if all calls succeeded, return true
-    if (failed > 0) {
+      
+      int failed = 0;
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          Throwable e = srsp.getException();
+          if (e != null) {
+            // should we retry?
+            // TODO: we should return errors to the client
+            // TODO: what if one fails and others succeed?
+            failed++;
+            log.error("Error talking to shard: " + srsp.getShard(), e);
+          }
+        }
+      } while (srsp != null);
+      
+      // if all calls succeeded, return true
+      if (failed > 0) {
+        return false;
+      }
+      log.info("Successfully created all shards for collection "
+          + collectionName);
+      return true;
+    } catch (Exception ex) {
+      // Expecting that the necessary logging has already been performed
       return false;
     }
-    return true;
   }
   
   private boolean collectionCmd(ClusterState clusterState, ZkNodeProps message, ModifiableSolrParams params) {
     log.info("Executing Collection Cmd : " + params);
-    String name = message.getStr("name");
+    String collectionName = message.getStr("name");
     
-    Map<String,Slice> slices = clusterState.getCollectionStates().get(name);
+    DocCollection coll = clusterState.getCollection(collectionName);
     
-    if (slices == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection:" + name);
+    if (coll == null) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "Could not find collection:" + collectionName);
     }
     
-    for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+    for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
       Slice slice = entry.getValue();
       Map<String,Replica> shards = slice.getReplicasMap();
       Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
       for (Map.Entry<String,Replica> shardEntry : shardEntries) {
         final ZkNodeProps node = shardEntry.getValue();
         if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP))) {
-          params.set(CoreAdminParams.CORE, node.getStr(ZkStateReader.CORE_NAME_PROP));
-
+          // For thread safety, make a shallow clone of the ModifiableSolrParams per request
+          ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+          cloneParams.add(params);
+          cloneParams.set(CoreAdminParams.CORE,
+              node.getStr(ZkStateReader.CORE_NAME_PROP));
+          
           String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
           ShardRequest sreq = new ShardRequest();
+          
           // yes, they must use same admin handler path everywhere...
-          params.set("qt", adminPath);
-
+          cloneParams.set("qt", adminPath);
           sreq.purpose = 1;
           // TODO: this sucks
           if (replica.startsWith("http://")) replica = replica.substring(7);
           sreq.shards = new String[] {replica};
           sreq.actualShards = sreq.shards;
-          sreq.params = params;
-          log.info("Collection Admin sending CoreAdmin cmd to " + replica);
+          sreq.params = cloneParams;
+          log.info("Collection Admin sending CoreAdmin cmd to " + replica
+              + " params:" + sreq.params);
           shardHandler.submit(sreq, replica, sreq.params);
         }
       }
@@ -304,4 +364,15 @@ public class OverseerCollectionProcessor
     }
     return true;
   }
+  
+  private int msgStrToInt(ZkNodeProps message, String key, Integer def)
+      throws Exception {
+    String str = message.getStr(key);
+    try {
+      return str == null ? def : Integer.parseInt(str);
+    } catch (Exception ex) {
+      SolrException.log(log, "Could not parse " + key, ex);
+      throw ex;
+    }
+  }
 }

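The nested creation loop places replica j of slice i on nodeList.get(((i - 1) + (j - 1)) % nodeList.size()), so successive slices start one node later and a slice's replicas land on consecutive nodes. A worked example under the capacity check above (illustrative values):

    // Worked example: numSlices=2, replicationFactor=1 => numShardsPerSlice = 1 + 1 = 2,
    // nodeList = [A, B, C], maxShardsPerNode = 2.
    // Capacity check: 2 * 3 = 6 cores allowed >= 2 * 2 = 4 requested, so creation proceeds.
    // Placement via nodeList.get(((i - 1) + (j - 1)) % 3):
    //   shard1: replica1 -> index 0 -> A,  replica2 -> index 1 -> B
    //   shard2: replica1 -> index 1 -> B,  replica2 -> index 2 -> C
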
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Dec 10 16:36:47 2012
@@ -20,7 +20,9 @@ package org.apache.solr.cloud;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.NetworkInterface;
 import java.util.Collections;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -42,6 +44,7 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -137,14 +140,6 @@ public final class ZkController {
 
   private int clientTimeout;
 
-
-  public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
-      String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
-      TimeoutException, IOException {
-    this(cc, zkServerAddress, zkClientTimeout, zkClientConnectTimeout, localHost, locaHostPort, localHostContext, null, registerOnReconnect);
-  }
-  
-
   public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
       String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
       TimeoutException, IOException {
@@ -358,7 +353,29 @@ public final class ZkController {
   private String getHostAddress(String host) throws IOException {
 
     if (host == null) {
-      host = "http://" + InetAddress.getLocalHost().getHostName();
+      String hostaddress = InetAddress.getLocalHost().getHostAddress();
+      // Re-resolve the IP when it comes back as "127.0.0.1"; in any other case we trust
+      // that the hosts file is right.
+      if ("127.0.0.1".equals(hostaddress)) {
+        Enumeration<NetworkInterface> netInterfaces = null;
+        try {
+          netInterfaces = NetworkInterface.getNetworkInterfaces();
+          while (netInterfaces.hasMoreElements()) {
+            NetworkInterface ni = netInterfaces.nextElement();
+            Enumeration<InetAddress> ips = ni.getInetAddresses();
+            while (ips.hasMoreElements()) {
+              InetAddress ip = ips.nextElement();
+              if (ip.isSiteLocalAddress()) {
+                hostaddress = ip.getHostAddress();
+              }
+            }
+          }
+        } catch (Throwable e) {
+          SolrException.log(log,
+              "Error while looking for a better host name than 127.0.0.1", e);
+        }
+      }
+      host = "http://" + hostaddress;
     } else {
       Matcher m = URL_PREFIX.matcher(host);
       if (m.matches()) {
@@ -576,7 +593,10 @@ public final class ZkController {
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
     }
     
-    String leaderUrl = getLeader(cloudDesc);
+
+    // in this case, we want to wait for the leader at least as long as the leader
+    // might wait for a vote
+    String leaderUrl = getLeader(cloudDesc, Integer.parseInt(leaderVoteWait) + 1000);
     
     String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     log.info("We are " + ourUrl + " and leader is " + leaderUrl);
@@ -628,7 +648,9 @@ public final class ZkController {
     return shardId;
   }
 
-  private String getLeader(final CloudDescriptor cloudDesc) {
+  // timeoutms is the timeout for the first call to get the leader - there is then
+  // a longer wait to make sure that leader matches our local state
+  private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {
     
     String collection = cloudDesc.getCollectionName();
     String shardId = cloudDesc.getShardId();
@@ -637,7 +659,7 @@ public final class ZkController {
     // cluster state node that won't be updated for a moment
     String leaderUrl;
     try {
-      leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+      leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
           .getCoreUrl();
       
       // now wait until our currently cloud state contains the latest leader
@@ -655,7 +677,7 @@ public final class ZkController {
         tries++;
         clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId,
             30000);
-        leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+        leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
             .getCoreUrl();
       }
       
@@ -671,8 +693,8 @@ public final class ZkController {
    * Get leader props directly from zk nodes.
    */
   public ZkCoreNodeProps getLeaderProps(final String collection,
-      final String slice) throws InterruptedException {
-    return getLeaderProps(collection, slice, false);
+      final String slice, int timeoutms) throws InterruptedException {
+    return getLeaderProps(collection, slice, timeoutms, false);
   }
   
   /**
@@ -681,8 +703,8 @@ public final class ZkController {
    * @return leader props
    */
   public ZkCoreNodeProps getLeaderProps(final String collection,
-      final String slice, boolean failImmediatelyOnExpiration) throws InterruptedException {
-    int iterCount = 60;
+      final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
+    int iterCount = timeoutms / 1000;
     Exception exp = null;
     while (iterCount-- > 0) {
       try {
@@ -699,10 +721,10 @@ public final class ZkController {
           throw new RuntimeException("Session has expired - could not get leader props", exp);
         }
         exp = e;
-        Thread.sleep(500);
+        Thread.sleep(1000);
       }  catch (Exception e) {
         exp = e;
-        Thread.sleep(500);
+        Thread.sleep(1000);
       }
       if (cc.isShutDown()) {
         throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutdown");
@@ -780,6 +802,7 @@ public final class ZkController {
    * Publish core state to overseer.
    */
   public void publish(final CoreDescriptor cd, final String state, boolean updateLastState) throws KeeperException, InterruptedException {
+    log.info("publishing core={} state={}", cd.getName(), state);
     //System.out.println(Thread.currentThread().getStackTrace()[3]);
     Integer numShards = cd.getCloudDescriptor().getNumShards();
     if (numShards == null) { //XXX sys prop hack
@@ -865,6 +888,10 @@ public final class ZkController {
 
         try {
           Map<String,Object> collectionProps = new HashMap<String,Object>();
+
+          // set defaults
+          collectionProps.put(DocCollection.DOC_ROUTER, "compositeId");
+
           // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
           String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, collection);
 
@@ -879,8 +906,10 @@ public final class ZkController {
             }
 
             // if the config name wasn't passed in, use the default
-            if (!collectionProps.containsKey(CONFIGNAME_PROP))
+            if (!collectionProps.containsKey(CONFIGNAME_PROP)) {
+              // TODO: getting the configName from the collectionPath should fail since we already know it doesn't exist?
               getConfName(collection, collectionPath, collectionProps);
+            }
             
           } else if(System.getProperty("bootstrap_confdir") != null) {
             // if we are bootstrapping a collection, default the config for
@@ -904,7 +933,6 @@ public final class ZkController {
           } else {
             getConfName(collection, collectionPath, collectionProps);
           }
-          
           ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
           zkClient.makePath(collectionPath, ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
 
@@ -996,7 +1024,7 @@ public final class ZkController {
     }
     
     throw new SolrException(ErrorCode.SERVER_ERROR,
-        "Could not get shard_id for core: " + coreName);
+        "Could not get shard_id for core: " + coreName + " coreNodeName:" + shardZkNodeName);
   }
   
   public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
@@ -1070,7 +1098,7 @@ public final class ZkController {
     for (int i = 0; i < retries; i++) {
       try {
         // go straight to zk, not the cloud state - we must have current info
-        leaderProps = getLeaderProps(collection, shard);
+        leaderProps = getLeaderProps(collection, shard, 30000);
         break;
       } catch (Exception e) {
         SolrException.log(log, "There was a problem finding the leader in zk", e);

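The new getHostAddress fallback scans every network interface when the JVM resolves the local host to 127.0.0.1, keeping the last site-local address it finds. A self-contained sketch of that scan:

    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.util.Enumeration;

    public class SiteLocalAddress {
      public static void main(String[] args) throws Exception {
        String hostaddress = InetAddress.getLocalHost().getHostAddress();
        if ("127.0.0.1".equals(hostaddress)) { // hosts file gave us loopback; go looking
          Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
          while (nics.hasMoreElements()) {
            Enumeration<InetAddress> ips = nics.nextElement().getInetAddresses();
            while (ips.hasMoreElements()) {
              InetAddress ip = ips.nextElement();
              if (ip.isSiteLocalAddress()) {
                hostaddress = ip.getHostAddress(); // last site-local address wins, as above
              }
            }
          }
        }
        System.out.println("http://" + hostaddress);
      }
    }
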
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java Mon Dec 10 16:36:47 2012
@@ -116,7 +116,7 @@ public abstract class CachingDirectoryFa
           while(val.refCnt != 0) {
             wait(100);
             
-            if (cnt++ >= 300) {
+            if (cnt++ >= 12000) {
               log.error("Timeout waiting for all directory ref counts to be released");
               break;
             }

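Since the loop waits up to 100 ms per iteration, the old cap of 300 iterations logged the timeout after roughly 300 * 100 ms = 30 s; the new cap of 12000 stretches that to 12000 * 100 ms = 1200 s, i.e. 20 minutes.
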
Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/CoreContainer.java Mon Dec 10 16:36:47 2012
@@ -28,12 +28,23 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.xml.transform.Transformer;
@@ -55,6 +66,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef;
 import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef;
 import org.apache.solr.handler.admin.CollectionsHandler;
@@ -66,7 +78,9 @@ import org.apache.solr.logging.LogWatche
 import org.apache.solr.logging.jul.JulWatcher;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.util.AdjustableSemaphore;
 import org.apache.solr.util.DOMUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.FileUtils;
 import org.apache.solr.util.SystemIdResolver;
 import org.apache.zookeeper.KeeperException;
@@ -86,6 +100,7 @@ import org.xml.sax.InputSource;
 public class CoreContainer 
 {
   private static final String LEADER_VOTE_WAIT = "180000";  // 3 minutes
+  private static final int CORE_LOAD_THREADS = 3;
   private static final String DEFAULT_HOST_CONTEXT = "solr";
   private static final String DEFAULT_HOST_PORT = "8983";
   private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 15000;
@@ -143,8 +158,9 @@ public class CoreContainer 
   protected LogWatcher logging = null;
   private String zkHost;
   private Map<SolrCore,String> coreToOrigName = new ConcurrentHashMap<SolrCore,String>();
-  private String leaderVoteWait;
+  private String leaderVoteWait = LEADER_VOTE_WAIT;
   protected int swappableCacheSize = Integer.MAX_VALUE; // Use as a flag too, if swappableCacheSize set in solr.xml this will be changed
+  private int coreLoadThreads;
   
   {
     log.info("New CoreContainer " + System.identityHashCode(this));
@@ -382,13 +398,13 @@ public class CoreContainer 
    * @param cfgis the configuration file InputStream
    */
   public void load(String dir, InputSource cfgis)  {
-
+    ThreadPoolExecutor coreLoadExecutor = null;
     if (null == dir) {
       // don't rely on SolrResourceLoader(), determine explicitly first
       dir = SolrResourceLoader.locateSolrHome();
     }
     log.info("Loading CoreContainer using Solr Home: '{}'", dir);
-
+    
     this.loader = new SolrResourceLoader(dir);
     solrHome = loader.getInstanceDir();
     
@@ -401,89 +417,97 @@ public class CoreContainer 
     } catch (Exception e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "", e);
     }
-    // Since the cores  var is now initialized to null, let's set it up right now.
+    // Since the cores var is now initialized to null, let's set it up right
+    // now.
     cfg.substituteProperties();
-
+    
     allocateLazyCores(cfg);
-
+    
     // Initialize Logging
-    if(cfg.getBool("solr/logging/@enabled",true)) {
+    if (cfg.getBool("solr/logging/@enabled", true)) {
       String slf4jImpl = null;
       String fname = cfg.get("solr/logging/watcher/@class", null);
       try {
-        slf4jImpl = StaticLoggerBinder.getSingleton().getLoggerFactoryClassStr();
-        if(fname==null) {
-          if( slf4jImpl.indexOf("Log4j") > 0) {
-            log.warn("Log watching is not yet implemented for log4j" );
-          }
-          else if( slf4jImpl.indexOf("JDK") > 0) {
+        slf4jImpl = StaticLoggerBinder.getSingleton()
+            .getLoggerFactoryClassStr();
+        if (fname == null) {
+          if (slf4jImpl.indexOf("Log4j") > 0) {
+            log.warn("Log watching is not yet implemented for log4j");
+          } else if (slf4jImpl.indexOf("JDK") > 0) {
             fname = "JUL";
           }
         }
-      }
-      catch(Throwable ex) {
-        log.warn("Unable to read SLF4J version.  LogWatcher will be disabled: "+ex);
+      } catch (Throwable ex) {
+        log.warn("Unable to read SLF4J version.  LogWatcher will be disabled: "
+            + ex);
       }
       
       // Now load the framework
-      if(fname!=null) {
-        if("JUL".equalsIgnoreCase(fname)) {
+      if (fname != null) {
+        if ("JUL".equalsIgnoreCase(fname)) {
           logging = new JulWatcher(slf4jImpl);
-        }
-        else {
+        } else {
           try {
             logging = loader.newInstance(fname, LogWatcher.class);
-          }
-          catch (Throwable e) {
+          } catch (Throwable e) {
             log.warn("Unable to load LogWatcher", e);
           }
         }
         
-        if( logging != null ) {
+        if (logging != null) {
           ListenerConfig v = new ListenerConfig();
-          v.size = cfg.getInt("solr/logging/watcher/@size",50);
-          v.threshold = cfg.get("solr/logging/watcher/@threshold",null);
-          if(v.size>0) {
+          v.size = cfg.getInt("solr/logging/watcher/@size", 50);
+          v.threshold = cfg.get("solr/logging/watcher/@threshold", null);
+          if (v.size > 0) {
             log.info("Registering Log Listener");
             logging.registerListener(v, this);
           }
         }
       }
     }
-
+    
     String dcoreName = cfg.get("solr/cores/@defaultCoreName", null);
-    if(dcoreName != null && !dcoreName.isEmpty()) {
+    if (dcoreName != null && !dcoreName.isEmpty()) {
       defaultCoreName = dcoreName;
     }
     persistent = cfg.getBool("solr/@persistent", false);
     libDir = cfg.get("solr/@sharedLib", null);
-    zkHost = cfg.get("solr/@zkHost" , null);
+    zkHost = cfg.get("solr/@zkHost", null);
+    coreLoadThreads = cfg.getInt("solr/@coreLoadThreads", CORE_LOAD_THREADS);
+    
     adminPath = cfg.get("solr/cores/@adminPath", null);
     shareSchema = cfg.getBool("solr/cores/@shareSchema", DEFAULT_SHARE_SCHEMA);
-    zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout", DEFAULT_ZK_CLIENT_TIMEOUT);
-
+    zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout",
+        DEFAULT_ZK_CLIENT_TIMEOUT);
+    
     hostPort = cfg.get("solr/cores/@hostPort", DEFAULT_HOST_PORT);
-
+    
     hostContext = cfg.get("solr/cores/@hostContext", DEFAULT_HOST_CONTEXT);
     host = cfg.get("solr/cores/@host", null);
     
     leaderVoteWait = cfg.get("solr/cores/@leaderVoteWait", LEADER_VOTE_WAIT);
-
-    if(shareSchema){
-      indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
+    
+    if (shareSchema) {
+      indexSchemaCache = new ConcurrentHashMap<String,IndexSchema>();
     }
-    adminHandler  = cfg.get("solr/cores/@adminHandler", null );
-    managementPath  = cfg.get("solr/cores/@managementPath", null );
+    adminHandler = cfg.get("solr/cores/@adminHandler", null);
+    managementPath = cfg.get("solr/cores/@managementPath", null);
     
-    zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout", Integer.toString(zkClientTimeout)));
+    zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout",
+        Integer.toString(zkClientTimeout)));
     initZooKeeper(zkHost, zkClientTimeout);
-
+    
+    if (isZooKeeperAware() && coreLoadThreads <= 1) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "SolrCloud requires a value of at least 2 in solr.xml for coreLoadThreads");
+    }
+    
     if (libDir != null) {
       File f = FileUtils.resolvePath(new File(dir), libDir);
-      log.info( "loading shared library: "+f.getAbsolutePath() );
+      log.info("loading shared library: " + f.getAbsolutePath());
       libLoader = SolrResourceLoader.createClassLoader(f, null);
     }
-
+    
     if (adminPath != null) {
       if (adminHandler == null) {
         coreAdminHandler = new CoreAdminHandler(this);
@@ -491,91 +515,164 @@ public class CoreContainer 
         coreAdminHandler = this.createMultiCoreHandler(adminHandler);
       }
     }
-
+    
     collectionsHandler = new CollectionsHandler(this);
     
     try {
-      containerProperties = readProperties(cfg, ((NodeList) cfg.evaluate(DEFAULT_HOST_CONTEXT, XPathConstants.NODESET)).item(0));
+      containerProperties = readProperties(cfg, ((NodeList) cfg.evaluate(
+          DEFAULT_HOST_CONTEXT, XPathConstants.NODESET)).item(0));
     } catch (Throwable e) {
-      SolrException.log(log,null,e);
+      SolrException.log(log, null, e);
     }
-
-    NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core", XPathConstants.NODESET);
-
-    for (int i=0; i<nodes.getLength(); i++) {
-      Node node = nodes.item(i);
-      SolrCore core = null;
-      try {
-        String rawName = DOMUtil.getAttr(node, CORE_NAME, null);
-        if (null == rawName) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                                  "Each core in solr.xml must have a 'name'");
-        }
-        String name = rawName;
-        CoreDescriptor p = new CoreDescriptor(this, name, DOMUtil.getAttr(node, CORE_INSTDIR, null));
-
-        // deal with optional settings
-        String opt = DOMUtil.getAttr(node, CORE_CONFIG, null);
-
-        if (opt != null) {
-          p.setConfigName(opt);
-        }
-        opt = DOMUtil.getAttr(node, CORE_SCHEMA, null);
-        if (opt != null) {
-          p.setSchemaName(opt);
-        }
-
-        if (zkController != null) {
-          opt = DOMUtil.getAttr(node, CORE_SHARD, null);
-          if (opt != null && opt.length() > 0) {
-            p.getCloudDescriptor().setShardId(opt);
+    
+    NodeList nodes = (NodeList) cfg.evaluate("solr/cores/core",
+        XPathConstants.NODESET);
+    
+    // setup executor to load cores in parallel
+    coreLoadExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new DefaultSolrThreadFactory("coreLoadExecutor"));
+    try {
+      // limit the number of cores loading at once to coreLoadThreads
+      final AdjustableSemaphore semaphore = new AdjustableSemaphore(
+          coreLoadThreads);
+      
+      CompletionService<SolrCore> completionService = new ExecutorCompletionService<SolrCore>(
+          coreLoadExecutor);
+      Set<Future<SolrCore>> pending = new HashSet<Future<SolrCore>>();
+      
+      for (int i = 0; i < nodes.getLength(); i++) {
+        Node node = nodes.item(i);
+        try {
+          String rawName = DOMUtil.getAttr(node, CORE_NAME, null);
+          if (null == rawName) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "Each core in solr.xml must have a 'name'");
+          }
+          final String name = rawName;
+          final CoreDescriptor p = new CoreDescriptor(this, name,
+              DOMUtil.getAttr(node, CORE_INSTDIR, null));
+          
+          // deal with optional settings
+          String opt = DOMUtil.getAttr(node, CORE_CONFIG, null);
+          
+          if (opt != null) {
+            p.setConfigName(opt);
           }
-          opt = DOMUtil.getAttr(node, CORE_COLLECTION, null);
+          opt = DOMUtil.getAttr(node, CORE_SCHEMA, null);
           if (opt != null) {
-            p.getCloudDescriptor().setCollectionName(opt);
+            p.setSchemaName(opt);
           }
-          opt = DOMUtil.getAttr(node, CORE_ROLES, null);
-          if(opt != null){
-            p.getCloudDescriptor().setRoles(opt);
+          
+          if (zkController != null) {
+            opt = DOMUtil.getAttr(node, CORE_SHARD, null);
+            if (opt != null && opt.length() > 0) {
+              p.getCloudDescriptor().setShardId(opt);
+            }
+            opt = DOMUtil.getAttr(node, CORE_COLLECTION, null);
+            if (opt != null) {
+              p.getCloudDescriptor().setCollectionName(opt);
+            }
+            opt = DOMUtil.getAttr(node, CORE_ROLES, null);
+            if (opt != null) {
+              p.getCloudDescriptor().setRoles(opt);
+            }
           }
-        }
-        opt = DOMUtil.getAttr(node, CORE_PROPERTIES, null);
-        if (opt != null) {
-          p.setPropertiesName(opt);
-        }
-        opt = DOMUtil.getAttr(node, CORE_DATADIR, null);
-        if (opt != null) {
-          p.setDataDir(opt);
-        }
-
-        p.setCoreProperties(readProperties(cfg, node));
-
-        opt = DOMUtil.getAttr(node, CORE_LOADONSTARTUP, null);
-        if (opt != null) {
-          p.setLoadOnStartup(("true".equalsIgnoreCase(opt) || "on".equalsIgnoreCase(opt)) ? true : false);
-        }
-
-        opt = DOMUtil.getAttr(node, CORE_SWAPPABLE, null);
-        if (opt != null) {
-          p.setSwappable(("true".equalsIgnoreCase(opt) || "on".equalsIgnoreCase(opt)) ? true : false);
-        }
-
-        if (! p.isSwappable() && p.isLoadOnStartup()) { // Just like current case.
-          core = create(p);
-          register(name, core, false);
-          // track original names
-          coreToOrigName.put(core, rawName);
-        } else {
-          // Store it away for later use. includes non-swappable but not loaded at startup cores.
-          dynamicDescriptors.put(rawName, p);
+          opt = DOMUtil.getAttr(node, CORE_PROPERTIES, null);
+          if (opt != null) {
+            p.setPropertiesName(opt);
+          }
+          opt = DOMUtil.getAttr(node, CORE_DATADIR, null);
+          if (opt != null) {
+            p.setDataDir(opt);
+          }
+          
+          p.setCoreProperties(readProperties(cfg, node));
+          
+          opt = DOMUtil.getAttr(node, CORE_LOADONSTARTUP, null);
+          if (opt != null) {
+            p.setLoadOnStartup("true".equalsIgnoreCase(opt)
+                || "on".equalsIgnoreCase(opt));
+          }
+          
+          opt = DOMUtil.getAttr(node, CORE_SWAPPABLE, null);
+          if (opt != null) {
+            p.setSwappable("true".equalsIgnoreCase(opt)
+                || "on".equalsIgnoreCase(opt));
+          }
+          
+          if (!p.isSwappable() && p.isLoadOnStartup()) { // just like the current case
+            Callable<SolrCore> task = new Callable<SolrCore>() {
+              public SolrCore call() {
+                SolrCore c = null;
+                try {
+                  c = create(p);
+                  register(name, c, false);
+                } catch (Throwable t) {
+                  SolrException.log(log, null, t);
+                  if (c != null) {
+                    c.close();
+                  }
+                }
+                semaphore.release();
+                
+                return c;
+              }
+            };
+            
+            try {
+              semaphore.acquire();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new SolrException(ErrorCode.SERVER_ERROR,
+                  "Interrupted while loading SolrCore(s)", e);
+            }
+            
+            try {
+              pending.add(completionService.submit(task));
+            } catch (RejectedExecutionException e) {
+              semaphore.release();
+              throw e;
+            }
+            
+          } else {
+            // Store the descriptor for later use; this includes cores that
+            // are not swappable but are also not loaded at startup.
+            dynamicDescriptors.put(rawName, p);
+          }
+        } catch (Throwable ex) {
+          SolrException.log(log, null, ex);
         }
       }
-      catch (Throwable ex) {
-        SolrException.log(log,null,ex);
-        if (core != null) {
-          core.close();
+      
+      while (!pending.isEmpty()) {
+        try {
+          // take() blocks until the next core finishes loading
+          Future<SolrCore> future = completionService.take();
+          pending.remove(future);
+          
+          try {
+            SolrCore c = future.get();
+            // track original names
+            if (c != null) {
+              coreToOrigName.put(c, c.getName());
+            }
+          } catch (ExecutionException e) {
+            // shouldn't happen since we catch exceptions ourselves
+            SolrException.log(SolrCore.log, "error loading SolrCore", e);
+          }
+          
+        } catch (InterruptedException e) {
+          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+              "interrupted while waiting for SolrCore(s) to load", e);
         }
       }
+    } finally {
+      if (coreLoadExecutor != null) {
+        ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
+      }
     }
   }
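
The new loading code above throttles parallelism with a semaphore: a permit is acquired before each submit and released inside the task, so at most coreLoadThreads cores load concurrently while a CompletionService drains results as they arrive. A minimal, self-contained sketch of the same pattern using the JDK's fixed-size Semaphore (the patch itself uses a custom AdjustableSemaphore and Solr's DefaultSolrThreadFactory):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.*;

    public class ThrottledLoadSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletionService<String> completionService =
            new ExecutorCompletionService<String>(executor);
        final Semaphore semaphore = new Semaphore(4); // at most 4 tasks in flight
        Set<Future<String>> pending = new HashSet<Future<String>>();

        for (int i = 0; i < 10; i++) {
          final int id = i;
          semaphore.acquire(); // block until a permit is free
          try {
            pending.add(completionService.submit(new Callable<String>() {
              public String call() {
                try {
                  return "core" + id; // stand-in for create()/register()
                } finally {
                  semaphore.release(); // free the permit even on failure
                }
              }
            }));
          } catch (RejectedExecutionException e) {
            semaphore.release(); // submit failed: hand the permit back
            throw e;
          }
        }

        while (!pending.isEmpty()) { // drain results as they complete
          Future<String> future = completionService.take();
          pending.remove(future);
          try {
            System.out.println("loaded " + future.get());
          } catch (ExecutionException e) {
            e.printStackTrace(); // per-core failures do not abort the loop
          }
        }
        executor.shutdown();
      }
    }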
 
@@ -1276,7 +1373,7 @@ public class CoreContainer 
         Integer.toString(DEFAULT_ZK_CLIENT_TIMEOUT));
     addCoresAttrib(coresAttribs, "hostContext", this.hostContext, DEFAULT_HOST_CONTEXT);
     addCoresAttrib(coresAttribs, "leaderVoteWait", this.leaderVoteWait, LEADER_VOTE_WAIT);
-
+    addCoresAttrib(coresAttribs, "coreLoadThreads", Integer.toString(this.coreLoadThreads), Integer.toString(CORE_LOAD_THREADS));
 
     List<SolrCoreXMLDef> solrCoreXMLDefs = new ArrayList<SolrCoreXMLDef>();
     

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java Mon Dec 10 16:36:47 2012
@@ -158,7 +158,7 @@ public abstract class DirectoryFactory i
   
   public static long sizeOf(Directory directory, String file) throws IOException {
     if (!directory.fileExists(file)) {
-      throw new IllegalArgumentException(file + " does not exist");
+      return 0;
     }
     
     return directory.fileLength(file);
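
Returning 0 for a missing file makes size accounting tolerant of files that vanish between listing and measuring (for example, segment files deleted by a concurrent merge), instead of aborting the whole computation. A sketch of a summing caller in that style; sizeOfDirectory here is a hypothetical stand-in for the existing helper:

    import java.io.IOException;
    import org.apache.lucene.store.Directory;

    // hypothetical helper in the style of DirectoryFactory.sizeOfDirectory
    static long sizeOfDirectory(Directory directory) throws IOException {
      long total = 0;
      for (String file : directory.listAll()) {
        total += sizeOf(directory, file); // a vanished file now counts as 0 bytes
      }
      return total;
    }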

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/NRTCachingDirectoryFactory.java Mon Dec 10 16:36:47 2012
@@ -23,15 +23,32 @@ import java.io.IOException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.NRTCachingDirectory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
 
 /**
  * Factory to instantiate {@link org.apache.lucene.store.NRTCachingDirectory}
  */
 public class NRTCachingDirectoryFactory extends StandardDirectoryFactory {
+  private double maxMergeSizeMB;
+  private double maxCachedMB;
+
+  @Override
+  public void init(NamedList args) {
+    SolrParams params = SolrParams.toSolrParams(args);
+    maxMergeSizeMB = params.getDouble("maxMergeSizeMB", 4);
+    if (maxMergeSizeMB <= 0) {
+      throw new IllegalArgumentException("maxMergeSizeMB must be greater than 0");
+    }
+    maxCachedMB = params.getDouble("maxCachedMB", 48);
+    if (maxCachedMB <= 0) {
+      throw new IllegalArgumentException("maxCachedMB must be greater than 0");
+    }
+  }
 
   @Override
   protected Directory create(String path) throws IOException {
-    return new NRTCachingDirectory(FSDirectory.open(new File(path)), 4, 48);
+    return new NRTCachingDirectory(FSDirectory.open(new File(path)), maxMergeSizeMB, maxCachedMB);
   }
 
 }
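
The hard-coded 4 MB / 48 MB cache sizes become init arguments with the same defaults. A minimal sketch of supplying them programmatically; in a real deployment they would come from the directoryFactory configuration in solrconfig.xml:

    import org.apache.solr.common.util.NamedList;

    NamedList<Object> args = new NamedList<Object>();
    args.add("maxMergeSizeMB", "8"); // cache merges up to 8 MB
    args.add("maxCachedMB", "64");   // keep up to 64 MB in RAM
    NRTCachingDirectoryFactory factory = new NRTCachingDirectoryFactory();
    factory.init(args);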

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/core/SolrCore.java Mon Dec 10 16:36:47 2012
@@ -17,6 +17,40 @@
 
 package org.apache.solr.core;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.xml.parsers.ParserConfigurationException;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.index.DirectoryReader;
@@ -67,16 +101,17 @@ import org.apache.solr.search.ValueSourc
 import org.apache.solr.update.DefaultSolrCoreState;
 import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
 import org.apache.solr.update.SolrIndexWriter;
 import org.apache.solr.update.UpdateHandler;
 import org.apache.solr.update.VersionInfo;
-import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
 import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
 import org.apache.solr.update.processor.LogUpdateProcessorFactory;
 import org.apache.solr.update.processor.RunUpdateProcessorFactory;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
 import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
 import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.PropertiesInputStream;
 import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
@@ -85,41 +120,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
-import javax.xml.parsers.ParserConfigurationException;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Writer;
-import java.lang.reflect.Constructor;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-
 
 /**
  *
@@ -235,30 +235,11 @@ public final class SolrCore implements S
     Properties p = new Properties();
     Directory dir = null;
     try {
-      dir = getDirectoryFactory().get(getDataDir(), null);
+      dir = getDirectoryFactory().get(getDataDir(), getSolrConfig().indexConfig.lockType);
       if (dir.fileExists("index.properties")){
         final IndexInput input = dir.openInput("index.properties", IOContext.DEFAULT);
   
-        final InputStream is = new InputStream() {
-          
-          @Override
-          public int read() throws IOException {
-            byte next;
-            try {
-              next = input.readByte();
-            } catch (EOFException e) {
-              return -1;
-            }
-            return next;
-          }
-          
-          @Override
-          public void close() throws IOException {
-            super.close();
-            input.close();
-          }
-        };
-        
+        final InputStream is = new PropertiesInputStream(input);
         try {
           p.load(is);
           
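
The inlined InputStream-over-IndexInput adapter moves to org.apache.solr.util.PropertiesInputStream, which is reused in SnapPuller below. A sketch of what that helper presumably looks like, reconstructed from the anonymous class it replaces; the & 0xff mask is an assumption here, since InputStream.read() must return the byte as an int in 0-255 rather than a sign-extended negative value:

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.lucene.store.IndexInput;

    public class PropertiesInputStream extends InputStream {
      private final IndexInput input;

      public PropertiesInputStream(IndexInput input) {
        this.input = input;
      }

      @Override
      public int read() throws IOException {
        try {
          return input.readByte() & 0xff; // 0-255; -1 is reserved for EOF
        } catch (EOFException e) {
          return -1; // end of stream
        }
      }

      @Override
      public void close() throws IOException {
        super.close();
        input.close();
      }
    }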

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/MoreLikeThisHandler.java Mon Dec 10 16:36:47 2012
@@ -29,11 +29,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
-import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.StoredDocument;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.search.*;
 import org.apache.lucene.queries.mlt.MoreLikeThis;
 import org.apache.solr.common.SolrException;
@@ -108,7 +106,7 @@ public class MoreLikeThisHandler extends
           }
         }
       }
-    } catch (ParseException e) {
+    } catch (SyntaxError e) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
     }
 
@@ -300,8 +298,10 @@ public class MoreLikeThisHandler extends
       mlt.setAnalyzer( searcher.getSchema().getAnalyzer() );
       
       // configurable params
+      
       mlt.setMinTermFreq(       params.getInt(MoreLikeThisParams.MIN_TERM_FREQ,         MoreLikeThis.DEFAULT_MIN_TERM_FREQ));
       mlt.setMinDocFreq(        params.getInt(MoreLikeThisParams.MIN_DOC_FREQ,          MoreLikeThis.DEFAULT_MIN_DOC_FREQ));
+      mlt.setMaxDocFreq(        params.getInt(MoreLikeThisParams.MAX_DOC_FREQ,          MoreLikeThis.DEFAULT_MAX_DOC_FREQ));
       mlt.setMinWordLen(        params.getInt(MoreLikeThisParams.MIN_WORD_LEN,          MoreLikeThis.DEFAULT_MIN_WORD_LENGTH));
       mlt.setMaxWordLen(        params.getInt(MoreLikeThisParams.MAX_WORD_LEN,          MoreLikeThis.DEFAULT_MAX_WORD_LENGTH));
       mlt.setMaxQueryTerms(     params.getInt(MoreLikeThisParams.MAX_QUERY_TERMS,       MoreLikeThis.DEFAULT_MAX_QUERY_TERMS));
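
The new setMaxDocFreq wiring lets MoreLikeThis ignore terms that are too common to be discriminative. A hypothetical request setup; "mlt.maxdf" is assumed to be the public name behind MoreLikeThisParams.MAX_DOC_FREQ, by analogy with the existing "mlt.mindf" / MIN_DOC_FREQ pair:

    import org.apache.solr.common.params.ModifiableSolrParams;

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("q", "id:1");
    params.set("mlt.fl", "body");
    params.set("mlt.mindf", "2");     // skip terms in fewer than 2 docs
    params.set("mlt.maxdf", "10000"); // skip terms in more than 10000 docs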

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Mon Dec 10 16:36:47 2012
@@ -361,7 +361,7 @@ public class ReplicationHandler extends 
       // use a set to workaround possible Lucene bug which returns same file
       // name multiple times
       Collection<String> files = new HashSet<String>(commit.getFileNames());
-      dir = core.getDirectoryFactory().get(core.getNewIndexDir(), null);
+      dir = core.getDirectoryFactory().get(core.getNewIndexDir(), core.getSolrConfig().indexConfig.lockType);
       try {
         
         for (String fileName : files) {
@@ -467,7 +467,7 @@ public class ReplicationHandler extends 
     Directory dir;
     long size = 0;
     try {
-      dir = core.getDirectoryFactory().get(core.getIndexDir(), null);
+      dir = core.getDirectoryFactory().get(core.getNewIndexDir(), core.getSolrConfig().indexConfig.lockType);
       try {
         size = DirectoryFactory.sizeOfDirectory(dir);
       } finally {
@@ -1062,7 +1062,7 @@ public class ReplicationHandler extends 
         while (true) {
           offset = offset == -1 ? 0 : offset;
           int read = (int) Math.min(buf.length, filelen - offset);
-          in.readBytes(buf, offset == -1 ? 0 : (int) offset, read);
+          in.readBytes(buf, 0, read);
           
           fos.writeInt((int) read);
           if (useChecksum) {
@@ -1082,6 +1082,8 @@ public class ReplicationHandler extends 
             fos.close();
             break;
           }
+          offset += read;
+          in.seek(offset);
         }
       } catch (IOException e) {
         LOG.warn("Exception while writing response for params: " + params, e);
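
This hunk fixes two bugs in the chunked file copy: readBytes was being handed the file offset as the buffer offset, writing past the start of buf, and the loop never advanced the read position. A minimal sketch of the corrected pattern; ChunkWriter is a hypothetical stand-in for the checksum and response plumbing:

    import java.io.IOException;

    import org.apache.lucene.store.IndexInput;

    class CopySketch {
      interface ChunkWriter {
        void write(byte[] buf, int len) throws IOException;
      }

      static void copyFile(IndexInput in, long filelen, ChunkWriter writer)
          throws IOException {
        byte[] buf = new byte[1024 * 1024];
        long offset = 0;
        while (offset < filelen) {
          int read = (int) Math.min(buf.length, filelen - offset);
          in.seek(offset);            // position within the file
          in.readBytes(buf, 0, read); // always fill buf from index 0
          writer.write(buf, read);
          offset += read;             // advance before the next chunk
        }
      }
    }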

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java Mon Dec 10 16:36:47 2012
@@ -22,7 +22,6 @@ import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.core.TimerContext;
 import com.yammer.metrics.stats.Snapshot;
-import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -32,6 +31,7 @@ import org.apache.solr.core.SolrInfoMBea
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestHandler;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.search.SyntaxError;
 import org.apache.solr.util.SolrPluginUtils;
 
 import java.net.URL;
@@ -163,7 +163,7 @@ public abstract class RequestHandlerBase
         }
       } else {
         SolrException.log(SolrCore.log,e);
-        if (e instanceof ParseException) {
+        if (e instanceof SyntaxError) {
           e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
         }
       }
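
With the classic queryparser's checked ParseException gone from the signatures, query-syntax problems surface as Solr's own SyntaxError, and RequestHandlerBase maps them to HTTP 400 instead of 500. A sketch of the pattern at a parse site, assuming getQuery() declares SyntaxError on this branch:

    import org.apache.solr.common.SolrException;
    import org.apache.solr.request.SolrQueryRequest;
    import org.apache.solr.search.QParser;
    import org.apache.solr.search.SyntaxError;

    // hypothetical handler fragment
    void parseQuery(String qstr, SolrQueryRequest req) {
      try {
        QParser.getParser(qstr, null, req).getQuery();
      } catch (SyntaxError e) {
        // a malformed query is the client's fault: report 400, not 500
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
      }
    }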

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Mon Dec 10 16:36:47 2012
@@ -36,10 +36,7 @@ import static org.apache.solr.handler.Re
 import static org.apache.solr.handler.ReplicationHandler.OFFSET;
 import static org.apache.solr.handler.ReplicationHandler.SIZE;
 
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -99,6 +96,8 @@ import org.apache.solr.search.SolrIndexS
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.FileUtils;
+import org.apache.solr.util.PropertiesInputStream;
+import org.apache.solr.util.PropertiesOutputStream;
 import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,6 +110,8 @@ import org.slf4j.LoggerFactory;
  * @since solr 1.4
  */
 public class SnapPuller {
+  private static final String INDEX_PROPERTIES = "index.properties";
+
   private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
 
   private final String masterUrl;
@@ -296,6 +297,7 @@ public class SnapPuller {
     successfulInstall = false;
     replicationStartTime = System.currentTimeMillis();
     Directory tmpIndexDir = null;
+    String tmpIndex = null;
     Directory indexDir = null;
     boolean deleteTmpIdxDir = true;
     try {
@@ -368,12 +370,12 @@ public class SnapPuller {
       boolean isFullCopyNeeded = IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion || forceReplication;
       
       String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
-      String tmpIndex = createTempindexDir(core, tmpIdxDirName);
+      tmpIndex = createTempindexDir(core, tmpIdxDirName);
 
-      tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, null);
+      tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, core.getSolrConfig().indexConfig.lockType);
       
       // make sure it's the newest known index dir...
-      indexDir = core.getDirectoryFactory().get(core.getNewIndexDir(), null);
+      indexDir = core.getDirectoryFactory().get(core.getNewIndexDir(), core.getSolrConfig().indexConfig.lockType);
       Directory oldDirectory = null;
 
       try {
@@ -466,7 +468,9 @@ public class SnapPuller {
       } finally {
         if (deleteTmpIdxDir) {
           LOG.info("removing temporary index download directory files " + tmpIndexDir);
-          DirectoryFactory.empty(tmpIndexDir);
+          if (tmpIndex != null && core.getDirectoryFactory().exists(tmpIndex)) {
+            DirectoryFactory.empty(tmpIndexDir);
+          }
         } 
       }
     } finally {
@@ -519,9 +523,9 @@ public class SnapPuller {
   /**
    * Helper method to record the last replication's details so that we can show them on the statistics page across
    * restarts.
+   * @throws IOException on IO error
    */
-  private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles, boolean successfulInstall) {
-    FileOutputStream outFile = null;
+  private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles, boolean successfulInstall) throws IOException {
     List<String> confFiles = new ArrayList<String>();
     if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty())
       for (Map<String, Object> map1 : modifiedConfFiles)
@@ -530,7 +534,10 @@ public class SnapPuller {
     Properties props = replicationHandler.loadReplicationProperties();
     long replicationTime = System.currentTimeMillis();
     long replicationTimeTaken = (replicationTime - getReplicationStartTime()) / 1000;
+    Directory dir = null;
     try {
+      dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), solrCore.getSolrConfig().indexConfig.lockType);
+      
       int indexCount = 1, confFilesCount = 1;
       if (props.containsKey(TIMES_INDEX_REPLICATED)) {
         indexCount = Integer.valueOf(props.getProperty(TIMES_INDEX_REPLICATED)) + 1;
@@ -560,15 +567,21 @@ public class SnapPuller {
         sb = readToStringBuffer(replicationTime, props.getProperty(REPLICATION_FAILED_AT_LIST));
         props.setProperty(REPLICATION_FAILED_AT_LIST, sb.toString());
       }
-      File f = new File(solrCore.getDataDir(), REPLICATION_PROPERTIES);
-      outFile = new FileOutputStream(f);
-      props.store(outFile, "Replication details");
-      outFile.close();
+
+      final IndexOutput out = dir.createOutput(REPLICATION_PROPERTIES, IOContext.DEFAULT);
+      OutputStream outFile = new PropertiesOutputStream(out);
+      try {
+        props.store(outFile, "Replication details");
+        dir.sync(Collections.singleton(REPLICATION_PROPERTIES));
+      } finally {
+        IOUtils.closeQuietly(outFile);
+      }
     } catch (Exception e) {
       LOG.warn("Exception while updating statistics", e);
-    }
-    finally {
-      IOUtils.closeQuietly(outFile);
+    } finally {
+      if (dir != null) {
+        solrCore.getDirectoryFactory().release(dir);
+      }
     }
   }
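
Writing the replication stats through the Directory abstraction, rather than a raw FileOutputStream into the data dir, keeps the bookkeeping working with non-filesystem DirectoryFactory implementations and allows an explicit sync for durability. The pattern, condensed into a hypothetical helper:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexOutput;
    import org.apache.solr.util.PropertiesOutputStream;

    static void storeProperties(Directory dir, String name, Properties props)
        throws IOException {
      IndexOutput out = dir.createOutput(name, IOContext.DEFAULT);
      OutputStream os = new PropertiesOutputStream(out);
      try {
        props.store(os, "Replication details");
        dir.sync(Collections.singleton(name)); // make the write durable
      } finally {
        os.close(); // also closes the underlying IndexOutput
      }
    }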
 
@@ -706,7 +719,6 @@ public class SnapPuller {
     String indexDir = solrCore.getIndexDir();
     
-    // it's okay to use null for lock factory since we know this dir will exist
-    Directory dir = solrCore.getDirectoryFactory().get(indexDir, null);
+    Directory dir = solrCore.getDirectoryFactory().get(indexDir, solrCore.getSolrConfig().indexConfig.lockType);
     try {
       for (Map<String,Object> file : filesToDownload) {
         if (!dir.fileExists((String) file.get(NAME)) || downloadCompleteIndex) {
@@ -829,30 +842,11 @@ public class SnapPuller {
     Properties p = new Properties();
     Directory dir = null;
     try {
-      dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), null);
+      dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), solrCore.getSolrConfig().indexConfig.lockType);
       if (dir.fileExists("index.properties")){
         final IndexInput input = dir.openInput("index.properties", IOContext.DEFAULT);
   
-        final InputStream is = new InputStream() {
-          
-          @Override
-          public int read() throws IOException {
-            byte next;
-            try {
-              next = input.readByte();
-            } catch (EOFException e) {
-              return -1;
-            }
-            return next;
-          }
-          
-          @Override
-          public void close() throws IOException {
-            super.close();
-            input.close();
-          }
-        };
-        
+        final InputStream is = new PropertiesInputStream(input);
         try {
           p.load(is);
         } catch (Exception e) {
@@ -870,27 +864,16 @@ public class SnapPuller {
       p.put("index", tmpIdxDirName);
       OutputStream os = null;
       try {
-        os = new OutputStream() {
-          
-          @Override
-          public void write(int b) throws IOException {
-            out.writeByte((byte) b);
-          }
-          
-          @Override
-          public void close() throws IOException {
-            super.close();
-            out.close();
-          }
-        };
+        os = new PropertiesOutputStream(out);
         p.store(os, "index properties");
+        dir.sync(Collections.singleton(INDEX_PROPERTIES));
       } catch (Exception e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
             "Unable to write index.properties", e);
       } finally {
         IOUtils.closeQuietly(os);
       }
-        return true;
+      return true;
 
     } catch (IOException e1) {
       throw new RuntimeException(e1);
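
PropertiesOutputStream is the write-side twin, reconstructed here from the anonymous OutputStream it replaces above; the actual class in org.apache.solr.util may differ in detail:

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.lucene.store.IndexOutput;

    public class PropertiesOutputStream extends OutputStream {
      private final IndexOutput out;

      public PropertiesOutputStream(IndexOutput out) {
        this.out = out;
      }

      @Override
      public void write(int b) throws IOException {
        out.writeByte((byte) b); // low 8 bits of b, per the OutputStream contract
      }

      @Override
      public void close() throws IOException {
        super.close();
        out.close();
      }
    }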

Modified: lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java?rev=1419570&r1=1419569&r2=1419570&view=diff
==============================================================================
--- lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java (original)
+++ lucene/dev/branches/lucene4547/solr/core/src/java/org/apache/solr/handler/SnapShooter.java Mon Dec 10 16:36:47 2012
@@ -101,7 +101,7 @@ public class SnapShooter {
       Collection<String> files = indexCommit.getFileNames();
       FileCopier fileCopier = new FileCopier();
       
-      Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), null);
+      Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), solrCore.getSolrConfig().indexConfig.lockType);
       try {
         fileCopier.copyFiles(dir, files, snapShotDir);
       } finally {