You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/18 23:56:04 UTC
svn commit: r1233104 -
/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
Author: markrmiller
Date: Wed Jan 18 22:56:04 2012
New Revision: 1233104
URL: http://svn.apache.org/viewvc?rev=1233104&view=rev
Log:
fix mismerge
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1233104&r1=1233103&r2=1233104&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java Wed Jan 18 22:56:04 2012
@@ -60,6 +60,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.lang.reflect.Constructor;
+import java.util.concurrent.locks.ReentrantLock;
/**
@@ -475,20 +476,6 @@ public final class SolrCore implements S
}
/**
- *
- * @param dataDir
- * @param schema
- * @throws SAXException
- * @throws IOException
- * @throws ParserConfigurationException
- *
- * @since solr 1.0
- */
- public SolrCore(String dataDir, IndexSchema schema) throws ParserConfigurationException, IOException, SAXException {
- this(null, dataDir, new SolrConfig(), schema, null);
- }
-
- /**
* Creates a new core and register it in the list of cores.
* If a core with the same name already exists, it will be stopped and replaced by this one.
*
@@ -961,15 +948,21 @@ public final class SolrCore implements S
// This reference is protected by searcherLock.
private RefCounted<SolrIndexSearcher> _searcher;
- // All of the open searchers. Don't access this directly.
+ // All of the normal open searchers. Don't access this directly.
// protected by synchronizing on searcherLock.
private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
+ private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
final ExecutorService searcherExecutor = Executors.newSingleThreadExecutor();
private int onDeckSearchers; // number of searchers preparing
+ // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
private Object searcherLock = new Object(); // the sync object for the searcher
+ private ReentrantLock openSearcherLock = new ReentrantLock(true); // used to serialize opens/reopens for absolute ordering
private final int maxWarmingSearchers; // max number of on-deck searchers allowed
+ private RefCounted<SolrIndexSearcher> realtimeSearcher;
+
+
/**
* Return a registered {@link RefCounted}<{@link SolrIndexSearcher}> with
* the reference count incremented. It <b>must</b> be decremented when no longer needed.
@@ -988,29 +981,162 @@ public final class SolrCore implements S
}
/**
- * Return the newest {@link RefCounted}<{@link SolrIndexSearcher}> with
+ * Return the newest normal {@link RefCounted}<{@link SolrIndexSearcher}> with
* the reference count incremented. It <b>must</b> be decremented when no longer needed.
* If no searcher is currently open, then if openNew==true a new searcher will be opened,
* or null is returned if openNew==false.
*/
public RefCounted<SolrIndexSearcher> getNewestSearcher(boolean openNew) {
synchronized (searcherLock) {
- if (_searchers.isEmpty()) {
- if (!openNew) return null;
- // Not currently implemented since simply calling getSearcher during inform()
- // can result in a deadlock. Right now, solr always opens a searcher first
- // before calling inform() anyway, so this should never happen.
- throw new UnsupportedOperationException();
- }
- RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
- newest.incref();
- return newest;
+ if (!_searchers.isEmpty()) {
+ RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
+ newest.incref();
+ return newest;
+ }
}
+
+ return openNew ? getRealtimeSearcher() : null;
}
+
+ /** Gets the latest real-time searcher w/o forcing open a new searcher if one already exists.
+ * The reference count will be incremented.
+ */
+ public RefCounted<SolrIndexSearcher> getRealtimeSearcher() {
+ synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.incref();
+ return realtimeSearcher;
+ }
+ }
+
+ // use the openSearcherLock to prevent multiple threads from trying to open at once
+ openSearcherLock.lock();
+ try {
+
+ // try again
+ synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.incref();
+ return realtimeSearcher;
+ }
+ }
+
+ // force a new searcher open
+ return openNewSearcher(true, true);
+ } finally {
+ openSearcherLock.unlock();
+ }
+ }
+
+
public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher) throws IOException {
return getSearcher(forceNew, returnSearcher, waitSearcher, false);
}
+
+
+ /** Opens a new searcher and returns a RefCounted<SolrIndexSearcher> with its reference count incremented.
+ *
+ * "realtime" means that we need to open quickly for a realtime view of the index, hence don't do any
+ * autowarming and add to the _realtimeSearchers queue rather than the _searchers queue (so it won't
+ * be used for autowarming by a future normal searcher). A "realtime" searcher will currently never
+ * become "registered" (since it currently lacks caching).
+ *
+ * realtimeSearcher is updated to the latest opened searcher, regardless of the value of "realtime".
+ *
+ * This method acquires openSearcherLock - do not call with searcherLock held!
+ */
+ public RefCounted<SolrIndexSearcher> openNewSearcher(boolean updateHandlerReopens, boolean realtime) {
+ SolrIndexSearcher tmp;
+ RefCounted<SolrIndexSearcher> newestSearcher = null;
+ boolean nrt = solrConfig.reopenReaders && updateHandlerReopens;
+
+ openSearcherLock.lock();
+ try {
+ String newIndexDir = null;
+ File indexDirFile = null;
+ File newIndexDirFile = null;
+
+ // if it's not a normal near-realtime update, check that paths haven't changed.
+ if (!nrt) {
+ indexDirFile = new File(getIndexDir()).getCanonicalFile();
+ newIndexDir = getNewIndexDir();
+ newIndexDirFile = new File(newIndexDir).getCanonicalFile();
+ }
+
+ synchronized (searcherLock) {
+ newestSearcher = realtimeSearcher;
+ if (newestSearcher != null) {
+ newestSearcher.incref(); // the matching decref is in the finally block
+ }
+ }
+
+ if (newestSearcher != null && solrConfig.reopenReaders
+ && (nrt || indexDirFile.equals(newIndexDirFile))) {
+
+ IndexReader newReader;
+ IndexReader currentReader = newestSearcher.get().getIndexReader();
+
+ if (updateHandlerReopens) {
+ // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
+ IndexWriter writer = getUpdateHandler().getSolrCoreState().getIndexWriter(this);
+ newReader = IndexReader.openIfChanged(currentReader, writer, true);
+
+ } else {
+ // verbose("start reopen without writer, reader=", currentReader);
+ newReader = IndexReader.openIfChanged(currentReader);
+ // verbose("reopen result", newReader);
+ }
+
+ if (newReader == null) {
+ // if this is a request for a realtime searcher, just return the same searcher if there haven't been any changes.
+ if (realtime) {
+ newestSearcher.incref();
+ return newestSearcher;
+ }
+
+ currentReader.incRef();
+ newReader = currentReader;
+ }
+
+ // for now, turn off caches if this is for a realtime reader (caches take a little while to instantiate)
+ tmp = new SolrIndexSearcher(this, schema, (realtime ? "realtime":"main"), newReader, true, !realtime, true, directoryFactory);
+
+ } else {
+ // verbose("non-reopen START:");
+ tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
+ // verbose("non-reopen DONE: searcher=",tmp);
+ }
+
+ List<RefCounted<SolrIndexSearcher>> searcherList = realtime ? _realtimeSearchers : _searchers;
+ RefCounted<SolrIndexSearcher> newSearcher = newHolder(tmp, searcherList); // refcount now at 1
+
+ // Increment reference again for "realtimeSearcher" variable. It should be at 2 after.
+ // When it's decremented by both the caller of this method, and by realtimeSearcher being replaced,
+ // it will be closed.
+ newSearcher.incref();
+
+ synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.decref();
+ }
+ realtimeSearcher = newSearcher;
+ searcherList.add(realtimeSearcher);
+ }
+
+ return newSearcher;
+
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error opening new searcher", e);
+ }
+ finally {
+ openSearcherLock.unlock();
+ if (newestSearcher != null) {
+ newestSearcher.decref();
+ }
+ }
+
+ }
/**
* Get a {@link SolrIndexSearcher} or start the process of creating a new one.
@@ -1106,80 +1232,28 @@ public final class SolrCore implements S
}
}
- // open the index synchronously
- // if this fails, we need to decrement onDeckSearchers again.
- SolrIndexSearcher tmp;
- RefCounted<SolrIndexSearcher> newestSearcher = null;
+ // a signal to decrement onDeckSearchers if something goes wrong.
+ final boolean[] decrementOnDeckCount=new boolean[]{true};
+ RefCounted<SolrIndexSearcher> currSearcherHolder = null; // searcher we are autowarming from
+ RefCounted<SolrIndexSearcher> searchHolder = null;
+ boolean success = false;
+ openSearcherLock.lock();
try {
- newestSearcher = getNewestSearcher(false);
- String newIndexDir = getNewIndexDir();
- File indexDirFile = new File(getIndexDir()).getCanonicalFile();
- File newIndexDirFile = new File(newIndexDir).getCanonicalFile();
-
- if (newestSearcher != null && solrConfig.reopenReaders
- && indexDirFile.equals(newIndexDirFile)) {
-
- if (updateHandlerReopens) {
-
- tmp = getUpdateHandler().reopenSearcher(newestSearcher.get());
- } else {
-
- IndexReader currentReader = newestSearcher.get().getIndexReader();
- IndexReader newReader;
-
- // verbose("start reopen without writer, reader=", currentReader);
- newReader = IndexReader.openIfChanged(currentReader);
- // verbose("reopen result", newReader);
-
-
- if (newReader == null) {
- currentReader.incRef();
- newReader = currentReader;
- }
-
- tmp = new SolrIndexSearcher(this, schema, "main", newReader, true, true, true, directoryFactory);
- }
-
-
- } else {
- // verbose("non-reopen START:");
- tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
- // verbose("non-reopen DONE: searcher=",tmp);
+ searchHolder = openNewSearcher(updateHandlerReopens, false);
+ // the searchHolder will be incremented once already (and it will eventually be assigned to _searcher when registered)
+ // increment it again if we are going to return it to the caller.
+ if (returnSearcher) {
+ searchHolder.incref();
}
- } catch (Throwable th) {
- synchronized(searcherLock) {
- onDeckSearchers--;
- // notify another waiter to continue... it may succeed
- // and wake any others.
- searcherLock.notify();
- }
- // need to close the searcher here??? we shouldn't have to.
- throw new RuntimeException(th);
- } finally {
- if (newestSearcher != null) {
- newestSearcher.decref();
- }
- }
-
- final SolrIndexSearcher newSearcher=tmp;
- RefCounted<SolrIndexSearcher> currSearcherHolder=null;
- final RefCounted<SolrIndexSearcher> newSearchHolder=newHolder(newSearcher);
- if (returnSearcher) newSearchHolder.incref();
+ final RefCounted<SolrIndexSearcher> newSearchHolder = searchHolder;
+ final SolrIndexSearcher newSearcher = newSearchHolder.get();
- // a signal to decrement onDeckSearchers if something goes wrong.
- final boolean[] decrementOnDeckCount=new boolean[1];
- decrementOnDeckCount[0]=true;
-
- try {
boolean alreadyRegistered = false;
synchronized (searcherLock) {
- _searchers.add(newSearchHolder);
- // verbose("added searcher ",newSearchHolder.get()," to _searchers");
-
if (_searcher == null) {
// if there isn't a current searcher then we may
// want to register this one before warming is complete instead of waiting.
@@ -1198,174 +1272,133 @@ public final class SolrCore implements S
final SolrIndexSearcher currSearcher = currSearcherHolder==null ? null : currSearcherHolder.get();
- //
- // Note! if we registered the new searcher (but didn't increment it's
- // reference count because returnSearcher==false, it's possible for
- // someone else to register another searcher, and thus cause newSearcher
- // to close while we are warming.
- //
- // Should we protect against that by incrementing the reference count?
- // Maybe we should just let it fail? After all, if returnSearcher==false
- // and newSearcher has been de-registered, what's the point of continuing?
- //
-
Future future=null;
// warm the new searcher based on the current searcher.
// should this go before the other event handlers or after?
if (currSearcher != null) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- newSearcher.warm(currSearcher);
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- }
- return null;
- }
- }
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ newSearcher.warm(currSearcher);
+ } catch (Throwable e) {
+ SolrException.log(log,e);
+ }
+ return null;
+ }
}
- }
- throw e;
- }
+ );
}
-
+
if (currSearcher==null && firstSearcherListeners.size() > 0) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- for (SolrEventListener listener : firstSearcherListeners) {
- listener.newSearcher(newSearcher,null);
- }
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- }
- return null;
- }
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ for (SolrEventListener listener : firstSearcherListeners) {
+ listener.newSearcher(newSearcher,null);
}
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
+ } catch (Throwable e) {
+ SolrException.log(log,null,e);
+ }
+ return null;
+ }
}
- }
- throw e;
- }
+ );
}
if (currSearcher!=null && newSearcherListeners.size() > 0) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- for (SolrEventListener listener : newSearcherListeners) {
- listener.newSearcher(newSearcher, currSearcher);
- }
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- }
- return null;
- }
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ for (SolrEventListener listener : newSearcherListeners) {
+ listener.newSearcher(newSearcher, currSearcher);
}
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
- }
- }
- throw e;
- }
+ } catch (Throwable e) {
+ SolrException.log(log,null,e);
+ }
+ return null;
+ }
+ }
+ );
}
// WARNING: this code assumes a single threaded executor (that all tasks
// queued will finish first).
final RefCounted<SolrIndexSearcher> currSearcherHolderF = currSearcherHolder;
if (!alreadyRegistered) {
- try {
- future = searcherExecutor.submit(
- new Callable() {
- public Object call() throws Exception {
- try {
- // signal that we no longer need to decrement
- // the count *before* registering the searcher since
- // registerSearcher will decrement even if it errors.
- decrementOnDeckCount[0]=false;
- registerSearcher(newSearchHolder);
- } catch (Throwable e) {
- SolrException.log(log, null, e);
- } finally {
- // we are all done with the old searcher we used
- // for warming...
- if (currSearcherHolderF!=null) currSearcherHolderF.decref();
- }
- return null;
- }
- }
- );
- } catch(Exception e) {
- // if submit fails, newSearchHolder does not get decref'd
- if (newSearchHolder != null) {
- newSearchHolder.decref();
- if (returnSearcher) {
- newSearchHolder.decref();
+ future = searcherExecutor.submit(
+ new Callable() {
+ public Object call() throws Exception {
+ try {
+ // registerSearcher will decrement onDeckSearchers and
+ // do a notify, even if it fails.
+ registerSearcher(newSearchHolder);
+ } catch (Throwable e) {
+ SolrException.log(log, e);
+ } finally {
+ // we are all done with the old searcher we used
+ // for warming...
+ if (currSearcherHolderF!=null) currSearcherHolderF.decref();
+ }
+ return null;
+ }
}
- }
- throw e;
- }
+ );
}
if (waitSearcher != null) {
waitSearcher[0] = future;
}
+ success = true;
+
// Return the searcher as the warming tasks run in parallel
// callers may wait on the waitSearcher future returned.
return returnSearcher ? newSearchHolder : null;
} catch (Exception e) {
- SolrException.log(log, null, e);
- if (currSearcherHolder != null) currSearcherHolder.decref();
+ if (e instanceof SolrException) throw (SolrException)e;
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } finally {
- synchronized (searcherLock) {
- if (decrementOnDeckCount[0]) {
+ if (!success) {
+ synchronized (searcherLock) {
onDeckSearchers--;
+
+ if (onDeckSearchers < 0) {
+ // sanity check... should never happen
+ log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
+ onDeckSearchers=0; // try and recover
+ }
+ // if we failed, we need to wake up at least one waiter to continue the process
+ searcherLock.notify();
}
- if (onDeckSearchers < 0) {
- // sanity check... should never happen
- log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
- onDeckSearchers=0; // try and recover
+
+ if (currSearcherHolder != null) {
+ currSearcherHolder.decref();
+ }
+
+ if (searchHolder != null) {
+ searchHolder.decref(); // decrement 1 for _searcher (searchHolder will never become _searcher now)
+ if (returnSearcher) {
+ searchHolder.decref(); // decrement 1 because we won't be returning the searcher to the user
+ }
}
- // if we failed, we need to wake up at least one waiter to continue the process
- searcherLock.notify();
}
- // since the indexreader was already opened, assume we can continue on
- // even though we got an exception.
- return returnSearcher ? newSearchHolder : null;
+ // we want to do this after we decrement onDeckSearchers so another thread
+ // doesn't increment first and throw a false warning.
+ openSearcherLock.unlock();
+
}
}
- private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher) {
+ private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher, final List<RefCounted<SolrIndexSearcher>> searcherList) {
RefCounted<SolrIndexSearcher> holder = new RefCounted<SolrIndexSearcher>(newSearcher) {
@Override
public void close() {
@@ -1377,11 +1410,13 @@ public final class SolrCore implements S
// This relies on the RefCounted class allowing close() to be called every
// time the counter hits zero.
if (refcount.get() > 0) return;
- _searchers.remove(this);
+ searcherList.remove(this);
}
resource.close();
- } catch (IOException e) {
- log.error("Error closing searcher:" + SolrException.toStr(e));
+ } catch (Throwable e) {
+ // do not allow decref() operations to fail since they are typically called in finally blocks
+ // and throwing another exception would be very unexpected.
+ SolrException.log(log, "Error closing searcher:", e);
}
}
};
@@ -1389,6 +1424,9 @@ public final class SolrCore implements S
return holder;
}
+ public boolean isReloaded() {
+ return isReloaded;
+ }
// Take control of newSearcherHolder (which should have a reference count of at
// least 1 already. If the caller wishes to use the newSearcherHolder directly
@@ -1424,6 +1462,7 @@ public final class SolrCore implements S
log.info(logid+"Registered new searcher " + newSearcher);
} catch (Throwable e) {
+ // an exception in register() shouldn't be fatal.
log(e);
} finally {
// wake up anyone waiting for a searcher
@@ -1439,9 +1478,13 @@ public final class SolrCore implements S
public void closeSearcher() {
log.info(logid+"Closing main searcher on request.");
synchronized (searcherLock) {
+ if (realtimeSearcher != null) {
+ realtimeSearcher.decref();
+ realtimeSearcher = null;
+ }
if (_searcher != null) {
_searcher.decref(); // dec refcount for this._searcher
- _searcher=null; // isClosed() does check this
+ _searcher = null; // isClosed() does check this
infoRegistry.remove("currentSearcher");
}
}
@@ -1471,7 +1514,7 @@ public final class SolrCore implements S
handler.handleRequest(req,rsp);
setResponseHeaderValues(handler,req,rsp);
-
+
if (log.isInfoEnabled()) {
StringBuilder sb = new StringBuilder(logid);
for (int i=0; i<toLog.size(); i++) {
@@ -1526,7 +1569,7 @@ public final class SolrCore implements S
final public static void log(Throwable e) {
- SolrException.log(log, null, e);
+ SolrException.log(log,null,e);
}
@@ -1580,8 +1623,10 @@ public final class SolrCore implements S
}
log.info("created "+info.name+": " + info.className);
} catch (Exception ex) {
- throw new SolrException
+ SolrException e = new SolrException
(SolrException.ErrorCode.SERVER_ERROR, "QueryResponseWriter init failure", ex);
+ SolrException.log(log,null,e);
+ throw e;
}
}
@@ -1896,10 +1941,6 @@ public final class SolrCore implements S
return getWrappedWriter().getContentType(request, response);
}
}
-
- public boolean isReloaded() {
- return isReloaded;
- }
}