Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC

svn commit: r1235888 [3/12] - in /lucene/dev/trunk: dev-tools/eclipse/ dev-tools/maven/ solr/ solr/cloud-dev/ solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Wed Jan 25 19:49:26 2012
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.TimeoutException;
 import java.text.SimpleDateFormat;
 
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
+import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,10 +34,12 @@ import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathExpressionException;
 
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
 import org.apache.solr.cloud.SolrZkServer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.DOMUtil;
@@ -85,6 +89,7 @@ public class CoreContainer 
   private SolrXMLSerializer solrXMLSerializer = new SolrXMLSerializer();
   private ZkController zkController;
   private SolrZkServer zkServer;
+  private ShardHandlerFactory shardHandlerFactory;
 
   private String zkHost;
 
@@ -151,7 +156,7 @@ public class CoreContainer 
       zookeeperHost = zkServer.getClientString();
     }
 
-    int zkClientConnectTimeout = 5000;
+    int zkClientConnectTimeout = 15000;
 
     if (zookeeperHost != null) {
       // we are ZooKeeper enabled
@@ -163,7 +168,17 @@ public class CoreContainer 
         } else {
           log.info("Zookeeper client=" + zookeeperHost);          
         }
-        zkController = new ZkController(zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext);
+        zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, new CurrentCoreDescriptorProvider() {
+          
+          @Override
+          public List<CoreDescriptor> getCurrentDescriptors() {
+            List<CoreDescriptor> descriptors = new ArrayList<CoreDescriptor>(getCoreNames().size());
+            for (SolrCore core : getCores()) {
+              descriptors.add(core.getCoreDescriptor());
+            }
+            return descriptors;
+          }
+        });
         
         String confDir = System.getProperty("bootstrap_confdir");
         if(confDir != null) {
@@ -203,7 +218,7 @@ public class CoreContainer 
 
   // Helper class to initialize the CoreContainer
   public static class Initializer {
-    protected String solrConfigFilename = null;
+    protected String containerConfigFilename = null;  // normally "solr.xml"
     protected String dataDir = null; // override datadir for single core mode
 
     // core container instantiation
@@ -211,9 +226,8 @@ public class CoreContainer 
         ParserConfigurationException, SAXException {
       CoreContainer cores = null;
       String solrHome = SolrResourceLoader.locateSolrHome();
-      // TODO : fix broken logic confusing solr.xml with solrconfig.xml
-      File fconf = new File(solrHome, solrConfigFilename == null ? "solr.xml"
-          : solrConfigFilename);
+      File fconf = new File(solrHome, containerConfigFilename == null ? "solr.xml"
+          : containerConfigFilename);
       log.info("looking for solr.xml: " + fconf.getAbsolutePath());
       cores = new CoreContainer();
       
@@ -225,10 +239,7 @@ public class CoreContainer 
         cores.configFile = fconf;
       }
       
-      solrConfigFilename = cores.getConfigFile().getName();
-      if (cores.cores.isEmpty()){
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No cores were created, please check the logs for errors");
-      }
+      containerConfigFilename = cores.getConfigFile().getName();
       
       return cores;
     }
@@ -300,10 +311,7 @@ public class CoreContainer 
     shareSchema = cfg.getBool("solr/cores/@shareSchema", false);
     zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout", 10000);
 
-    hostPort = System.getProperty("hostPort");
-    if (hostPort == null) {
-      hostPort = cfg.get("solr/cores/@hostPort", "8983");
-    }
+    hostPort = cfg.get("solr/cores/@hostPort", "8983");
 
     hostContext = cfg.get("solr/cores/@hostContext", "solr");
     host = cfg.get("solr/cores/@host", null);
@@ -338,7 +346,7 @@ public class CoreContainer 
     }
 
     NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core", XPathConstants.NODESET);
-    boolean defaultCoreFound = false;
+
     for (int i=0; i<nodes.getLength(); i++) {
       Node node = nodes.item(i);
       try {
@@ -374,6 +382,10 @@ public class CoreContainer 
           if (opt != null) {
             p.getCloudDescriptor().setCollectionName(opt);
           }
+          opt = DOMUtil.getAttr(node, "roles", null);
+          if (opt != null) {
+            p.getCloudDescriptor().setRoles(opt);
+          }
         }
         opt = DOMUtil.getAttr(node, "properties", null);
         if (opt != null) {
@@ -393,29 +405,6 @@ public class CoreContainer 
         SolrException.log(log,null,ex);
       }
     }
-    
-    if(zkController != null) {
-      try {
-        synchronized (zkController.getZkStateReader().getUpdateLock()) {
-          zkController.getZkStateReader().makeShardZkNodeWatches(false);
-          zkController.getZkStateReader().updateCloudState(true);
-        }
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      } catch (KeeperException e) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      } catch (IOException e) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      }
-    }
   }
 
   private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {
@@ -428,7 +417,13 @@ public class CoreContainer 
     }
     return properties;
   }
-  private boolean isShutDown = false;
+  
+  private volatile boolean isShutDown = false;
+  
+  public boolean isShutDown() {
+    return isShutDown;
+  }
+
   /**
    * Stops all cores.
    */
@@ -436,8 +431,14 @@ public class CoreContainer 
     log.info("Shutting down CoreContainer instance="+System.identityHashCode(this));    
     synchronized(cores) {
       try {
-        for(SolrCore core : cores.values()) {
-          core.close();
+        for (SolrCore core : cores.values()) {
+          try {
+            if (!core.isClosed()) {
+              core.close();
+            }
+          } catch (Throwable t) {
+            SolrException.log(log, "Error shutting down core", t);
+          }
         }
         cores.clear();
       } finally {
@@ -447,6 +448,9 @@ public class CoreContainer 
         if (zkServer != null) {
           zkServer.stop();
         }
+        if (shardHandlerFactory != null) {
+          shardHandlerFactory.close();
+        }
         isShutDown = true;
       }
     }
@@ -457,7 +461,6 @@ public class CoreContainer 
     try {
       if(!isShutDown){
         log.error("CoreContainer was not shutdown prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!  instance=" + System.identityHashCode(this));
-        shutdown();
       }
     } finally {
       super.finalize();
@@ -480,6 +483,12 @@ public class CoreContainer 
       throw new RuntimeException( "Invalid core name: "+name );
     }
 
+    if (zkController != null) {
+      // before becoming available, publish the core as down so it is not seen as live and active;
+      // this also gets us our assigned shard id if it was not specified
+      zkController.publish(core, ZkStateReader.DOWN);
+    }
+    
     SolrCore old = null;
     synchronized (cores) {
       old = cores.put(name, core);
@@ -491,39 +500,44 @@ public class CoreContainer 
       core.getCoreDescriptor().name = name;
     }
 
+    if( old == null || old == core) {
+      log.info( "registering core: "+name );
+      registerInZk(core);
+      return null;
+    }
+    else {
+      log.info( "replacing core: "+name );
+      if (!returnPrevNotClosed) {
+        old.close();
+      }
+      registerInZk(core);
+      return old;
+    }
+  }
+
+
+  private void registerInZk(SolrCore core) {
     if (zkController != null) {
       try {
-        zkController.register(core.getName(), core.getCoreDescriptor().getCloudDescriptor(), true);
+        zkController.register(core.getName(), core.getCoreDescriptor());
       } catch (InterruptedException e) {
         // Restore the interrupted status
         Thread.currentThread().interrupt();
         log.error("", e);
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
             e);
-      } catch (KeeperException e) {
+      } catch (Exception e) {
+        // if register fails, this is really bad - close the zkController to
+        // minimize any damage we can cause
+        zkController.publish(core, ZkStateReader.DOWN);
+        zkController.close();
         log.error("", e);
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
             e);
-      } catch (IOException e) {
-        log.error("", e);
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
       }
     }
-
-    if( old == null || old == core) {
-      log.info( "registering core: "+name );
-      return null;
-    }
-    else {
-      log.info( "replacing core: "+name );
-      if (!returnPrevNotClosed) {
-        old.close();
-      }
-      return old;
-    }
   }
 
-
   /**
    * Registers a SolrCore descriptor in the registry using the core's name.
    * If returnPrev==false, the old core, if different, is closed.
@@ -562,7 +576,7 @@ public class CoreContainer 
       try {
         String collection = dcore.getCloudDescriptor().getCollectionName();
         zkController.createCollectionZkNode(dcore.getCloudDescriptor());
-        // zkController.createCollectionZkNode(collection);
+
         zkConfigName = zkController.readConfigName(collection);
         if (zkConfigName == null) {
           log.error("Could not find config name for collection:" + collection);
@@ -632,6 +646,12 @@ public class CoreContainer 
     }
 
     SolrCore core = new SolrCore(dcore.getName(), null, config, schema, dcore);
+
+    if (zkController == null && core.getUpdateHandler().getUpdateLog() != null) {
+      // always kick off recovery if we are in standalone mode.
+      core.getUpdateHandler().getUpdateLog().recoverFromLog();
+    }
+
     return core;
   }
     
@@ -948,6 +968,22 @@ public class CoreContainer 
   public ZkController getZkController() {
     return zkController;
   }
+
+  /** The default ShardHandlerFactory used to communicate with other Solr instances */
+  public ShardHandlerFactory getShardHandlerFactory() {
+    synchronized (this) {
+      if (shardHandlerFactory == null) {
+        Map m = new HashMap();
+        m.put("class",HttpShardHandlerFactory.class.getName());
+        PluginInfo info = new PluginInfo("shardHandlerFactory", m,null,Collections.<PluginInfo>emptyList());
+
+        HttpShardHandlerFactory fac = new HttpShardHandlerFactory();
+        fac.init(info);
+        shardHandlerFactory = fac;
+      }
+      return shardHandlerFactory;
+    }
+  }
   
   private SolrConfig getSolrConfigFromZk(String zkConfigName, String solrConfigFileName,
       SolrResourceLoader resourceLoader) throws IOException,
@@ -976,7 +1012,7 @@ public class CoreContainer 
   private static final String DEF_SOLR_XML ="<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n" +
           "<solr persistent=\"false\">\n" +
           "  <cores adminPath=\"/admin/cores\" defaultCoreName=\"" + DEFAULT_DEFAULT_CORE_NAME + "\">\n" +
-          "    <core name=\""+ DEFAULT_DEFAULT_CORE_NAME + "\" instanceDir=\".\" />\n" +
+          "    <core name=\""+ DEFAULT_DEFAULT_CORE_NAME + "\" shard=\"${shard:}\" instanceDir=\".\" />\n" +
           "  </cores>\n" +
           "</solr>";
 }

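The getShardHandlerFactory() accessor above lazily constructs a single default HttpShardHandlerFactory under synchronized (this), and shutdown() now closes it along with the cores. A minimal caller-side sketch, assuming a coreContainer reference in scope and that getShardHandler() is the factory's per-request entry point:

    // First call builds and init()s the default HttpShardHandlerFactory;
    // subsequent calls return the same instance.
    ShardHandlerFactory factory = coreContainer.getShardHandlerFactory();

    // A ShardHandler would be the short-lived object used to fan requests
    // out to other shards; the factory itself is closed once, in shutdown().
    ShardHandler shardHandler = factory.getShardHandler();
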
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java Wed Jan 25 19:49:26 2012
@@ -43,11 +43,10 @@ public class CoreDescriptor {
     this.coreContainer = coreContainer;
     this.name = name;
     
-    if(coreContainer.getZkController() != null) {
+    if(coreContainer != null && coreContainer.getZkController() != null) {
       this.cloudDesc = new CloudDescriptor();
       // cloud collection defaults to core name
       cloudDesc.setCollectionName(name.isEmpty() ? coreContainer.getDefaultCoreName() : name);
-      this.cloudDesc.setShardId(coreContainer.getZkController().getNodeName() + "_" + name);
     }
     
     if (name == null) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java Wed Jan 25 19:49:26 2012
@@ -26,11 +26,8 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.CommonParams.EchoParamStyle;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.handler.component.SearchHandler;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestHandler;
 import org.apache.solr.response.SolrQueryResponse;
@@ -41,7 +38,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  */
-final class RequestHandlers {
+public final class RequestHandlers {
   public static Logger log = LoggerFactory.getLogger(RequestHandlers.class);
 
   public static final String DEFAULT_HANDLER_NAME="standard";
@@ -208,7 +205,7 @@ final class RequestHandlers {
    * 
    * @since solr 1.2
    */
-  private static final class LazyRequestHandlerWrapper implements SolrRequestHandler, SolrInfoMBean
+  public static final class LazyRequestHandlerWrapper implements SolrRequestHandler, SolrInfoMBean
   {
     private final SolrCore core;
     private String _className;

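RequestHandlers and LazyRequestHandlerWrapper are made public here so that code outside the package can recognize and unwrap lazily-loaded handlers. A hedged sketch of the expected unwrapping pattern (getWrappedHandler() is assumed to force initialization of the lazy handler):

    SolrRequestHandler handler = core.getRequestHandler("/replication");
    if (handler instanceof RequestHandlers.LazyRequestHandlerWrapper) {
      // unwrap to the concrete handler, initializing it if necessary
      handler = ((RequestHandlers.LazyRequestHandlerWrapper) handler).getWrappedHandler();
    }
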
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java Wed Jan 25 19:49:26 2012
@@ -42,10 +42,7 @@ import org.apache.solr.search.ValueSourc
 import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.solr.update.SolrIndexWriter;
 import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.processor.LogUpdateProcessorFactory;
-import org.apache.solr.update.processor.RunUpdateProcessorFactory;
-import org.apache.solr.update.processor.UpdateRequestProcessorChain;
-import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+import org.apache.solr.update.processor.*;
 import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
 import org.apache.solr.util.plugin.SolrCoreAware;
@@ -63,6 +60,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.net.URL;
 import java.lang.reflect.Constructor;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 /**
@@ -77,6 +75,8 @@ public final class SolrCore implements S
   private String logid; // used to show what name is set
   private final CoreDescriptor coreDescriptor;
 
+  private boolean isReloaded = false;
+
   private final SolrConfig solrConfig;
   private final SolrResourceLoader resourceLoader;
   private final IndexSchema schema;
@@ -476,20 +476,6 @@ public final class SolrCore implements S
   }
   
   /**
-   * 
-   * @param dataDir
-   * @param schema
-   * @throws SAXException 
-   * @throws IOException 
-   * @throws ParserConfigurationException 
-   * 
-   * @since solr 1.0
-   */
-  public SolrCore(String dataDir, IndexSchema schema) throws ParserConfigurationException, IOException, SAXException {
-    this(null, dataDir, new SolrConfig(), schema, null);
-  }
-  
-  /**
     * Creates a new core and registers it in the list of cores.
    * If a core with the same name already exists, it will be stopped and replaced by this one.
    *
@@ -558,7 +544,8 @@ public final class SolrCore implements S
     if (updateHandler == null) {
       initDirectoryFactory();
     } else {
-      directoryFactory = updateHandler.getIndexWriterProvider().getDirectoryFactory();
+      directoryFactory = updateHandler.getSolrCoreState().getDirectoryFactory();
+      this.isReloaded = true;
     }
     
     initIndex();
@@ -658,6 +645,7 @@ public final class SolrCore implements S
       // construct the default chain
       UpdateRequestProcessorFactory[] factories = new UpdateRequestProcessorFactory[]{
               new LogUpdateProcessorFactory(),
+              new DistributedUpdateProcessorFactory(),
               new RunUpdateProcessorFactory()
       };
       def = new UpdateRequestProcessorChain(factories, this);
@@ -762,7 +750,7 @@ public final class SolrCore implements S
 
     try {
       updateHandler.close();
-    } catch (Exception e) {
+    } catch (Throwable e) {
       SolrException.log(log,e);
     }
     
@@ -960,15 +948,21 @@ public final class SolrCore implements S
   // This reference is protected by searcherLock.
   private RefCounted<SolrIndexSearcher> _searcher;
 
-  // All of the open searchers.  Don't access this directly.
+  // All of the normal open searchers.  Don't access this directly.
   // protected by synchronizing on searcherLock.
   private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
+  private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
 
   final ExecutorService searcherExecutor = Executors.newSingleThreadExecutor();
   private int onDeckSearchers;  // number of searchers preparing
+  // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
   private Object searcherLock = new Object();  // the sync object for the searcher
+  private ReentrantLock openSearcherLock = new ReentrantLock(true);     // used to serialize opens/reopens for absolute ordering
   private final int maxWarmingSearchers;  // max number of on-deck searchers allowed
 
+  private RefCounted<SolrIndexSearcher> realtimeSearcher;
+
+
   /**
   * Return a registered {@link RefCounted}&lt;{@link SolrIndexSearcher}&gt; with
   * the reference count incremented.  It <b>must</b> be decremented when no longer needed.
@@ -987,29 +981,162 @@ public final class SolrCore implements S
   }
 
   /**
-  * Return the newest {@link RefCounted}&lt;{@link SolrIndexSearcher}&gt; with
+  * Return the newest normal {@link RefCounted}&lt;{@link SolrIndexSearcher}&gt; with
   * the reference count incremented.  It <b>must</b> be decremented when no longer needed.
   * If no searcher is currently open, then if openNew==true a new searcher will be opened,
   * or null is returned if openNew==false.
   */
   public RefCounted<SolrIndexSearcher> getNewestSearcher(boolean openNew) {
     synchronized (searcherLock) {
-      if (_searchers.isEmpty()) {
-        if (!openNew) return null;
-        // Not currently implemented since simply calling getSearcher during inform()
-        // can result in a deadlock.  Right now, solr always opens a searcher first
-        // before calling inform() anyway, so this should never happen.
-        throw new UnsupportedOperationException();
-      }
-      RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
-      newest.incref();
-      return newest;
+      if (!_searchers.isEmpty()) {
+        RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
+        newest.incref();
+        return newest;
+      }
+    }
+
+    return openNew ? getRealtimeSearcher() : null;
+  }
+
+
+  /** Gets the latest real-time searcher w/o forcing open a new searcher if one already exists.
+   * The reference count will be incremented.
+   */
+  public RefCounted<SolrIndexSearcher> getRealtimeSearcher() {
+    synchronized (searcherLock) {
+      if (realtimeSearcher != null) {
+        realtimeSearcher.incref();
+        return realtimeSearcher;
+      }
+    }
+
+    // use the openSearcherLock to prevent multiple threads from trying to open at once
+    openSearcherLock.lock();
+    try {
+
+      // try again
+      synchronized (searcherLock) {
+        if (realtimeSearcher != null) {
+          realtimeSearcher.incref();
+          return realtimeSearcher;
+        }
+      }
+
+      // force a new searcher open
+      return openNewSearcher(true, true);
+    } finally {
+      openSearcherLock.unlock();
     }
   }
 
+
   public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher) throws IOException {
     return getSearcher(forceNew, returnSearcher, waitSearcher, false);
   }
+
+
+  /** Opens a new searcher and returns a RefCounted<SolrIndexSearcher> with its reference incremented.
+   *
+   * "realtime" means that we need to open quickly for a realtime view of the index, hence don't do any
+   * autowarming and add to the _realtimeSearchers queue rather than the _searchers queue (so it won't
+   * be used for autowarming by a future normal searcher).  A "realtime" searcher will currently never
+   * become "registered" (since it currently lacks caching).
+   *
+   * realtimeSearcher is updated to the latest opened searcher, regardless of the value of "realtime".
+   *
+   * This method acquires openSearcherLock - do not call with searcherLock held!
+   */
+  public RefCounted<SolrIndexSearcher> openNewSearcher(boolean updateHandlerReopens, boolean realtime) {
+    SolrIndexSearcher tmp;
+    RefCounted<SolrIndexSearcher> newestSearcher = null;
+    boolean nrt = solrConfig.reopenReaders && updateHandlerReopens;
+
+    openSearcherLock.lock();
+    try {
+      String newIndexDir = null;
+      File indexDirFile = null;
+      File newIndexDirFile = null;
+
+      // if it's not a normal near-realtime update, check that paths haven't changed.
+      if (!nrt) {
+        indexDirFile = new File(getIndexDir()).getCanonicalFile();
+        newIndexDir = getNewIndexDir();
+        newIndexDirFile = new File(newIndexDir).getCanonicalFile();
+      }
+
+      synchronized (searcherLock) {
+        newestSearcher = realtimeSearcher;
+        if (newestSearcher != null) {
+          newestSearcher.incref();      // the matching decref is in the finally block
+        }
+      }
+
+      if (newestSearcher != null && solrConfig.reopenReaders
+          && (nrt || indexDirFile.equals(newIndexDirFile))) {
+
+        IndexReader newReader;
+        IndexReader currentReader = newestSearcher.get().getIndexReader();
+
+        if (updateHandlerReopens) {
+          // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
+          IndexWriter writer = getUpdateHandler().getSolrCoreState().getIndexWriter(this);
+          newReader = IndexReader.openIfChanged(currentReader, writer, true);
+
+        } else {
+          // verbose("start reopen without writer, reader=", currentReader);
+          newReader = IndexReader.openIfChanged(currentReader);
+          // verbose("reopen result", newReader);
+        }
+
+        if (newReader == null) {
+          // if this is a request for a realtime searcher, just return the same searcher if there haven't been any changes.
+          if (realtime) {
+            newestSearcher.incref();
+            return newestSearcher;
+          }
+
+          currentReader.incRef();
+          newReader = currentReader;
+        }
+
+        // for now, turn off caches if this is for a realtime reader (caches take a little while to instantiate)
+        tmp = new SolrIndexSearcher(this, schema, (realtime ? "realtime":"main"), newReader, true, !realtime, true, directoryFactory);
+
+      } else {
+        // verbose("non-reopen START:");
+        tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
+        // verbose("non-reopen DONE: searcher=",tmp);
+      }
+
+      List<RefCounted<SolrIndexSearcher>> searcherList = realtime ? _realtimeSearchers : _searchers;
+      RefCounted<SolrIndexSearcher> newSearcher = newHolder(tmp, searcherList);    // refcount now at 1
+
+      // Increment the reference again for the "realtimeSearcher" variable; the count should then be 2.
+      // Once it has been decremented both by the caller of this method and by realtimeSearcher being replaced,
+      // it will be closed.
+      newSearcher.incref();
+
+      synchronized (searcherLock) {
+        if (realtimeSearcher != null) {
+          realtimeSearcher.decref();
+        }
+        realtimeSearcher = newSearcher;
+        searcherList.add(realtimeSearcher);
+      }
+
+      return newSearcher;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error opening new searcher", e);
+    }
+    finally {
+      openSearcherLock.unlock();
+      if (newestSearcher != null) {
+        newestSearcher.decref();
+      }
+    }
+
+  }
   
   /**
    * Get a {@link SolrIndexSearcher} or start the process of creating a new one.
@@ -1105,80 +1232,28 @@ public final class SolrCore implements S
       }
     }
 
-    // open the index synchronously
-    // if this fails, we need to decrement onDeckSearchers again.
-    SolrIndexSearcher tmp;
-    RefCounted<SolrIndexSearcher> newestSearcher = null;
+    // a signal to decrement onDeckSearchers if something goes wrong.
+    final boolean[] decrementOnDeckCount=new boolean[]{true};
+    RefCounted<SolrIndexSearcher> currSearcherHolder = null;     // searcher we are autowarming from
+    RefCounted<SolrIndexSearcher> searchHolder = null;
+    boolean success = false;
 
+    openSearcherLock.lock();
     try {
-      newestSearcher = getNewestSearcher(false);
-      String newIndexDir = getNewIndexDir();
-      File indexDirFile = new File(getIndexDir()).getCanonicalFile();
-      File newIndexDirFile = new File(newIndexDir).getCanonicalFile();
-      
-      if (newestSearcher != null && solrConfig.reopenReaders
-          && indexDirFile.equals(newIndexDirFile)) {
-
-        if (updateHandlerReopens) {
-          
-          tmp = getUpdateHandler().reopenSearcher(newestSearcher.get());
-        } else {
-          
-          IndexReader currentReader = newestSearcher.get().getIndexReader();
-          IndexReader newReader;
-          
-          // verbose("start reopen without writer, reader=", currentReader);
-          newReader = IndexReader.openIfChanged(currentReader);
-          // verbose("reopen result", newReader);
-
-
-          if (newReader == null) {
-            currentReader.incRef();
-            newReader = currentReader;
-          }
-          
-          tmp = new SolrIndexSearcher(this, schema, "main", newReader, true, true, true, directoryFactory);
-        }
-
-
-      } else {
-        // verbose("non-reopen START:");
-        tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
-        // verbose("non-reopen DONE: searcher=",tmp);
-      }
-    } catch (Throwable th) {
-      synchronized(searcherLock) {
-        onDeckSearchers--;
-        // notify another waiter to continue... it may succeed
-        // and wake any others.
-        searcherLock.notify();
-      }
-      // need to close the searcher here??? we shouldn't have to.
-      throw new RuntimeException(th);
-    } finally {
-      if (newestSearcher != null) {
-        newestSearcher.decref();
+      searchHolder = openNewSearcher(updateHandlerReopens, false);
+      // the searchHolder's count is already incremented once (that reference will eventually be owned by _searcher when registered);
+      // increment it again if we are going to return it to the caller.
+      if (returnSearcher) {
+        searchHolder.incref();
       }
-    }
-    
-    final SolrIndexSearcher newSearcher=tmp;
 
-    RefCounted<SolrIndexSearcher> currSearcherHolder=null;
-    final RefCounted<SolrIndexSearcher> newSearchHolder=newHolder(newSearcher);
 
-    if (returnSearcher) newSearchHolder.incref();
+      final RefCounted<SolrIndexSearcher> newSearchHolder = searchHolder;
+      final SolrIndexSearcher newSearcher = newSearchHolder.get();
 
-    // a signal to decrement onDeckSearchers if something goes wrong.
-    final boolean[] decrementOnDeckCount=new boolean[1];
-    decrementOnDeckCount[0]=true;
-
-    try {
 
       boolean alreadyRegistered = false;
       synchronized (searcherLock) {
-        _searchers.add(newSearchHolder);
-        // verbose("added searcher ",newSearchHolder.get()," to _searchers");
-
         if (_searcher == null) {
           // if there isn't a current searcher then we may
           // want to register this one before warming is complete instead of waiting.
@@ -1197,174 +1272,133 @@ public final class SolrCore implements S
 
       final SolrIndexSearcher currSearcher = currSearcherHolder==null ? null : currSearcherHolder.get();
 
-      //
-      // Note! if we registered the new searcher (but didn't increment it's
-      // reference count because returnSearcher==false, it's possible for
-      // someone else to register another searcher, and thus cause newSearcher
-      // to close while we are warming.
-      //
-      // Should we protect against that by incrementing the reference count?
-      // Maybe we should just let it fail?   After all, if returnSearcher==false
-      // and newSearcher has been de-registered, what's the point of continuing?
-      //
-
       Future future=null;
 
       // warm the new searcher based on the current searcher.
       // should this go before the other event handlers or after?
       if (currSearcher != null) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        newSearcher.warm(currSearcher);
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      }
-                      return null;
-                    }
-                  }
-          );
-        } catch(Exception e) {
-          // if submit fails, newSearchHolder does not get decref'd
-          if (newSearchHolder != null) {
-            newSearchHolder.decref();
-            if (returnSearcher) {
-              newSearchHolder.decref();
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  newSearcher.warm(currSearcher);
+                } catch (Throwable e) {
+                  SolrException.log(log,e);
+                }
+                return null;
+              }
             }
-          }
-          throw e;
-        }
+        );
       }
-      
+
       if (currSearcher==null && firstSearcherListeners.size() > 0) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        for (SolrEventListener listener : firstSearcherListeners) {
-                          listener.newSearcher(newSearcher,null);
-                        }
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      }
-                      return null;
-                    }
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  for (SolrEventListener listener : firstSearcherListeners) {
+                    listener.newSearcher(newSearcher,null);
                   }
-          );
-        } catch(Exception e) {
-          // if submit fails, newSearchHolder does not get decref'd
-          if (newSearchHolder != null) {
-            newSearchHolder.decref();
-            if (returnSearcher) {
-              newSearchHolder.decref();
+                } catch (Throwable e) {
+                  SolrException.log(log,null,e);
+                }
+                return null;
+              }
             }
-          }
-          throw e;
-        }
+        );
       }
 
       if (currSearcher!=null && newSearcherListeners.size() > 0) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        for (SolrEventListener listener : newSearcherListeners) {
-                          listener.newSearcher(newSearcher, currSearcher);
-                        }
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      }
-                      return null;
-                    }
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  for (SolrEventListener listener : newSearcherListeners) {
+                    listener.newSearcher(newSearcher, currSearcher);
                   }
-          );
-      } catch(Exception e) {
-        // if submit fails, newSearchHolder does not get decref'd
-        if (newSearchHolder != null) {
-          newSearchHolder.decref();
-          if (returnSearcher) {
-            newSearchHolder.decref();
-          }
-        }
-        throw e;
-      }
+                } catch (Throwable e) {
+                  SolrException.log(log,null,e);
+                }
+                return null;
+              }
+            }
+        );
       }
 
       // WARNING: this code assumes a single threaded executor (that all tasks
       // queued will finish first).
       final RefCounted<SolrIndexSearcher> currSearcherHolderF = currSearcherHolder;
       if (!alreadyRegistered) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        // signal that we no longer need to decrement
-                        // the count *before* registering the searcher since
-                        // registerSearcher will decrement even if it errors.
-                        decrementOnDeckCount[0]=false;
-                        registerSearcher(newSearchHolder);
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      } finally {
-                        // we are all done with the old searcher we used
-                        // for warming...
-                        if (currSearcherHolderF!=null) currSearcherHolderF.decref();
-                      }
-                      return null;
-                    }
-                  }
-          );
-        } catch(Exception e) {
-          // if submit fails, newSearchHolder does not get decref'd
-          if (newSearchHolder != null) {
-            newSearchHolder.decref();
-            if (returnSearcher) {
-              newSearchHolder.decref();
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  // registerSearcher will decrement onDeckSearchers and
+                  // do a notify, even if it fails.
+                  registerSearcher(newSearchHolder);
+                } catch (Throwable e) {
+                  SolrException.log(log, e);
+                } finally {
+                  // we are all done with the old searcher we used
+                  // for warming...
+                  if (currSearcherHolderF!=null) currSearcherHolderF.decref();
+                }
+                return null;
+              }
             }
-          }
-          throw e;
-        }
+        );
       }
 
       if (waitSearcher != null) {
         waitSearcher[0] = future;
       }
 
+      success = true;
+
       // Return the searcher as the warming tasks run in parallel
       // callers may wait on the waitSearcher future returned.
       return returnSearcher ? newSearchHolder : null;
 
     } catch (Exception e) {
-      SolrException.log(log, null, e);
-      if (currSearcherHolder != null) currSearcherHolder.decref();
+      if (e instanceof SolrException) throw (SolrException)e;
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } finally {
 
-      synchronized (searcherLock) {
-        if (decrementOnDeckCount[0]) {
+      if (!success) {
+        synchronized (searcherLock) {
           onDeckSearchers--;
+
+          if (onDeckSearchers < 0) {
+            // sanity check... should never happen
+            log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
+            onDeckSearchers=0; // try and recover
+          }
+          // if we failed, we need to wake up at least one waiter to continue the process
+          searcherLock.notify();
         }
-        if (onDeckSearchers < 0) {
-          // sanity check... should never happen
-          log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
-          onDeckSearchers=0; // try and recover
+
+        if (currSearcherHolder != null) {
+          currSearcherHolder.decref();
+        }
+
+        if (searchHolder != null) {
+          searchHolder.decref();      // decrement 1 for _searcher (searchHolder will never become _searcher now)
+          if (returnSearcher) {
+            searchHolder.decref();    // decrement 1 because we won't be returning the searcher to the user
+          }
         }
-        // if we failed, we need to wake up at least one waiter to continue the process
-        searcherLock.notify();
       }
 
-      // since the indexreader was already opened, assume we can continue on
-      // even though we got an exception.
-      return returnSearcher ? newSearchHolder : null;
+      // we want to do this after we decrement onDeckSearchers so another thread
+      // doesn't increment first and throw a false warning.
+      openSearcherLock.unlock();
+
     }
 
   }
 
 
-  private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher) {
+  private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher, final List<RefCounted<SolrIndexSearcher>> searcherList) {
     RefCounted<SolrIndexSearcher> holder = new RefCounted<SolrIndexSearcher>(newSearcher) {
       @Override
       public void close() {
@@ -1376,11 +1410,13 @@ public final class SolrCore implements S
             // This relies on the RefCounted class allowing close() to be called every
             // time the counter hits zero.
             if (refcount.get() > 0) return;
-            _searchers.remove(this);
+            searcherList.remove(this);
           }
           resource.close();
-        } catch (IOException e) {
-          log.error("Error closing searcher:" + SolrException.toStr(e));
+        } catch (Throwable e) {
+          // do not allow decref() operations to fail since they are typically called in finally blocks
+          // and throwing another exception would be very unexpected.
+          SolrException.log(log, "Error closing searcher:", e);
         }
       }
     };
@@ -1388,6 +1424,9 @@ public final class SolrCore implements S
     return holder;
   }
 
+  public boolean isReloaded() {
+    return isReloaded;
+  }
 
   // Take control of newSearcherHolder (which should have a reference count of at
   // least 1 already).  If the caller wishes to use the newSearcherHolder directly
@@ -1423,6 +1462,7 @@ public final class SolrCore implements S
         log.info(logid+"Registered new searcher " + newSearcher);
 
       } catch (Throwable e) {
+        // an exception in register() shouldn't be fatal.
         log(e);
       } finally {
         // wake up anyone waiting for a searcher
@@ -1438,9 +1478,13 @@ public final class SolrCore implements S
   public void closeSearcher() {
     log.info(logid+"Closing main searcher on request.");
     synchronized (searcherLock) {
+      if (realtimeSearcher != null) {
+        realtimeSearcher.decref();
+        realtimeSearcher = null;
+      }
       if (_searcher != null) {
         _searcher.decref();   // dec refcount for this._searcher
-        _searcher=null; // isClosed() does check this
+        _searcher = null; // isClosed() does check this
         infoRegistry.remove("currentSearcher");
       }
     }
@@ -1470,7 +1514,7 @@ public final class SolrCore implements S
     
     handler.handleRequest(req,rsp);
     setResponseHeaderValues(handler,req,rsp);
-    
+
     if (log.isInfoEnabled()) {
       StringBuilder sb = new StringBuilder(logid);
       for (int i=0; i<toLog.size(); i++) {
@@ -1525,7 +1569,7 @@ public final class SolrCore implements S
 
 
   final public static void log(Throwable e) {
-    SolrException.log(log, null, e);
+    SolrException.log(log,null,e);
   }
 
   
@@ -1579,8 +1623,10 @@ public final class SolrCore implements S
         }
         log.info("created "+info.name+": " + info.className);
       } catch (Exception ex) {
-          throw new SolrException
+          SolrException e = new SolrException
             (SolrException.ErrorCode.SERVER_ERROR, "QueryResponseWriter init failure", ex);
+          SolrException.log(log,null,e);
+          throw e;
       }
     }
 

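The searcher changes above all lean on the RefCounted contract: newHolder() returns a holder with a count of one, openNewSearcher() increments once more for the realtimeSearcher field, and every reference taken must eventually be released with decref(). A minimal caller-side sketch of that rule (names taken from the code above; error handling omitted):

    RefCounted<SolrIndexSearcher> holder = core.getRealtimeSearcher();
    try {
      SolrIndexSearcher searcher = holder.get();
      // ... use the searcher; the reference we hold keeps it open ...
    } finally {
      holder.decref();  // pairs with the incref done inside getRealtimeSearcher()
    }
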
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java Wed Jan 25 19:49:26 2012
@@ -140,6 +140,7 @@ public class BinaryUpdateRequestHandler 
       }
     }
   }
+
   @Override
   public String getDescription() {
     return "Add/Update multiple documents with javabin format";

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java Wed Jan 25 19:49:26 2012
@@ -163,10 +163,10 @@ class JsonLoader extends ContentStreamLo
         String key = parser.getString();
         if( parser.wasKey() ) {
           if( "id".equals( key ) ) {
-            cmd.id = parser.getString();
+            cmd.setId(parser.getString());
           }
           else if( "query".equals(key) ) {
-            cmd.query = parser.getString();
+            cmd.setQuery(parser.getString());
           }
           else if( "commitWithin".equals(key) ) { 
             cmd.commitWithin = Integer.parseInt(parser.getString());
@@ -181,7 +181,7 @@ class JsonLoader extends ContentStreamLo
         }
       }
       else if( ev == JSONParser.OBJECT_END ) {
-        if( cmd.id == null && cmd.query == null ) {
+        if( cmd.getId() == null && cmd.getQuery() == null ) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Missing id or query for delete ["+parser.getPosition()+"]" );
         }
         return cmd;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Wed Jan 25 19:49:26 2012
@@ -43,6 +43,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -81,6 +82,7 @@ import org.slf4j.LoggerFactory;
  * @since solr 1.4
  */
 public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
+  
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
   SolrCore core;
 
@@ -128,6 +130,8 @@ public class ReplicationHandler extends 
     // It gives the current 'replicateable' index version
     if (command.equals(CMD_INDEX_VERSION)) {
       IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
+ 
+      //System.out.println("The latest index gen is:" + commitPoint.getGeneration() + " " + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
       if (commitPoint != null && replicationEnabled.get()) {
         //
         // There is a race condition here.  The commit point may be changed / deleted by the time
@@ -162,7 +166,7 @@ public class ReplicationHandler extends 
       new Thread() {
         @Override
         public void run() {
-          doFetch(paramsCopy);
+          doFetch(paramsCopy, false);
         }
       }.start();
       rsp.add(STATUS, OK_STATUS);
@@ -270,10 +274,10 @@ public class ReplicationHandler extends 
 
   private volatile SnapPuller tempSnapPuller;
 
-  void doFetch(SolrParams solrParams) {
+  public boolean doFetch(SolrParams solrParams, boolean force) {
     String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
     if (!snapPullLock.tryLock())
-      return;
+      return false;
     try {
       tempSnapPuller = snapPuller;
       if (masterUrl != null) {
@@ -281,13 +285,14 @@ public class ReplicationHandler extends 
         nl.remove(SnapPuller.POLL_INTERVAL);
         tempSnapPuller = new SnapPuller(nl, this, core);
       }
-      tempSnapPuller.fetchLatestIndex(core);
+      return tempSnapPuller.fetchLatestIndex(core, force);
     } catch (Exception e) {
-      LOG.error("SnapPull failed ", e);
+      SolrException.log(LOG, "SnapPull failed ", e);
     } finally {
       tempSnapPuller = snapPuller;
       snapPullLock.unlock();
     }
+    return false;
   }
 
   boolean isReplicating() {
@@ -334,6 +339,8 @@ public class ReplicationHandler extends 
     }
     long version = Long.parseLong(v);
     IndexCommit commit = core.getDeletionPolicy().getCommitPoint(version);
+ 
+    //System.out.println("ask for files for gen:" + commit.getGeneration() + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
     if (commit == null) {
       rsp.add("status", "invalid indexversion");
       return;
@@ -757,12 +764,12 @@ public class ReplicationHandler extends 
   }
 
 
-  void refreshCommitpoint() {
-    IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
-    if(replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
-      indexCommitPoint = commitPoint;
-    }
-  }
+//  void refreshCommitpoint() {
+//    IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
+//    if(replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
+//      indexCommitPoint = commitPoint;
+//    }
+//  }
 
   @SuppressWarnings("unchecked")
   public void inform(SolrCore core) {
@@ -777,6 +784,12 @@ public class ReplicationHandler extends 
     }
     NamedList master = (NamedList) initArgs.get("master");
     boolean enableMaster = isEnabled( master );
+    
+    if (!enableSlave && !enableMaster) {
+      enableMaster = true;
+      master = new NamedList<Object>();
+    }
+    
     if (enableMaster) {
       includeConfFiles = (String) master.get(CONF_FILES);
       if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
@@ -796,6 +809,10 @@ public class ReplicationHandler extends 
       replicateOnCommit = replicateAfter.contains("commit");
       replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");
 
+      if (!replicateOnCommit && !replicateOnOptimize) {
+        replicateOnCommit = true;
+      }
+      
       // if we only want to replicate on optimize, we need the deletion policy to
       // save the last optimized commit point.
       if (replicateOnOptimize) {

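doFetch() is now public and reports whether the pull succeeded, so callers outside the handler (such as recovery code) can react to failures. A hedged sketch of such a caller; the retry policy is invented for illustration:

    // boolean doFetch(SolrParams solrParams, boolean force) returns false if
    // the pull lock was busy or the fetch failed (the exception is logged).
    boolean success = false;
    for (int attempt = 0; attempt < 3 && !success; attempt++) {
      success = replicationHandler.doFetch(null, true);  // force: allow a full copy / empty-master sync
    }
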
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Wed Jan 25 19:49:26 2012
@@ -33,6 +33,7 @@ import static org.apache.solr.handler.Re
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -158,7 +159,7 @@ public class SnapPuller {
         }
         try {
           executorStartTime = System.currentTimeMillis();
-          replicationHandler.doFetch(null);
+          replicationHandler.doFetch(null, false);
         } catch (Exception e) {
           LOG.error("Exception in fetching index", e);
         }
@@ -243,7 +244,8 @@ public class SnapPuller {
   @SuppressWarnings("unchecked")
   boolean successfulInstall = false;
 
-  boolean fetchLatestIndex(SolrCore core) throws IOException {
+  boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException, InterruptedException {
+    successfulInstall = false;
     replicationStartTime = System.currentTimeMillis();
     try {
       //get the current 'replicateable' index version in the master
@@ -256,23 +258,41 @@ public class SnapPuller {
       }
       long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
       long latestGeneration = (Long) response.get(GENERATION);
-      if (latestVersion == 0L) {
-        //there is nothing to be replicated
-        return false;
-      }
+
       IndexCommit commit;
       RefCounted<SolrIndexSearcher> searcherRefCounted = null;
       try {
         searcherRefCounted = core.getNewestSearcher(false);
+        if (searcherRefCounted == null) {
+          SolrException.log(LOG, "No open searcher found - fetch aborted");
+          return false;
+        }
         commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
       } finally {
         if (searcherRefCounted != null)
           searcherRefCounted.decref();
       }
+      
+      if (latestVersion == 0L) {
+        if (force && commit.getVersion() != 0) {
+          // since we won't get the files for an empty index,
+          // we just clear ours and commit
+          core.getUpdateHandler().getSolrCoreState().getIndexWriter(core).deleteAll();
+          SolrQueryRequest req = new LocalSolrQueryRequest(core,
+              new ModifiableSolrParams());
+          core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+        }
+        
+        //there is nothing to be replicated
+        successfulInstall = true;
+        return true;
+      }
+      
       if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
-        //master and slave are alsready in sync just return
+        //master and slave are already in sync just return
         LOG.info("Slave in sync with master.");
-        return false;
+        successfulInstall = true;
+        return true;
       }
       LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
       LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
@@ -289,7 +309,7 @@ public class SnapPuller {
       filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
       // if the generation of the master is older than that of the slave, they are not compatible to be copied over;
       // in that case a new index directory is created and all the files are copied
-      boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;
+      boolean isFullCopyNeeded = commit.getVersion() >= latestVersion || force;
       File tmpIndexDir = createTempindexDir(core);
       if (isIndexStale())
         isFullCopyNeeded = true;
@@ -331,15 +351,17 @@ public class SnapPuller {
         return successfulInstall;
       } catch (ReplicationHandlerException e) {
         LOG.error("User aborted Replication");
+        return false;
       } catch (SolrException e) {
         throw e;
+      } catch (InterruptedException e) {
+        throw new InterruptedException("Index fetch interrupted");
       } catch (Exception e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
       } finally {
         if (deleteTmpIdxDir) delTree(tmpIndexDir);
         else delTree(indexDir);
       }
-      return successfulInstall;
     } finally {
       if (!successfulInstall) {
         logReplicationTimeAndConfFiles(null, successfulInstall);
@@ -476,9 +498,9 @@ public class SnapPuller {
       
       // reboot the writer on the new index and get a new searcher
       solrCore.getUpdateHandler().newIndexWriter();
-      solrCore.getSearcher(true, false, null);
-      
-      replicationHandler.refreshCommitpoint();
+      // update our commit point to the right dir
+      solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+
     } finally {
       req.close();
     }

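fetchLatestIndex() now treats "already in sync" and "empty master" as successful outcomes, so its return value answers whether the slave is consistent with the master afterwards. A sketch of how a caller inside the package might read it (exception handling omitted; the forced fallback step is an assumption):

    boolean inSync = snapPuller.fetchLatestIndex(core, false);
    if (!inSync) {
      // e.g. escalate to a forced pull, which may clear the local index and
      // commit if the master's index is empty, or trigger a full copy
      inSync = snapPuller.fetchLatestIndex(core, true);
    }
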
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java Wed Jan 25 19:49:26 2012
@@ -211,14 +211,24 @@ class XMLLoader extends ContentStreamLoa
                     "unexpected XML tag /delete/" + mode);
           }
           text.setLength(0);
+          
+          if ("id".equals(mode)) {
+            for (int i = 0; i < parser.getAttributeCount(); i++) {
+              String attrName = parser.getAttributeLocalName(i);
+              String attrVal = parser.getAttributeValue(i);
+              if (XmlUpdateRequestHandler.VERSION.equals(attrName)) {
+                deleteCmd.setVersion(Long.parseLong(attrVal));
+              }
+            }
+          }
           break;
 
         case XMLStreamConstants.END_ELEMENT:
           String currTag = parser.getLocalName();
           if ("id".equals(currTag)) {
-            deleteCmd.id = text.toString();
+            deleteCmd.setId(text.toString());         
           } else if ("query".equals(currTag)) {
-            deleteCmd.query = text.toString();
+            deleteCmd.setQuery(text.toString());
           } else if ("delete".equals(currTag)) {
             return;
           } else {

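With the XMLLoader change, a delete-by-id message may now carry a version attribute on the <id> element, which the loader parses into DeleteUpdateCommand via setVersion. A hedged sketch of sending such a message with solrj follows; the URL is a placeholder, and DirectXmlRequest is used purely for illustration.

import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.DirectXmlRequest;

public class VersionedDeleteExample {
  public static void main(String[] args) throws Exception {
    // Placeholder URL; point this at a real core.
    CommonsHttpSolrServer server =
        new CommonsHttpSolrServer("http://localhost:8983/solr");
    // The version attribute on <id> is read by XMLLoader and handed to
    // DeleteUpdateCommand.setVersion(...).
    String xml = "<delete><id version=\"12345\">doc1</id></delete>";
    server.request(new DirectXmlRequest("/update", xml));
  }
}
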
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java Wed Jan 25 19:49:26 2012
@@ -44,9 +44,12 @@ public class XmlUpdateRequestHandler ext
 
   public static final String OVERWRITE = "overwrite";
   
+  public static final String VERSION = "version";
+  
   // NOTE: This constant is for use with the <add> XML tag, not the HTTP param with the same name
   public static final String COMMIT_WITHIN = "commitWithin";
 
+
   XMLInputFactory inputFactory;
 
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Wed Jan 25 19:49:26 2012
@@ -17,39 +17,56 @@
 
 package org.apache.solr.handler.admin;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.core.*;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.util.NumberUtils;
-import org.apache.solr.util.RefCounted;
-import org.apache.solr.util.SolrPluginUtils;
+import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.MergeIndexesCommand;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.NumberUtils;
+import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Properties;
-
 /**
  *
  * @since solr 1.3
@@ -161,6 +178,21 @@ public class CoreAdminHandler extends Re
           break;
         }
 
+        case PREPRECOVERY: {
+          this.handlePrepRecoveryAction(req, rsp);
+          break;
+        }
+        
+        case REQUESTRECOVERY: {
+          this.handleRequestRecoveryAction(req, rsp);
+          break;
+        }
+        
+        case DISTRIBURL: {
+          this.handleDistribUrlAction(req, rsp);
+          break;
+        }
+        
         default: {
           doPersist = this.handleCustomAction(req, rsp);
           break;
@@ -554,6 +586,142 @@ public class CoreAdminHandler extends Re
     return doPersist;
 
   }
+  
+  protected void handleRequestRecoveryAction(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws IOException {
+    final SolrParams params = req.getParams();
+    
+    String cname = params.get(CoreAdminParams.CORE);
+    if (cname == null) {
+      cname = "";
+    }
+    SolrCore core = null;
+    try {
+      core = coreContainer.getCore(cname);
+      core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+    } finally {
+      // no recoveryStrat close for now
+      if (core != null) {
+        core.close();
+      }
+    }
+  }
+  
+  protected void handlePrepRecoveryAction(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws IOException, InterruptedException {
+    final SolrParams params = req.getParams();
+    
+    String cname = params.get(CoreAdminParams.CORE);
+    if (cname == null) {
+      cname = "";
+    }
+    
+    String nodeName = params.get("nodeName");
+    String coreNodeName = params.get("coreNodeName");
+    
+ 
+    SolrCore core =  null;
+
+    try {
+      core = coreContainer.getCore(cname);
+      if (core == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "core not found: " + cname);
+      }
+      String state;
+      int retry = 0;
+      while (true) {
+        // wait until we are sure the recovering node is ready
+        // to accept updates
+        CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
+            .getCloudDescriptor();
+        CloudState cloudState = coreContainer
+            .getZkController()
+            .getCloudState();
+        ZkNodeProps nodeProps = 
+            cloudState.getSlice(cloudDescriptor.getCollectionName(),
+                cloudDescriptor.getShardId()).getShards().get(coreNodeName);
+        state = nodeProps == null ? null : nodeProps.get(ZkStateReader.STATE_PROP);
+        boolean live = cloudState.liveNodesContain(nodeName);
+        if (state != null && state.equals(ZkStateReader.RECOVERING)
+            && live) {
+          break;
+        }
+        
+        if (retry++ == 30) {
+          throw new SolrException(ErrorCode.BAD_REQUEST,
+              "I was asked to prep for recovery for " + nodeName
+                  + " but she is not live or not in a recovery state - state: " + state + " live:" + live);
+        }
+        
+        Thread.sleep(1000);
+      }
+      
+      // small safety net for any updates that started with state that
+      // kept it from sending the update to be buffered -
+      // pause for a while to let any outstanding updates finish
+      
+      Thread.sleep(4000);
+      
+      UpdateRequestProcessorChain processorChain = core
+          .getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
+      
+      ModifiableSolrParams reqParams = new ModifiableSolrParams(req.getParams());
+      reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
+      
+      SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
+      UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
+          new SolrQueryResponse());
+      CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+      
+      processor.processCommit(cuc);
+      processor.finish();
+      
+      // solrcloud_debug
+//      try {
+//        RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+//        SolrIndexSearcher searcher = searchHolder.get();
+//        try {
+//          System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " to replicate "
+//              + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration()  + " data:" + core.getDataDir());
+//        } finally {
+//          searchHolder.decref();
+//        }
+//      } catch (Exception e) {
+//        
+//      }
+      
+    } finally {
+      if (core != null) {
+        core.close();
+      }
+    }
+  }
+  
+  protected void handleDistribUrlAction(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
+    // TODO: finish this and tests
+    SolrParams params = req.getParams();
+    
+    SolrParams required = params.required();
+    String path = required.get("path");
+    String shard = params.get("shard");
+    String collection = required.get("collection");
+    
+    SolrCore core = req.getCore();
+    ZkController zkController = core.getCoreDescriptor().getCoreContainer()
+        .getZkController();
+    if (shard != null) {
+      List<ZkCoreNodeProps> replicas = zkController.getZkStateReader().getReplicaProps(
+          collection, shard, zkController.getNodeName(), core.getName());
+      
+      for (ZkCoreNodeProps node : replicas) {
+        CommonsHttpSolrServer server = new CommonsHttpSolrServer(node.getCoreUrl() + path);
+        QueryRequest qr = new QueryRequest();
+        server.request(qr);
+      }
+
+    }
+  }
 
   protected NamedList<Object> getCoreStatus(CoreContainer cores, String cname) throws IOException {
     NamedList<Object> info = new SimpleOrderedMap<Object>();
@@ -594,6 +762,13 @@ public class CoreAdminHandler extends Re
     return path;
   }
 
+  public static ModifiableSolrParams params(String... params) {
+    ModifiableSolrParams msp = new ModifiableSolrParams();
+    for (int i=0; i<params.length; i+=2) {
+      msp.add(params[i], params[i+1]);
+    }
+    return msp;
+  }
 
   //////////////////////// SolrInfoMBeans methods //////////////////////
 

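The new params(String...) helper pairs up its varargs as alternating parameter names and values, which keeps callers of the new recovery actions terse. A small sketch, assuming the REQUESTRECOVERY constant added to CoreAdminAction in this commit and a placeholder core name:

import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.admin.CoreAdminHandler;

class RecoveryRequestExample {
  static ModifiableSolrParams requestRecovery(String coreName) {
    // Alternating name/value pairs; these would be sent to the cores
    // admin endpoint (normally /admin/cores).
    return CoreAdminHandler.params(
        CoreAdminParams.ACTION, CoreAdminAction.REQUESTRECOVERY.toString(),
        CoreAdminParams.CORE, coreName);
  }
}
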
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java Wed Jan 25 19:49:26 2012
@@ -18,26 +18,32 @@
 package org.apache.solr.handler.admin;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.solr.cloud.ZkSolrResourceLoader;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ContentStreamBase;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.RawResponseWriter;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.zookeeper.KeeperException;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URISyntaxException;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 
@@ -111,8 +117,90 @@ public class ShowFileRequestHandler exte
   }
   
   @Override
-  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException 
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, KeeperException, InterruptedException 
   {
+    CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
+    if (coreContainer.isZooKeeperAware()) {
+      showFromZooKeeper(req, rsp, coreContainer);
+    } else {
+      showFromFileSystem(req, rsp);
+    }
+  }
+
+  private void showFromZooKeeper(SolrQueryRequest req, SolrQueryResponse rsp,
+      CoreContainer coreContainer) throws KeeperException,
+      InterruptedException, UnsupportedEncodingException {
+    String adminFile = null;
+    SolrCore core = req.getCore();
+    SolrZkClient zkClient = coreContainer.getZkController().getZkClient();
+    final ZkSolrResourceLoader loader = (ZkSolrResourceLoader) core
+        .getResourceLoader();
+    String confPath = loader.getCollectionZkPath();
+    
+    String fname = req.getParams().get("file", null);
+    if (fname == null) {
+      adminFile = confPath;
+    } else {
+      fname = fname.replace('\\', '/'); // normalize slashes
+      if (hiddenFiles.contains(fname.toUpperCase(Locale.ENGLISH))) {
+        throw new SolrException(ErrorCode.FORBIDDEN, "Can not access: " + fname);
+      }
+      if (fname.indexOf("..") >= 0) {
+        throw new SolrException(ErrorCode.FORBIDDEN, "Invalid path: " + fname);
+      }
+      adminFile = confPath + "/" + fname;
+    }
+    
+    // Make sure the file exists, is readable and is not a hidden file
+    if (!zkClient.exists(adminFile, true)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find: "
+          + adminFile);
+    }
+    
+    // Show a directory listing
+    List<String> children = zkClient.getChildren(adminFile, null, true);
+    if (children.size() > 0) {
+      
+      NamedList<SimpleOrderedMap<Object>> files = new SimpleOrderedMap<SimpleOrderedMap<Object>>();
+      for (String f : children) {
+        if (hiddenFiles.contains(f.toUpperCase(Locale.ENGLISH))) {
+          continue; // don't show 'hidden' files
+        }
+        if (f.startsWith(".")) {
+          continue; // skip hidden system files...
+        }
+        
+        SimpleOrderedMap<Object> fileInfo = new SimpleOrderedMap<Object>();
+        files.add(f, fileInfo);
+        List<String> fchildren = zkClient.getChildren(adminFile, null, true);
+        if (fchildren.size() > 0) {
+          fileInfo.add("directory", true);
+        } else {
+          // TODO? content type
+          fileInfo.add("size", f.length());
+        }
+        // TODO: ?
+        // fileInfo.add( "modified", new Date( f.lastModified() ) );
+      }
+      rsp.add("files", files);
+    } else {
+      // Include the file contents
+      // The file logic depends on RawResponseWriter, so force its use.
+      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      params.set(CommonParams.WT, "raw");
+      req.setParams(params);
+      
+      ContentStreamBase content = new ContentStreamBase.StringStream(
+          new String(zkClient.getData(adminFile, null, null, true), "UTF-8"));
+      content.setContentType(req.getParams().get(USE_CONTENT_TYPE));
+      
+      rsp.add(RawResponseWriter.CONTENT, content);
+    }
+    rsp.setHttpCaching(false);
+  }
+
+  private void showFromFileSystem(SolrQueryRequest req, SolrQueryResponse rsp)
+      throws IOException {
     File adminFile = null;
     
     final SolrResourceLoader loader = req.getCore().getResourceLoader();

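showFromZooKeeper maps the file-browsing contract onto ZooKeeper nodes: a node with children is presented as a directory listing, and a leaf node's data is presented as file content. A minimal sketch of that traversal, using only the SolrZkClient calls that appear in the handler above; the path argument is a placeholder.

import java.util.List;
import org.apache.solr.common.cloud.SolrZkClient;

class ZkConfBrowser {
  static void printNode(SolrZkClient zkClient, String path) throws Exception {
    if (!zkClient.exists(path, true)) {
      System.out.println("not found: " + path);
      return;
    }
    List<String> children = zkClient.getChildren(path, null, true);
    if (children.size() > 0) {
      // directory-style listing
      for (String child : children) {
        System.out.println(path + "/" + child);
      }
    } else {
      // leaf node: its data is the file content
      System.out.println(new String(zkClient.getData(path, null, null, true), "UTF-8"));
    }
  }
}
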
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java Wed Jan 25 19:49:26 2012
@@ -22,11 +22,13 @@ import org.apache.solr.client.solrj.Solr
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
@@ -234,7 +236,8 @@ public class HttpShardHandler extends Sh
     SolrQueryRequest req = rb.req;
     SolrParams params = req.getParams();
 
-    rb.isDistrib = params.getBool("distrib",false);
+    rb.isDistrib = params.getBool("distrib", req.getCore().getCoreDescriptor()
+        .getCoreContainer().isZooKeeperAware());
     String shards = params.get(ShardParams.SHARDS);
 
     // for back compat, a shards param with URLs like localhost:8983/solr will mean that this
@@ -272,11 +275,36 @@ public class HttpShardHandler extends Sh
 
         cloudState =  zkController.getCloudState();
 
-        // TODO: check "collection" for which collection(s) to search.. but for now, just default
-        // to the collection for this core.
-        // This can be more efficient... we only record the name, even though we have the
-        // shard info we need in the next step of mapping slice->shards
-        slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+        // This can be more efficient... we only record the name, even though we
+        // have the shard info we need in the next step of mapping slice->shards
+        
+        // Stores the comma-separated list of specified collections.
+        // Eg: "collection1,collection2,collection3"
+        String collections = params.get("collection");
+        if (collections != null) {
+          // If there were one or more collections specified in the query, split
+          // each parameter and store as a seperate member of a List.
+          List<String> collectionList = StrUtils.splitSmart(collections, ",",
+              true);
+          
+          // First create an empty HashMap to add the slice info to.
+          slices = new HashMap<String,Slice>();
+          
+          // In turn, retrieve the slices that cover each collection from the
+          // cloud state and add them to the Map 'slices'.
+          for (int i = 0; i < collectionList.size(); i++) {
+            String collection = collectionList.get(i);
+            ClientUtils.appendMap(collection, slices, cloudState.getSlices(collection));
+          }
+        } else {
+          // If no collections were specified, default to the collection for
+          // this core.
+          slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+        }
+        
+        // Store the logical slices in the ResponseBuilder and create a new
+        // String array to hold the physical shards (which will be mapped
+        // later).
         rb.slices = slices.keySet().toArray(new String[slices.size()]);
         rb.shards = new String[rb.slices.length];
 
@@ -317,14 +345,16 @@ public class HttpShardHandler extends Sh
             StringBuilder sliceShardsStr = new StringBuilder();
             boolean first = true;
             for (ZkNodeProps nodeProps : sliceShards.values()) {
-              if (!liveNodes.contains(nodeProps.get(ZkStateReader.NODE_NAME)))
-                continue;
+              ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+              if (!liveNodes.contains(coreNodeProps.getNodeName())
+                  || !coreNodeProps.getState().equals(
+                      ZkStateReader.ACTIVE)) continue;
               if (first) {
                 first = false;
               } else {
                 sliceShardsStr.append('|');
               }
-              String url = nodeProps.get("url");
+              String url = coreNodeProps.getCoreUrl();
               if (url.startsWith("http://"))
                 url = url.substring(7);
               sliceShardsStr.append(url);

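From the client side, the change above means a single request can span collections by passing a comma-separated collection parameter, with only live replicas in the ACTIVE state considered when slices are mapped to shard URLs. A hedged solrj sketch; the base URL and collection names are placeholders.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;

public class MultiCollectionQuery {
  public static void main(String[] args) throws Exception {
    CommonsHttpSolrServer server =
        new CommonsHttpSolrServer("http://localhost:8983/solr/collection1");
    SolrQuery q = new SolrQuery("*:*");
    // Comma-separated, split server-side with StrUtils.splitSmart.
    q.set("collection", "collection1,collection2");
    System.out.println(server.query(q).getResults().getNumFound());
  }
}
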
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java Wed Jan 25 19:49:26 2012
@@ -16,6 +16,13 @@ package org.apache.solr.handler.componen
  * limitations under the License.
  */
 
+import java.net.MalformedURLException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
@@ -23,17 +30,12 @@ import org.apache.commons.httpclient.par
 import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.MalformedURLException;
-import java.util.Random;
-import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 
 public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized{
   protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class);
@@ -44,12 +46,13 @@ public class HttpShardHandlerFactory ext
   //
   // Consider CallerRuns policy and a lower max threads to throttle
   // requests at some point (or should we simply return failure?)
-   Executor commExecutor = new ThreadPoolExecutor(
+   ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(
           0,
           Integer.MAX_VALUE,
           5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
-          new SynchronousQueue<Runnable>()  // directly hand off tasks
-  );
+          new SynchronousQueue<Runnable>(),  // directly hand off tasks
+          new DefaultSolrThreadFactory("httpShardExecutor")
+   );
 
 
   HttpClient client;
@@ -58,6 +61,8 @@ public class HttpShardHandlerFactory ext
   int soTimeout = 0; //current default values
   int connectionTimeout = 0; //current default values
   public  String scheme = "http://"; //current default values
+
+  private MultiThreadedHttpConnectionManager mgr;
  // socket timeout measured in ms, closes a socket if read
   // takes longer than x ms to complete. throws
   // java.net.SocketTimeoutException: Read timed out exception
@@ -97,7 +102,7 @@ public class HttpShardHandlerFactory ext
           log.info("Setting shard-connection-timeout to: " + connectionTimeout);
         }
     }
-    MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+    mgr = new MultiThreadedHttpConnectionManager();
     mgr.getParams().setDefaultMaxConnectionsPerHost(20);
     mgr.getParams().setMaxTotalConnections(10000);
     mgr.getParams().setConnectionTimeout(connectionTimeout);
@@ -118,4 +123,23 @@ public class HttpShardHandlerFactory ext
     }
 
   }
+
+  @Override
+  public void close() {
+    try {
+      mgr.shutdown();
+    } catch (Throwable e) {
+      SolrException.log(log, e);
+    }
+    try {
+      loadbalancer.shutdown();
+    } catch (Throwable e) {
+      SolrException.log(log, e);
+    }
+    try {
+      commExecutor.shutdownNow();
+    } catch (Throwable e) {
+      SolrException.log(log, e);
+    }
+  }
 }

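The new close() releases each resource independently, so a failure shutting down one cannot prevent cleanup of the others. That pattern can be factored into a small helper; the sketch below is illustrative only and is not part of this commit.

import org.slf4j.Logger;
import org.apache.solr.common.SolrException;

class ShutdownUtil {
  // Run a shutdown action, logging any failure and continuing, so the
  // caller can still release its remaining resources.
  static void shutdownQuietly(Logger log, Runnable shutdown) {
    try {
      shutdown.run();
    } catch (Throwable e) {
      SolrException.log(log, e);
    }
  }
}
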
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java Wed Jan 25 19:49:26 2012
@@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
 import org.apache.lucene.util.ReaderUtil;
 import org.apache.lucene.util.UnicodeUtil;
+import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrDocument;
@@ -38,6 +39,7 @@ import org.apache.solr.common.SolrDocume
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.*;
@@ -206,7 +208,8 @@ public class QueryComponent extends Sear
     SolrQueryRequest req = rb.req;
     SolrParams params = req.getParams();
 
-    rb.isDistrib = params.getBool("distrib",false);
+    rb.isDistrib = params.getBool("distrib", req.getCore().getCoreDescriptor()
+        .getCoreContainer().isZooKeeperAware());
     String shards = params.get(ShardParams.SHARDS);
 
     // for back compat, a shards param with URLs like localhost:8983/solr will mean that this
@@ -244,11 +247,36 @@ public class QueryComponent extends Sear
 
         cloudState =  zkController.getCloudState();
 
-        // TODO: check "collection" for which collection(s) to search.. but for now, just default
-        // to the collection for this core.
         // This can be more efficient... we only record the name, even though we have the
         // shard info we need in the next step of mapping slice->shards
-        slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+        
+        // Stores the comma-separated list of specified collections.
+        // Eg: "collection1,collection2,collection3"
+        String collections = params.get("collection");
+        if (collections != null) {
+          // If one or more collections were specified in the query, split
+          // the parameter and store each collection as a separate member of a List.
+          List<String> collectionList = StrUtils.splitSmart(collections, ",",
+              true);
+          
+          // First create an empty HashMap to add the slice info to.
+          slices = new HashMap<String,Slice>();
+          
+          // In turn, retrieve the slices that cover each collection from the
+          // cloud state and add them to the Map 'slices'.
+          for (int i = 0; i < collectionList.size(); i++) {
+            String collection = collectionList.get(i);
+            ClientUtils.appendMap(collection, slices, cloudState.getSlices(collection));
+          }
+        } else {
+          // If no collections were specified, default to the collection for
+          // this core.
+          slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+        }
+        
+        // Store the logical slices in the ResponseBuilder and create a new
+        // String array to hold the physical shards (which will be mapped
+        // later).
         rb.slices = slices.keySet().toArray(new String[slices.size()]);
         rb.shards = new String[rb.slices.length];
 
@@ -289,14 +317,16 @@ public class QueryComponent extends Sear
             StringBuilder sliceShardsStr = new StringBuilder();
             boolean first = true;
             for (ZkNodeProps nodeProps : sliceShards.values()) {
-              if (!liveNodes.contains(nodeProps.get(ZkStateReader.NODE_NAME)))
-                continue;
+              ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+              if (!liveNodes.contains(coreNodeProps.getNodeName())
+                  || !coreNodeProps.getState().equals(
+                      ZkStateReader.ACTIVE)) continue;
               if (first) {
                 first = false;
               } else {
                 sliceShardsStr.append('|');
               }
-              String url = nodeProps.get("url");
+              String url = coreNodeProps.getCoreUrl();
               if (url.startsWith("http://"))
                 url = url.substring(7);
               sliceShardsStr.append(url);