You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC
svn commit: r1235888 [3/12] - in /lucene/dev/trunk: dev-tools/eclipse/
dev-tools/maven/ solr/ solr/cloud-dev/
solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/
solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Wed Jan 25 19:49:26 2012
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.TimeoutException;
import java.text.SimpleDateFormat;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
+import org.apache.solr.handler.component.ShardHandlerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,10 +34,12 @@ import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
import org.apache.solr.cloud.SolrZkServer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.DOMUtil;
@@ -85,6 +89,7 @@ public class CoreContainer
private SolrXMLSerializer solrXMLSerializer = new SolrXMLSerializer();
private ZkController zkController;
private SolrZkServer zkServer;
+ private ShardHandlerFactory shardHandlerFactory;
private String zkHost;
@@ -151,7 +156,7 @@ public class CoreContainer
zookeeperHost = zkServer.getClientString();
}
- int zkClientConnectTimeout = 5000;
+ int zkClientConnectTimeout = 15000;
if (zookeeperHost != null) {
// we are ZooKeeper enabled
@@ -163,7 +168,17 @@ public class CoreContainer
} else {
log.info("Zookeeper client=" + zookeeperHost);
}
- zkController = new ZkController(zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext);
+ zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, new CurrentCoreDescriptorProvider() {
+
+ @Override
+ public List<CoreDescriptor> getCurrentDescriptors() {
+ List<CoreDescriptor> descriptors = new ArrayList<CoreDescriptor>(getCoreNames().size());
+ for (SolrCore core : getCores()) {
+ descriptors.add(core.getCoreDescriptor());
+ }
+ return descriptors;
+ }
+ });
String confDir = System.getProperty("bootstrap_confdir");
if(confDir != null) {
@@ -203,7 +218,7 @@ public class CoreContainer
// Helper class to initialize the CoreContainer
public static class Initializer {
- protected String solrConfigFilename = null;
+ protected String containerConfigFilename = null; // normally "solr.xml"
protected String dataDir = null; // override datadir for single core mode
// core container instantiation
@@ -211,9 +226,8 @@ public class CoreContainer
ParserConfigurationException, SAXException {
CoreContainer cores = null;
String solrHome = SolrResourceLoader.locateSolrHome();
- // TODO : fix broken logic confusing solr.xml with solrconfig.xml
- File fconf = new File(solrHome, solrConfigFilename == null ? "solr.xml"
- : solrConfigFilename);
+ File fconf = new File(solrHome, containerConfigFilename == null ? "solr.xml"
+ : containerConfigFilename);
log.info("looking for solr.xml: " + fconf.getAbsolutePath());
cores = new CoreContainer();
@@ -225,10 +239,7 @@ public class CoreContainer
cores.configFile = fconf;
}
- solrConfigFilename = cores.getConfigFile().getName();
- if (cores.cores.isEmpty()){
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "No cores were created, please check the logs for errors");
- }
+ containerConfigFilename = cores.getConfigFile().getName();
return cores;
}
@@ -300,10 +311,7 @@ public class CoreContainer
shareSchema = cfg.getBool("solr/cores/@shareSchema", false);
zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout", 10000);
- hostPort = System.getProperty("hostPort");
- if (hostPort == null) {
- hostPort = cfg.get("solr/cores/@hostPort", "8983");
- }
+ hostPort = cfg.get("solr/cores/@hostPort", "8983");
hostContext = cfg.get("solr/cores/@hostContext", "solr");
host = cfg.get("solr/cores/@host", null);
@@ -338,7 +346,7 @@ public class CoreContainer
}
NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core", XPathConstants.NODESET);
- boolean defaultCoreFound = false;
+
for (int i=0; i<nodes.getLength(); i++) {
Node node = nodes.item(i);
try {
@@ -374,6 +382,10 @@ public class CoreContainer
if (opt != null) {
p.getCloudDescriptor().setCollectionName(opt);
}
+ opt = DOMUtil.getAttr(node, "roles", null);
+ if(opt != null){
+ p.getCloudDescriptor().setRoles(opt);
+ }
}
opt = DOMUtil.getAttr(node, "properties", null);
if (opt != null) {
@@ -393,29 +405,6 @@ public class CoreContainer
SolrException.log(log,null,ex);
}
}
-
- if(zkController != null) {
- try {
- synchronized (zkController.getZkStateReader().getUpdateLock()) {
- zkController.getZkStateReader().makeShardZkNodeWatches(false);
- zkController.getZkStateReader().updateCloudState(true);
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (KeeperException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- }
}
private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {
@@ -428,7 +417,13 @@ public class CoreContainer
}
return properties;
}
- private boolean isShutDown = false;
+
+ private volatile boolean isShutDown = false;
+
+ public boolean isShutDown() {
+ return isShutDown;
+ }
+
/**
* Stops all cores.
*/
@@ -436,8 +431,14 @@ public class CoreContainer
log.info("Shutting down CoreContainer instance="+System.identityHashCode(this));
synchronized(cores) {
try {
- for(SolrCore core : cores.values()) {
- core.close();
+ for (SolrCore core : cores.values()) {
+ try {
+ if (!core.isClosed()) {
+ core.close();
+ }
+ } catch (Throwable t) {
+ SolrException.log(log, "Error shutting down core", t);
+ }
}
cores.clear();
} finally {
@@ -447,6 +448,9 @@ public class CoreContainer
if (zkServer != null) {
zkServer.stop();
}
+ if (shardHandlerFactory != null) {
+ shardHandlerFactory.close();
+ }
isShutDown = true;
}
}
@@ -457,7 +461,6 @@ public class CoreContainer
try {
if(!isShutDown){
log.error("CoreContainer was not shutdown prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!! instance=" + System.identityHashCode(this));
- shutdown();
}
} finally {
super.finalize();
@@ -480,6 +483,12 @@ public class CoreContainer
throw new RuntimeException( "Invalid core name: "+name );
}
+ if (zkController != null) {
+ // before becoming available, make sure we are not live and active
+ // this also gets us our assigned shard id if it was not specified
+ zkController.publish(core, ZkStateReader.DOWN);
+ }
+
SolrCore old = null;
synchronized (cores) {
old = cores.put(name, core);
@@ -491,39 +500,44 @@ public class CoreContainer
core.getCoreDescriptor().name = name;
}
+ if( old == null || old == core) {
+ log.info( "registering core: "+name );
+ registerInZk(core);
+ return null;
+ }
+ else {
+ log.info( "replacing core: "+name );
+ if (!returnPrevNotClosed) {
+ old.close();
+ }
+ registerInZk(core);
+ return old;
+ }
+ }
+
+
+ private void registerInZk(SolrCore core) {
if (zkController != null) {
try {
- zkController.register(core.getName(), core.getCoreDescriptor().getCloudDescriptor(), true);
+ zkController.register(core.getName(), core.getCoreDescriptor());
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
- } catch (KeeperException e) {
+ } catch (Exception e) {
+ // if register fails, this is really bad - close the zkController to
+ // minimize any damage we can cause
+ zkController.publish(core, ZkStateReader.DOWN);
+ zkController.close();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
- } catch (IOException e) {
- log.error("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
}
-
- if( old == null || old == core) {
- log.info( "registering core: "+name );
- return null;
- }
- else {
- log.info( "replacing core: "+name );
- if (!returnPrevNotClosed) {
- old.close();
- }
- return old;
- }
}
-
/**
* Registers a SolrCore descriptor in the registry using the core's name.
* If returnPrev==false, the old core, if different, is closed.
@@ -562,7 +576,7 @@ public class CoreContainer
try {
String collection = dcore.getCloudDescriptor().getCollectionName();
zkController.createCollectionZkNode(dcore.getCloudDescriptor());
- // zkController.createCollectionZkNode(collection);
+
zkConfigName = zkController.readConfigName(collection);
if (zkConfigName == null) {
log.error("Could not find config name for collection:" + collection);
@@ -632,6 +646,12 @@ public class CoreContainer
}
SolrCore core = new SolrCore(dcore.getName(), null, config, schema, dcore);
+
+ if (zkController == null && core.getUpdateHandler().getUpdateLog() != null) {
+ // always kick off recovery if we are in standalone mode.
+ core.getUpdateHandler().getUpdateLog().recoverFromLog();
+ }
+
return core;
}
@@ -948,6 +968,22 @@ public class CoreContainer
public ZkController getZkController() {
return zkController;
}
+
+ /** The default ShardHandlerFactory used to communicate with other solr instances */
+ public ShardHandlerFactory getShardHandlerFactory() {
+ synchronized (this) {
+ if (shardHandlerFactory == null) {
+ Map m = new HashMap();
+ m.put("class",HttpShardHandlerFactory.class.getName());
+ PluginInfo info = new PluginInfo("shardHandlerFactory", m,null,Collections.<PluginInfo>emptyList());
+
+ HttpShardHandlerFactory fac = new HttpShardHandlerFactory();
+ fac.init(info);
+ shardHandlerFactory = fac;
+ }
+ return shardHandlerFactory;
+ }
+ }
private SolrConfig getSolrConfigFromZk(String zkConfigName, String solrConfigFileName,
SolrResourceLoader resourceLoader) throws IOException,
@@ -976,7 +1012,7 @@ public class CoreContainer
private static final String DEF_SOLR_XML ="<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n" +
"<solr persistent=\"false\">\n" +
" <cores adminPath=\"/admin/cores\" defaultCoreName=\"" + DEFAULT_DEFAULT_CORE_NAME + "\">\n" +
- " <core name=\""+ DEFAULT_DEFAULT_CORE_NAME + "\" instanceDir=\".\" />\n" +
+ " <core name=\""+ DEFAULT_DEFAULT_CORE_NAME + "\" shard=\"${shard:}\" instanceDir=\".\" />\n" +
" </cores>\n" +
"</solr>";
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java Wed Jan 25 19:49:26 2012
@@ -43,11 +43,10 @@ public class CoreDescriptor {
this.coreContainer = coreContainer;
this.name = name;
- if(coreContainer.getZkController() != null) {
+ if(coreContainer != null && coreContainer.getZkController() != null) {
this.cloudDesc = new CloudDescriptor();
// cloud collection defaults to core name
cloudDesc.setCollectionName(name.isEmpty() ? coreContainer.getDefaultCoreName() : name);
- this.cloudDesc.setShardId(coreContainer.getZkController().getNodeName() + "_" + name);
}
if (name == null) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/RequestHandlers.java Wed Jan 25 19:49:26 2012
@@ -26,11 +26,8 @@ import java.util.concurrent.ConcurrentHa
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.CommonParams.EchoParamStyle;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
@@ -41,7 +38,7 @@ import org.slf4j.LoggerFactory;
/**
*/
-final class RequestHandlers {
+public final class RequestHandlers {
public static Logger log = LoggerFactory.getLogger(RequestHandlers.class);
public static final String DEFAULT_HANDLER_NAME="standard";
@@ -208,7 +205,7 @@ final class RequestHandlers {
*
* @since solr 1.2
*/
- private static final class LazyRequestHandlerWrapper implements SolrRequestHandler, SolrInfoMBean
+ public static final class LazyRequestHandlerWrapper implements SolrRequestHandler, SolrInfoMBean
{
private final SolrCore core;
private String _className;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java Wed Jan 25 19:49:26 2012
@@ -42,10 +42,7 @@ import org.apache.solr.search.ValueSourc
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.processor.LogUpdateProcessorFactory;
-import org.apache.solr.update.processor.RunUpdateProcessorFactory;
-import org.apache.solr.update.processor.UpdateRequestProcessorChain;
-import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+import org.apache.solr.update.processor.*;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.solr.util.plugin.SolrCoreAware;
@@ -63,6 +60,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.lang.reflect.Constructor;
+import java.util.concurrent.locks.ReentrantLock;
/**
@@ -77,6 +75,8 @@ public final class SolrCore implements S
private String logid; // used to show what name is set
private final CoreDescriptor coreDescriptor;
+ private boolean isReloaded = false;
+
private final SolrConfig solrConfig;
private final SolrResourceLoader resourceLoader;
private final IndexSchema schema;
@@ -476,20 +476,6 @@ public final class SolrCore implements S
}
/**
- *
- * @param dataDir
- * @param schema
- * @throws SAXException
- * @throws IOException
- * @throws ParserConfigurationException
- *
- * @since solr 1.0
- */
- public SolrCore(String dataDir, IndexSchema schema) throws ParserConfigurationException, IOException, SAXException {
- this(null, dataDir, new SolrConfig(), schema, null);
- }
-
- /**
* Creates a new core and register it in the list of cores.
* If a core with the same name already exists, it will be stopped and replaced by this one.
*
@@ -558,7 +544,8 @@ public final class SolrCore implements S
if (updateHandler == null) {
initDirectoryFactory();
} else {
- directoryFactory = updateHandler.getIndexWriterProvider().getDirectoryFactory();
+ directoryFactory = updateHandler.getSolrCoreState().getDirectoryFactory();
+ this.isReloaded = true;
}
initIndex();
@@ -658,6 +645,7 @@ public final class SolrCore implements S
// construct the default chain
UpdateRequestProcessorFactory[] factories = new UpdateRequestProcessorFactory[]{
new LogUpdateProcessorFactory(),
+ new DistributedUpdateProcessorFactory(),
new RunUpdateProcessorFactory()
};
def = new UpdateRequestProcessorChain(factories, this);
@@ -762,7 +750,7 @@ public final class SolrCore implements S
try {
updateHandler.close();
- } catch (Exception e) {
+ } catch (Throwable e) {
SolrException.log(log,e);
}
@@ -960,15 +948,21 @@ public final class SolrCore implements S
// This reference is protected by searcherLock.
private RefCounted<SolrIndexSearcher> _searcher;
- // All of the open searchers. Don't access this directly.
+ // All of the normal open searchers. Don't access this directly.
// protected by synchronizing on searcherLock.
private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
+ private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
final ExecutorService searcherExecutor = Executors.newSingleThreadExecutor();
private int onDeckSearchers; // number of searchers preparing
+ // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
private Object searcherLock = new Object(); // the sync object for the searcher
+ private ReentrantLock openSearcherLock = new ReentrantLock(true); // used to serialize opens/reopens for absolute ordering
private final int maxWarmingSearchers; // max number of on-deck searchers allowed
+ private RefCounted<SolrIndexSearcher> realtimeSearcher;
+
+
/**
* Return a registered {@link RefCounted}<{@link SolrIndexSearcher}> with
* the reference count incremented. It <b>must</b> be decremented when no longer needed.
@@ -987,29 +981,162 @@ public final class SolrCore implements S
}
/**
- * Return the newest {@link RefCounted}<{@link SolrIndexSearcher}> with
+ * Return the newest normal {@link RefCounted}<{@link SolrIndexSearcher}> with
* the reference count incremented. It <b>must</b> be decremented when no longer needed.
* If no searcher is currently open, then if openNew==true a new searcher will be opened,
* or null is returned if openNew==false.
*/
public RefCounted<SolrIndexSearcher> getNewestSearcher(boolean openNew) {
synchronized (searcherLock) {
- if (_searchers.isEmpty()) {
- if (!openNew) return null;
- // Not currently implemented since simply calling getSearcher during inform()
- // can result in a deadlock. Right now, solr always opens a searcher first
- // before calling inform() anyway, so this should never happen.
- throw new UnsupportedOperationException();
- }
- RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
- newest.incref();
- return newest;
+ if (!_searchers.isEmpty()) {
+ RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
+ newest.incref();
+ return newest;
+ }
+ }
+
+ return openNew ? getRealtimeSearcher() : null;
+ }
+
+
+ /** Gets the latest real-time searcher w/o forcing open a new searcher if one already exists.
+ * The reference count will be incremented.
+ */
+ public RefCounted<SolrIndexSearcher> getRealtimeSearcher() {
+ synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.incref();
+ return realtimeSearcher;
+ }
+ }
+
+ // use the searcher lock to prevent multiple people from trying to open at once
+ openSearcherLock.lock();
+ try {
+
+ // try again
+ synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.incref();
+ return realtimeSearcher;
+ }
+ }
+
+ // force a new searcher open
+ return openNewSearcher(true, true);
+ } finally {
+ openSearcherLock.unlock();
}
}
+
public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher) throws IOException {
return getSearcher(forceNew, returnSearcher, waitSearcher, false);
}
+
+
+ /** Opens a new searcher and returns a RefCounted<SolrIndexSearcher> with it's reference incremented.
+ *
+ * "realtime" means that we need to open quickly for a realtime view of the index, hence don't do any
+ * autowarming and add to the _realtimeSearchers queue rather than the _searchers queue (so it won't
+ * be used for autowarming by a future normal searcher). A "realtime" searcher will currently never
+ * become "registered" (since it currently lacks caching).
+ *
+ * realtimeSearcher is updated to the latest opened searcher, regardless of the value of "realtime".
+ *
+ * This method aquires openSearcherLock - do not call with searckLock held!
+ */
+ public RefCounted<SolrIndexSearcher> openNewSearcher(boolean updateHandlerReopens, boolean realtime) {
+ SolrIndexSearcher tmp;
+ RefCounted<SolrIndexSearcher> newestSearcher = null;
+ boolean nrt = solrConfig.reopenReaders && updateHandlerReopens;
+
+ openSearcherLock.lock();
+ try {
+ String newIndexDir = null;
+ File indexDirFile = null;
+ File newIndexDirFile = null;
+
+ // if it's not a normal near-realtime update, check that paths haven't changed.
+ if (!nrt) {
+ indexDirFile = new File(getIndexDir()).getCanonicalFile();
+ newIndexDir = getNewIndexDir();
+ newIndexDirFile = new File(newIndexDir).getCanonicalFile();
+ }
+
+ synchronized (searcherLock) {
+ newestSearcher = realtimeSearcher;
+ if (newestSearcher != null) {
+ newestSearcher.incref(); // the matching decref is in the finally block
+ }
+ }
+
+ if (newestSearcher != null && solrConfig.reopenReaders
+ && (nrt || indexDirFile.equals(newIndexDirFile))) {
+
+ IndexReader newReader;
+ IndexReader currentReader = newestSearcher.get().getIndexReader();
+
+ if (updateHandlerReopens) {
+ // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
+ IndexWriter writer = getUpdateHandler().getSolrCoreState().getIndexWriter(this);
+ newReader = IndexReader.openIfChanged(currentReader, writer, true);
+
+ } else {
+ // verbose("start reopen without writer, reader=", currentReader);
+ newReader = IndexReader.openIfChanged(currentReader);
+ // verbose("reopen result", newReader);
+ }
+
+ if (newReader == null) {
+ // if this is a request for a realtime searcher, just return the same searcher if there haven't been any changes.
+ if (realtime) {
+ newestSearcher.incref();
+ return newestSearcher;
+ }
+
+ currentReader.incRef();
+ newReader = currentReader;
+ }
+
+ // for now, turn off caches if this is for a realtime reader (caches take a little while to instantiate)
+ tmp = new SolrIndexSearcher(this, schema, (realtime ? "realtime":"main"), newReader, true, !realtime, true, directoryFactory);
+
+ } else {
+ // verbose("non-reopen START:");
+ tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
+ // verbose("non-reopen DONE: searcher=",tmp);
+ }
+
+ List<RefCounted<SolrIndexSearcher>> searcherList = realtime ? _realtimeSearchers : _searchers;
+ RefCounted<SolrIndexSearcher> newSearcher = newHolder(tmp, searcherList); // refcount now at 1
+
+ // Increment reference again for "realtimeSearcher" variable. It should be at 2 after.
+ // When it's decremented by both the caller of this method, and by realtimeSearcher being replaced,
+ // it will be closed.
+ newSearcher.incref();
+
+ synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.decref();
+ }
+ realtimeSearcher = newSearcher;
+ searcherList.add(realtimeSearcher);
+ }
+
+ return newSearcher;
+
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error opening new searcher", e);
+ }
+ finally {
+ openSearcherLock.unlock();
+ if (newestSearcher != null) {
+ newestSearcher.decref();
+ }
+ }
+
+ }
/**
* Get a {@link SolrIndexSearcher} or start the process of creating a new one.
@@ -1105,80 +1232,28 @@ public final class SolrCore implements S
}
}
- // open the index synchronously
- // if this fails, we need to decrement onDeckSearchers again.
- SolrIndexSearcher tmp;
- RefCounted<SolrIndexSearcher> newestSearcher = null;
+ // a signal to decrement onDeckSearchers if something goes wrong.
+ final boolean[] decrementOnDeckCount=new boolean[]{true};
+ RefCounted<SolrIndexSearcher> currSearcherHolder = null; // searcher we are autowarming from
+ RefCounted<SolrIndexSearcher> searchHolder = null;
+ boolean success = false;
+ openSearcherLock.lock();
try {
- newestSearcher = getNewestSearcher(false);
- String newIndexDir = getNewIndexDir();
- File indexDirFile = new File(getIndexDir()).getCanonicalFile();
- File newIndexDirFile = new File(newIndexDir).getCanonicalFile();
-
- if (newestSearcher != null && solrConfig.reopenReaders
- && indexDirFile.equals(newIndexDirFile)) {
-
- if (updateHandlerReopens) {
-
- tmp = getUpdateHandler().reopenSearcher(newestSearcher.get());
- } else {
-
- IndexReader currentReader = newestSearcher.get().getIndexReader();
- IndexReader newReader;
-
- // verbose("start reopen without writer, reader=", currentReader);
- newReader = IndexReader.openIfChanged(currentReader);
- // verbose("reopen result", newReader);
-
-
- if (newReader == null) {
- currentReader.incRef();
- newReader = currentReader;
- }
-
- tmp = new SolrIndexSearcher(this, schema, "main", newReader, true, true, true, directoryFactory);
- }
-
-
- } else {
- // verbose("non-reopen START:");
- tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
- // verbose("non-reopen DONE: searcher=",tmp);
- }
- } catch (Throwable th) {
- synchronized(searcherLock) {
- onDeckSearchers--;
- // notify another waiter to continue... it may succeed
- // and wake any others.
- searcherLock.notify();
- }
- // need to close the searcher here??? we shouldn't have to.
- throw new RuntimeException(th);
- } finally {
- if (newestSearcher != null) {
- newestSearcher.decref();
+ searchHolder = openNewSearcher(updateHandlerReopens, false);
+ // the searchHolder will be incremented once already (and it will eventually be assigned to _searcher when registered)
+ // increment it again if we are going to return it to the caller.
+ if (returnSearcher) {
+ searchHolder.incref();
}
- }
-
- final SolrIndexSearcher newSearcher=tmp;
- RefCounted<SolrIndexSearcher> currSearcherHolder=null;
- final RefCounted<SolrIndexSearcher> newSearchHolder=newHolder(newSearcher);
- if (returnSearcher) newSearchHolder.incref();
+ final RefCounted<SolrIndexSearcher> newSearchHolder = searchHolder;
+ final SolrIndexSearcher newSearcher = newSearchHolder.get();
- // a signal to decrement onDeckSearchers if something goes wrong.
- final boolean[] decrementOnDeckCount=new boolean[1];
- decrementOnDeckCount[0]=true;
-
- try {
boolean alreadyRegistered = false;
synchronized (searcherLock) {
- _searchers.add(newSearchHolder);
- // verbose("added searcher ",newSearchHolder.get()," to _searchers");
-
if (_searcher == null) {
// if there isn't a current searcher then we may
// want to register this one before warming is complete instead of waiting.
@@ -1197,174 +1272,133 @@ public final class SolrCore implements S
final SolrIndexSearcher currSearcher = currSearcherHolder==null ? null : currSearcherHolder.get();
- //
- // Note! if we registered the new searcher (but didn't increment it's
- // reference count because returnSearcher==false, it's possible for
- // someone else to register another searcher, and thus cause newSearcher
- // to close while we are warming.
- //
- // Should we protect against that by incrementing the reference count?
- // Maybe we should just let it fail? After all, if returnSearcher==false
- // and newSearcher has been de-registered, what's the point of continuing?
- //
-
Future future=null;
// warm the new searcher based on the current searcher.
// should this go before the other event handlers or after?
if (currSearcher != null) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- newSearcher.warm(currSearcher);
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- }
- return null;
- }
- }
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ newSearcher.warm(currSearcher);
+ } catch (Throwable e) {
+ SolrException.log(log,e);
+ }
+ return null;
+ }
}
- }
- throw e;
- }
+ );
}
-
+
if (currSearcher==null && firstSearcherListeners.size() > 0) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- for (SolrEventListener listener : firstSearcherListeners) {
- listener.newSearcher(newSearcher,null);
- }
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- }
- return null;
- }
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ for (SolrEventListener listener : firstSearcherListeners) {
+ listener.newSearcher(newSearcher,null);
}
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
+ } catch (Throwable e) {
+ SolrException.log(log,null,e);
+ }
+ return null;
+ }
}
- }
- throw e;
- }
+ );
}
if (currSearcher!=null && newSearcherListeners.size() > 0) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- for (SolrEventListener listener : newSearcherListeners) {
- listener.newSearcher(newSearcher, currSearcher);
- }
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- }
- return null;
- }
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ for (SolrEventListener listener : newSearcherListeners) {
+ listener.newSearcher(newSearcher, currSearcher);
}
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
- }
- }
- throw e;
- }
+ } catch (Throwable e) {
+ SolrException.log(log,null,e);
+ }
+ return null;
+ }
+ }
+ );
}
// WARNING: this code assumes a single threaded executor (that all tasks
// queued will finish first).
final RefCounted<SolrIndexSearcher> currSearcherHolderF = currSearcherHolder;
if (!alreadyRegistered) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- // signal that we no longer need to decrement
- // the count *before* registering the searcher since
- // registerSearcher will decrement even if it errors.
- decrementOnDeckCount[0]=false;
- registerSearcher(newSearchHolder);
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- } finally {
- // we are all done with the old searcher we used
- // for warming...
- if (currSearcherHolderF!=null) currSearcherHolderF.decref();
- }
- return null;
- }
- }
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ // registerSearcher will decrement onDeckSearchers and
+ // do a notify, even if it fails.
+ registerSearcher(newSearchHolder);
+ } catch (Throwable e) {
+ SolrException.log(log, e);
+ } finally {
+ // we are all done with the old searcher we used
+ // for warming...
+ if (currSearcherHolderF!=null) currSearcherHolderF.decref();
+ }
+ return null;
+ }
}
- }
- throw e;
- }
+ );
}
if (waitSearcher != null) {
waitSearcher[0] = future;
}
+ success = true;
+
// Return the searcher as the warming tasks run in parallel
// callers may wait on the waitSearcher future returned.
return returnSearcher ? newSearchHolder : null;
} catch (Exception e) {
- SolrException.log(log, null, e);
- if (currSearcherHolder != null) currSearcherHolder.decref();
+ if (e instanceof SolrException) throw (SolrException)e;
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } finally {
- synchronized (searcherLock) {
- if (decrementOnDeckCount[0]) {
+ if (!success) {
+ synchronized (searcherLock) {
onDeckSearchers--;
+
+ if (onDeckSearchers < 0) {
+ // sanity check... should never happen
+ log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
+ onDeckSearchers=0; // try and recover
+ }
+ // if we failed, we need to wake up at least one waiter to continue the process
+ searcherLock.notify();
}
- if (onDeckSearchers < 0) {
- // sanity check... should never happen
- log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
- onDeckSearchers=0; // try and recover
+
+ if (currSearcherHolder != null) {
+ currSearcherHolder.decref();
+ }
+
+ if (searchHolder != null) {
+ searchHolder.decref(); // decrement 1 for _searcher (searchHolder will never become _searcher now)
+ if (returnSearcher) {
+ searchHolder.decref(); // decrement 1 because we won't be returning the searcher to the user
+ }
}
- // if we failed, we need to wake up at least one waiter to continue the process
- searcherLock.notify();
}
- // since the indexreader was already opened, assume we can continue on
- // even though we got an exception.
- return returnSearcher ? newSearchHolder : null;
+ // we want to do this after we decrement onDeckSearchers so another thread
+ // doesn't increment first and throw a false warning.
+ openSearcherLock.unlock();
+
}
}
- private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher) {
+ private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher, final List<RefCounted<SolrIndexSearcher>> searcherList) {
RefCounted<SolrIndexSearcher> holder = new RefCounted<SolrIndexSearcher>(newSearcher) {
@Override
public void close() {
@@ -1376,11 +1410,13 @@ public final class SolrCore implements S
// This relies on the RefCounted class allowing close() to be called every
// time the counter hits zero.
if (refcount.get() > 0) return;
- _searchers.remove(this);
+ searcherList.remove(this);
}
resource.close();
- } catch (IOException e) {
- log.error("Error closing searcher:" + SolrException.toStr(e));
+ } catch (Throwable e) {
+ // do not allow decref() operations to fail since they are typically called in finally blocks
+ // and throwing another exception would be very unexpected.
+ SolrException.log(log, "Error closing searcher:", e);
}
}
};
@@ -1388,6 +1424,9 @@ public final class SolrCore implements S
return holder;
}
+ public boolean isReloaded() {
+ return isReloaded;
+ }
// Take control of newSearcherHolder (which should have a reference count of at
// least 1 already. If the caller wishes to use the newSearcherHolder directly
@@ -1423,6 +1462,7 @@ public final class SolrCore implements S
log.info(logid+"Registered new searcher " + newSearcher);
} catch (Throwable e) {
+ // an exception in register() shouldn't be fatal.
log(e);
} finally {
// wake up anyone waiting for a searcher
@@ -1438,9 +1478,13 @@ public final class SolrCore implements S
public void closeSearcher() {
log.info(logid+"Closing main searcher on request.");
synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.decref();
+ realtimeSearcher = null;
+ }
if (_searcher != null) {
_searcher.decref(); // dec refcount for this._searcher
- _searcher=null; // isClosed() does check this
+ _searcher = null; // isClosed() does check this
infoRegistry.remove("currentSearcher");
}
}
@@ -1470,7 +1514,7 @@ public final class SolrCore implements S
handler.handleRequest(req,rsp);
setResponseHeaderValues(handler,req,rsp);
-
+
if (log.isInfoEnabled()) {
StringBuilder sb = new StringBuilder(logid);
for (int i=0; i<toLog.size(); i++) {
@@ -1525,7 +1569,7 @@ public final class SolrCore implements S
final public static void log(Throwable e) {
- SolrException.log(log, null, e);
+ SolrException.log(log,null,e);
}
@@ -1579,8 +1623,10 @@ public final class SolrCore implements S
}
log.info("created "+info.name+": " + info.className);
} catch (Exception ex) {
- throw new SolrException
+ SolrException e = new SolrException
(SolrException.ErrorCode.SERVER_ERROR, "QueryResponseWriter init failure", ex);
+ SolrException.log(log,null,e);
+ throw e;
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/BinaryUpdateRequestHandler.java Wed Jan 25 19:49:26 2012
@@ -140,6 +140,7 @@ public class BinaryUpdateRequestHandler
}
}
}
+
@Override
public String getDescription() {
return "Add/Update multiple documents with javabin format";
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/JsonLoader.java Wed Jan 25 19:49:26 2012
@@ -163,10 +163,10 @@ class JsonLoader extends ContentStreamLo
String key = parser.getString();
if( parser.wasKey() ) {
if( "id".equals( key ) ) {
- cmd.id = parser.getString();
+ cmd.setId(parser.getString());
}
else if( "query".equals(key) ) {
- cmd.query = parser.getString();
+ cmd.setQuery(parser.getString());
}
else if( "commitWithin".equals(key) ) {
cmd.commitWithin = Integer.parseInt(parser.getString());
@@ -181,7 +181,7 @@ class JsonLoader extends ContentStreamLo
}
}
else if( ev == JSONParser.OBJECT_END ) {
- if( cmd.id == null && cmd.query == null ) {
+ if( cmd.getId() == null && cmd.getQuery() == null ) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Missing id or query for delete ["+parser.getPosition()+"]" );
}
return cmd;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Wed Jan 25 19:49:26 2012
@@ -43,6 +43,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexReader;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -81,6 +82,7 @@ import org.slf4j.LoggerFactory;
* @since solr 1.4
*/
public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
SolrCore core;
@@ -128,6 +130,8 @@ public class ReplicationHandler extends
// It gives the current 'replicateable' index version
if (command.equals(CMD_INDEX_VERSION)) {
IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change
+
+ //System.out.println("The latest index gen is:" + commitPoint.getGeneration() + " " + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
if (commitPoint != null && replicationEnabled.get()) {
//
// There is a race condition here. The commit point may be changed / deleted by the time
@@ -162,7 +166,7 @@ public class ReplicationHandler extends
new Thread() {
@Override
public void run() {
- doFetch(paramsCopy);
+ doFetch(paramsCopy, false);
}
}.start();
rsp.add(STATUS, OK_STATUS);
@@ -270,10 +274,10 @@ public class ReplicationHandler extends
private volatile SnapPuller tempSnapPuller;
- void doFetch(SolrParams solrParams) {
+ public boolean doFetch(SolrParams solrParams, boolean force) {
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
if (!snapPullLock.tryLock())
- return;
+ return false;
try {
tempSnapPuller = snapPuller;
if (masterUrl != null) {
@@ -281,13 +285,14 @@ public class ReplicationHandler extends
nl.remove(SnapPuller.POLL_INTERVAL);
tempSnapPuller = new SnapPuller(nl, this, core);
}
- tempSnapPuller.fetchLatestIndex(core);
+ return tempSnapPuller.fetchLatestIndex(core, force);
} catch (Exception e) {
- LOG.error("SnapPull failed ", e);
+ SolrException.log(LOG, "SnapPull failed ", e);
} finally {
tempSnapPuller = snapPuller;
snapPullLock.unlock();
}
+ return false;
}
boolean isReplicating() {
@@ -334,6 +339,8 @@ public class ReplicationHandler extends
}
long version = Long.parseLong(v);
IndexCommit commit = core.getDeletionPolicy().getCommitPoint(version);
+
+ //System.out.println("ask for files for gen:" + commit.getGeneration() + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
if (commit == null) {
rsp.add("status", "invalid indexversion");
return;
@@ -757,12 +764,12 @@ public class ReplicationHandler extends
}
- void refreshCommitpoint() {
- IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
- if(replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
- indexCommitPoint = commitPoint;
- }
- }
+// void refreshCommitpoint() {
+// IndexCommit commitPoint = core.getDeletionPolicy().getLatestCommit();
+// if(replicateOnCommit || (replicateOnOptimize && commitPoint.getSegmentCount() == 1)) {
+// indexCommitPoint = commitPoint;
+// }
+// }
@SuppressWarnings("unchecked")
public void inform(SolrCore core) {
@@ -777,6 +784,12 @@ public class ReplicationHandler extends
}
NamedList master = (NamedList) initArgs.get("master");
boolean enableMaster = isEnabled( master );
+
+ if (!enableSlave && !enableMaster) {
+ enableMaster = true;
+ master = new NamedList<Object>();
+ }
+
if (enableMaster) {
includeConfFiles = (String) master.get(CONF_FILES);
if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
@@ -796,6 +809,10 @@ public class ReplicationHandler extends
replicateOnCommit = replicateAfter.contains("commit");
replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");
+ if (!replicateOnCommit && ! replicateOnOptimize) {
+ replicateOnCommit = true;
+ }
+
// if we only want to replicate on optimize, we need the deletion policy to
// save the last optimized commit point.
if (replicateOnOptimize) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Wed Jan 25 19:49:26 2012
@@ -33,6 +33,7 @@ import static org.apache.solr.handler.Re
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,7 +159,7 @@ public class SnapPuller {
}
try {
executorStartTime = System.currentTimeMillis();
- replicationHandler.doFetch(null);
+ replicationHandler.doFetch(null, false);
} catch (Exception e) {
LOG.error("Exception in fetching index", e);
}
@@ -243,7 +244,8 @@ public class SnapPuller {
@SuppressWarnings("unchecked")
boolean successfulInstall = false;
- boolean fetchLatestIndex(SolrCore core) throws IOException {
+ boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException, InterruptedException {
+ successfulInstall = false;
replicationStartTime = System.currentTimeMillis();
try {
//get the current 'replicateable' index version in the master
@@ -256,23 +258,41 @@ public class SnapPuller {
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
- if (latestVersion == 0L) {
- //there is nothing to be replicated
- return false;
- }
+
IndexCommit commit;
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
try {
searcherRefCounted = core.getNewestSearcher(false);
+ if (searcherRefCounted == null) {
+ SolrException.log(LOG, "No open searcher found - fetch aborted");
+ return false;
+ }
commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
} finally {
if (searcherRefCounted != null)
searcherRefCounted.decref();
}
+
+ if (latestVersion == 0L) {
+ if (force && commit.getVersion() != 0) {
+ // since we won't get the files for an empty index,
+ // we just clear ours and commit
+ core.getUpdateHandler().getSolrCoreState().getIndexWriter(core).deleteAll();
+ SolrQueryRequest req = new LocalSolrQueryRequest(core,
+ new ModifiableSolrParams());
+ core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+ }
+
+ //there is nothing to be replicated
+ successfulInstall = true;
+ return true;
+ }
+
if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
- //master and slave are alsready in sync just return
+ //master and slave are already in sync just return
LOG.info("Slave in sync with master.");
- return false;
+ successfulInstall = true;
+ return true;
}
LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
@@ -289,7 +309,7 @@ public class SnapPuller {
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied
// then a new index direcory to be created and all the files need to be copied
- boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;
+ boolean isFullCopyNeeded = commit.getVersion() >= latestVersion || force;
File tmpIndexDir = createTempindexDir(core);
if (isIndexStale())
isFullCopyNeeded = true;
@@ -331,15 +351,17 @@ public class SnapPuller {
return successfulInstall;
} catch (ReplicationHandlerException e) {
LOG.error("User aborted Replication");
+ return false;
} catch (SolrException e) {
throw e;
+ } catch (InterruptedException e) {
+ throw new InterruptedException("Index fetch interrupted");
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
} finally {
if (deleteTmpIdxDir) delTree(tmpIndexDir);
else delTree(indexDir);
}
- return successfulInstall;
} finally {
if (!successfulInstall) {
logReplicationTimeAndConfFiles(null, successfulInstall);
@@ -476,9 +498,9 @@ public class SnapPuller {
// reboot the writer on the new index and get a new searcher
solrCore.getUpdateHandler().newIndexWriter();
- solrCore.getSearcher(true, false, null);
-
- replicationHandler.refreshCommitpoint();
+ // update our commit point to the right dir
+ solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+
} finally {
req.close();
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XMLLoader.java Wed Jan 25 19:49:26 2012
@@ -211,14 +211,24 @@ class XMLLoader extends ContentStreamLoa
"unexpected XML tag /delete/" + mode);
}
text.setLength(0);
+
+ if ("id".equals(mode)) {
+ for (int i = 0; i < parser.getAttributeCount(); i++) {
+ String attrName = parser.getAttributeLocalName(i);
+ String attrVal = parser.getAttributeValue(i);
+ if (XmlUpdateRequestHandler.VERSION.equals(attrName)) {
+ deleteCmd.setVersion(Long.parseLong(attrVal));
+ }
+ }
+ }
break;
case XMLStreamConstants.END_ELEMENT:
String currTag = parser.getLocalName();
if ("id".equals(currTag)) {
- deleteCmd.id = text.toString();
+ deleteCmd.setId(text.toString());
} else if ("query".equals(currTag)) {
- deleteCmd.query = text.toString();
+ deleteCmd.setQuery(text.toString());
} else if ("delete".equals(currTag)) {
return;
} else {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/XmlUpdateRequestHandler.java Wed Jan 25 19:49:26 2012
@@ -44,9 +44,12 @@ public class XmlUpdateRequestHandler ext
public static final String OVERWRITE = "overwrite";
+ public static final String VERSION = "version";
+
// NOTE: This constant is for use with the <add> XML tag, not the HTTP param with same name
public static final String COMMIT_WITHIN = "commitWithin";
+
XMLInputFactory inputFactory;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Wed Jan 25 19:49:26 2012
@@ -17,39 +17,56 @@
package org.apache.solr.handler.admin;
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.commons.io.FileUtils;
import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.core.*;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.util.NumberUtils;
-import org.apache.solr.util.RefCounted;
-import org.apache.solr.util.SolrPluginUtils;
+import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.NumberUtils;
+import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Properties;
-
/**
*
* @since solr 1.3
@@ -161,6 +178,21 @@ public class CoreAdminHandler extends Re
break;
}
+ case PREPRECOVERY: {
+ this.handlePrepRecoveryAction(req, rsp);
+ break;
+ }
+
+ case REQUESTRECOVERY: {
+ this.handleRequestRecoveryAction(req, rsp);
+ break;
+ }
+
+ case DISTRIBURL: {
+ this.handleDistribUrlAction(req, rsp);
+ break;
+ }
+
default: {
doPersist = this.handleCustomAction(req, rsp);
break;
@@ -554,6 +586,142 @@ public class CoreAdminHandler extends Re
return doPersist;
}
+
+ protected void handleRequestRecoveryAction(SolrQueryRequest req,
+ SolrQueryResponse rsp) throws IOException {
+ final SolrParams params = req.getParams();
+
+ String cname = params.get(CoreAdminParams.CORE);
+ if (cname == null) {
+ cname = "";
+ }
+ SolrCore core = null;
+ try {
+ core = coreContainer.getCore(cname);
+ core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+ } finally {
+ // no recoveryStrat close for now
+ if (core != null) {
+ core.close();
+ }
+ }
+ }
+
+ protected void handlePrepRecoveryAction(SolrQueryRequest req,
+ SolrQueryResponse rsp) throws IOException, InterruptedException {
+ final SolrParams params = req.getParams();
+
+ String cname = params.get(CoreAdminParams.CORE);
+ if (cname == null) {
+ cname = "";
+ }
+
+ String nodeName = params.get("nodeName");
+ String coreNodeName = params.get("coreNodeName");
+
+
+ SolrCore core = null;
+
+ try {
+ core = coreContainer.getCore(cname);
+ if (core == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:" + cname);
+ }
+ String state;
+ int retry = 0;
+ while (true) {
+ // wait until we are sure the recovering node is ready
+ // to accept updates
+ CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
+ .getCloudDescriptor();
+ CloudState cloudState = coreContainer
+ .getZkController()
+ .getCloudState();
+ ZkNodeProps nodeProps =
+ cloudState.getSlice(cloudDescriptor.getCollectionName(),
+ cloudDescriptor.getShardId()).getShards().get(coreNodeName);
+ state = nodeProps.get(ZkStateReader.STATE_PROP);
+ boolean live = cloudState.liveNodesContain(nodeName);
+ if (nodeProps != null && state.equals(ZkStateReader.RECOVERING)
+ && live) {
+ break;
+ }
+
+ if (retry++ == 30) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "I was asked to prep for recovery for " + nodeName
+ + " but she is not live or not in a recovery state - state: " + state + " live:" + live);
+ }
+
+ Thread.sleep(1000);
+ }
+
+ // small safety net for any updates that started with state that
+ // kept it from sending the update to be buffered -
+ // pause for a while to let any outstanding updates finish
+
+ Thread.sleep(4000);
+
+ UpdateRequestProcessorChain processorChain = core
+ .getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
+
+ ModifiableSolrParams reqParams = new ModifiableSolrParams(req.getParams());
+ reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
+
+ SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
+ UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
+ new SolrQueryResponse());
+ CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+
+ processor.processCommit(cuc);
+ processor.finish();
+
+ // solrcloud_debug
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " to replicate "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir());
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
+
+ } finally {
+ if (core != null) {
+ core.close();
+ }
+ }
+ }
+
+ protected void handleDistribUrlAction(SolrQueryRequest req,
+ SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
+ // TODO: finish this and tests
+ SolrParams params = req.getParams();
+
+ SolrParams required = params.required();
+ String path = required.get("path");
+ String shard = params.get("shard");
+ String collection = required.get("collection");
+
+ SolrCore core = req.getCore();
+ ZkController zkController = core.getCoreDescriptor().getCoreContainer()
+ .getZkController();
+ if (shard != null) {
+ List<ZkCoreNodeProps> replicas = zkController.getZkStateReader().getReplicaProps(
+ collection, shard, zkController.getNodeName(), core.getName());
+
+ for (ZkCoreNodeProps node : replicas) {
+ CommonsHttpSolrServer server = new CommonsHttpSolrServer(node.getCoreUrl() + path);
+ QueryRequest qr = new QueryRequest();
+ server.request(qr);
+ }
+
+ }
+ }
protected NamedList<Object> getCoreStatus(CoreContainer cores, String cname) throws IOException {
NamedList<Object> info = new SimpleOrderedMap<Object>();
@@ -594,6 +762,13 @@ public class CoreAdminHandler extends Re
return path;
}
+ public static ModifiableSolrParams params(String... params) {
+ ModifiableSolrParams msp = new ModifiableSolrParams();
+ for (int i=0; i<params.length; i+=2) {
+ msp.add(params[i], params[i+1]);
+ }
+ return msp;
+ }
//////////////////////// SolrInfoMBeans methods //////////////////////
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/ShowFileRequestHandler.java Wed Jan 25 19:49:26 2012
@@ -18,26 +18,32 @@
package org.apache.solr.handler.admin;
import org.apache.commons.io.IOUtils;
+import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.RawResponseWriter;
import org.apache.solr.response.SolrQueryResponse;
+import org.apache.zookeeper.KeeperException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Locale;
import java.util.Set;
@@ -111,8 +117,90 @@ public class ShowFileRequestHandler exte
}
@Override
- public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException
+ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, KeeperException, InterruptedException
{
+ CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
+ if (coreContainer.isZooKeeperAware()) {
+ showFromZooKeeper(req, rsp, coreContainer);
+ } else {
+ showFromFileSystem(req, rsp);
+ }
+ }
+
+ private void showFromZooKeeper(SolrQueryRequest req, SolrQueryResponse rsp,
+ CoreContainer coreContainer) throws KeeperException,
+ InterruptedException, UnsupportedEncodingException {
+ String adminFile = null;
+ SolrCore core = req.getCore();
+ SolrZkClient zkClient = coreContainer.getZkController().getZkClient();
+ final ZkSolrResourceLoader loader = (ZkSolrResourceLoader) core
+ .getResourceLoader();
+ String confPath = loader.getCollectionZkPath();
+
+ String fname = req.getParams().get("file", null);
+ if (fname == null) {
+ adminFile = confPath;
+ } else {
+ fname = fname.replace('\\', '/'); // normalize slashes
+ if (hiddenFiles.contains(fname.toUpperCase(Locale.ENGLISH))) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Can not access: " + fname);
+ }
+ if (fname.indexOf("..") >= 0) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Invalid path: " + fname);
+ }
+ adminFile = confPath + "/" + fname;
+ }
+
+ // Make sure the file exists, is readable and is not a hidden file
+ if (!zkClient.exists(adminFile, true)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find: "
+ + adminFile);
+ }
+
+ // Show a directory listing
+ List<String> children = zkClient.getChildren(adminFile, null, true);
+ if (children.size() > 0) {
+
+ NamedList<SimpleOrderedMap<Object>> files = new SimpleOrderedMap<SimpleOrderedMap<Object>>();
+ for (String f : children) {
+ if (hiddenFiles.contains(f.toUpperCase(Locale.ENGLISH))) {
+ continue; // don't show 'hidden' files
+ }
+ if (f.startsWith(".")) {
+ continue; // skip hidden system files...
+ }
+
+ SimpleOrderedMap<Object> fileInfo = new SimpleOrderedMap<Object>();
+ files.add(f, fileInfo);
+ List<String> fchildren = zkClient.getChildren(adminFile, null, true);
+ if (fchildren.size() > 0) {
+ fileInfo.add("directory", true);
+ } else {
+ // TODO? content type
+ fileInfo.add("size", f.length());
+ }
+ // TODO: ?
+ // fileInfo.add( "modified", new Date( f.lastModified() ) );
+ }
+ rsp.add("files", files);
+ } else {
+ // Include the file contents
+ // The file logic depends on RawResponseWriter, so force its use.
+ ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+ params.set(CommonParams.WT, "raw");
+ req.setParams(params);
+
+ ContentStreamBase content = new ContentStreamBase.StringStream(
+ new String(zkClient.getData(adminFile, null, null, true), "UTF-8"));
+ content.setContentType(req.getParams().get(USE_CONTENT_TYPE));
+
+ rsp.add(RawResponseWriter.CONTENT, content);
+ }
+ rsp.setHttpCaching(false);
+ }
+
+ private void showFromFileSystem(SolrQueryRequest req, SolrQueryResponse rsp)
+ throws IOException {
File adminFile = null;
final SolrResourceLoader loader = req.getCore().getResourceLoader();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java Wed Jan 25 19:49:26 2012
@@ -22,11 +22,13 @@ import org.apache.solr.client.solrj.Solr
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
@@ -234,7 +236,8 @@ public class HttpShardHandler extends Sh
SolrQueryRequest req = rb.req;
SolrParams params = req.getParams();
- rb.isDistrib = params.getBool("distrib",false);
+ rb.isDistrib = params.getBool("distrib", req.getCore().getCoreDescriptor()
+ .getCoreContainer().isZooKeeperAware());
String shards = params.get(ShardParams.SHARDS);
// for back compat, a shards param with URLs like localhost:8983/solr will mean that this
@@ -272,11 +275,36 @@ public class HttpShardHandler extends Sh
cloudState = zkController.getCloudState();
- // TODO: check "collection" for which collection(s) to search.. but for now, just default
- // to the collection for this core.
- // This can be more efficient... we only record the name, even though we have the
- // shard info we need in the next step of mapping slice->shards
- slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+ // This can be more efficient... we only record the name, even though we
+ // have the shard info we need in the next step of mapping slice->shards
+
+ // Stores the comma-separated list of specified collections.
+ // Eg: "collection1,collection2,collection3"
+ String collections = params.get("collection");
+ if (collections != null) {
+ // If there were one or more collections specified in the query, split
+ // each parameter and store as a seperate member of a List.
+ List<String> collectionList = StrUtils.splitSmart(collections, ",",
+ true);
+
+ // First create an empty HashMap to add the slice info to.
+ slices = new HashMap<String,Slice>();
+
+ // In turn, retrieve the slices that cover each collection from the
+ // cloud state and add them to the Map 'slices'.
+ for (int i = 0; i < collectionList.size(); i++) {
+ String collection = collectionList.get(i);
+ ClientUtils.appendMap(collection, slices, cloudState.getSlices(collection));
+ }
+ } else {
+ // If no collections were specified, default to the collection for
+ // this core.
+ slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+ }
+
+ // Store the logical slices in the ResponseBuilder and create a new
+ // String array to hold the physical shards (which will be mapped
+ // later).
rb.slices = slices.keySet().toArray(new String[slices.size()]);
rb.shards = new String[rb.slices.length];
@@ -317,14 +345,16 @@ public class HttpShardHandler extends Sh
StringBuilder sliceShardsStr = new StringBuilder();
boolean first = true;
for (ZkNodeProps nodeProps : sliceShards.values()) {
- if (!liveNodes.contains(nodeProps.get(ZkStateReader.NODE_NAME)))
- continue;
+ ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+ if (!liveNodes.contains(coreNodeProps.getNodeName())
+ || !coreNodeProps.getState().equals(
+ ZkStateReader.ACTIVE)) continue;
if (first) {
first = false;
} else {
sliceShardsStr.append('|');
}
- String url = nodeProps.get("url");
+ String url = coreNodeProps.getCoreUrl();
if (url.startsWith("http://"))
url = url.substring(7);
sliceShardsStr.append(url);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java Wed Jan 25 19:49:26 2012
@@ -16,6 +16,13 @@ package org.apache.solr.handler.componen
* limitations under the License.
*/
+import java.net.MalformedURLException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
@@ -23,17 +30,12 @@ import org.apache.commons.httpclient.par
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.MalformedURLException;
-import java.util.Random;
-import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized{
protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class);
@@ -44,12 +46,13 @@ public class HttpShardHandlerFactory ext
//
// Consider CallerRuns policy and a lower max threads to throttle
// requests at some point (or should we simply return failure?)
- Executor commExecutor = new ThreadPoolExecutor(
+ ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
- new SynchronousQueue<Runnable>() // directly hand off tasks
- );
+ new SynchronousQueue<Runnable>(), // directly hand off tasks
+ new DefaultSolrThreadFactory("httpShardExecutor")
+ );
HttpClient client;
@@ -58,6 +61,8 @@ public class HttpShardHandlerFactory ext
int soTimeout = 0; //current default values
int connectionTimeout = 0; //current default values
public String scheme = "http://"; //current default values
+
+ private MultiThreadedHttpConnectionManager mgr;
// socket timeout measured in ms, closes a socket if read
// takes longer than x ms to complete. throws
// java.net.SocketTimeoutException: Read timed out exception
@@ -97,7 +102,7 @@ public class HttpShardHandlerFactory ext
log.info("Setting shard-connection-timeout to: " + connectionTimeout);
}
}
- MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+ mgr = new MultiThreadedHttpConnectionManager();
mgr.getParams().setDefaultMaxConnectionsPerHost(20);
mgr.getParams().setMaxTotalConnections(10000);
mgr.getParams().setConnectionTimeout(connectionTimeout);
@@ -118,4 +123,23 @@ public class HttpShardHandlerFactory ext
}
}
+
+ @Override
+ public void close() {
+ try {
+ mgr.shutdown();
+ } catch (Throwable e) {
+ SolrException.log(log, e);
+ }
+ try {
+ loadbalancer.shutdown();
+ } catch (Throwable e) {
+ SolrException.log(log, e);
+ }
+ try {
+ commExecutor.shutdownNow();
+ } catch (Throwable e) {
+ SolrException.log(log, e);
+ }
+ }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java Wed Jan 25 19:49:26 2012
@@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.ReaderUtil;
import org.apache.lucene.util.UnicodeUtil;
+import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrDocument;
@@ -38,6 +39,7 @@ import org.apache.solr.common.SolrDocume
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.*;
@@ -206,7 +208,8 @@ public class QueryComponent extends Sear
SolrQueryRequest req = rb.req;
SolrParams params = req.getParams();
- rb.isDistrib = params.getBool("distrib",false);
+ rb.isDistrib = params.getBool("distrib", req.getCore().getCoreDescriptor()
+ .getCoreContainer().isZooKeeperAware());
String shards = params.get(ShardParams.SHARDS);
// for back compat, a shards param with URLs like localhost:8983/solr will mean that this
@@ -244,11 +247,36 @@ public class QueryComponent extends Sear
cloudState = zkController.getCloudState();
- // TODO: check "collection" for which collection(s) to search.. but for now, just default
- // to the collection for this core.
// This can be more efficient... we only record the name, even though we have the
// shard info we need in the next step of mapping slice->shards
- slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+
+ // Stores the comma-separated list of specified collections.
+ // Eg: "collection1,collection2,collection3"
+ String collections = params.get("collection");
+ if (collections != null) {
+ // If there were one or more collections specified in the query, split
+ // each parameter and store as a seperate member of a List.
+ List<String> collectionList = StrUtils.splitSmart(collections, ",",
+ true);
+
+ // First create an empty HashMap to add the slice info to.
+ slices = new HashMap<String,Slice>();
+
+ // In turn, retrieve the slices that cover each collection from the
+ // cloud state and add them to the Map 'slices'.
+ for (int i = 0; i < collectionList.size(); i++) {
+ String collection = collectionList.get(i);
+ ClientUtils.appendMap(collection, slices, cloudState.getSlices(collection));
+ }
+ } else {
+ // If no collections were specified, default to the collection for
+ // this core.
+ slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+ }
+
+ // Store the logical slices in the ResponseBuilder and create a new
+ // String array to hold the physical shards (which will be mapped
+ // later).
rb.slices = slices.keySet().toArray(new String[slices.size()]);
rb.shards = new String[rb.slices.length];
@@ -289,14 +317,16 @@ public class QueryComponent extends Sear
StringBuilder sliceShardsStr = new StringBuilder();
boolean first = true;
for (ZkNodeProps nodeProps : sliceShards.values()) {
- if (!liveNodes.contains(nodeProps.get(ZkStateReader.NODE_NAME)))
- continue;
+ ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+ if (!liveNodes.contains(coreNodeProps.getNodeName())
+ || !coreNodeProps.getState().equals(
+ ZkStateReader.ACTIVE)) continue;
if (first) {
first = false;
} else {
sliceShardsStr.append('|');
}
- String url = nodeProps.get("url");
+ String url = coreNodeProps.getCoreUrl();
if (url.startsWith("http://"))
url = url.substring(7);
sliceShardsStr.append(url);