You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/18 23:56:04 UTC

svn commit: r1233104 - /lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java

Author: markrmiller
Date: Wed Jan 18 22:56:04 2012
New Revision: 1233104

URL: http://svn.apache.org/viewvc?rev=1233104&view=rev
Log:
fix mismerge

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1233104&r1=1233103&r2=1233104&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/SolrCore.java Wed Jan 18 22:56:04 2012
@@ -60,6 +60,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.net.URL;
 import java.lang.reflect.Constructor;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 /**
@@ -475,20 +476,6 @@ public final class SolrCore implements S
   }
   
   /**
-   * 
-   * @param dataDir
-   * @param schema
-   * @throws SAXException 
-   * @throws IOException 
-   * @throws ParserConfigurationException 
-   * 
-   * @since solr 1.0
-   */
-  public SolrCore(String dataDir, IndexSchema schema) throws ParserConfigurationException, IOException, SAXException {
-    this(null, dataDir, new SolrConfig(), schema, null);
-  }
-  
-  /**
    * Creates a new core and register it in the list of cores.
    * If a core with the same name already exists, it will be stopped and replaced by this one.
    *
@@ -961,15 +948,21 @@ public final class SolrCore implements S
   // This reference is protected by searcherLock.
   private RefCounted<SolrIndexSearcher> _searcher;
 
-  // All of the open searchers.  Don't access this directly.
+  // All of the normal open searchers.  Don't access this directly.
   // protected by synchronizing on searcherLock.
   private final LinkedList<RefCounted<SolrIndexSearcher>> _searchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
+  private final LinkedList<RefCounted<SolrIndexSearcher>> _realtimeSearchers = new LinkedList<RefCounted<SolrIndexSearcher>>();
 
   final ExecutorService searcherExecutor = Executors.newSingleThreadExecutor();
   private int onDeckSearchers;  // number of searchers preparing
+  // Lock ordering: one can acquire the openSearcherLock and then the searcherLock, but not vice-versa.
   private Object searcherLock = new Object();  // the sync object for the searcher
+  private ReentrantLock openSearcherLock = new ReentrantLock(true);     // used to serialize opens/reopens for absolute ordering
   private final int maxWarmingSearchers;  // max number of on-deck searchers allowed
 
+  private RefCounted<SolrIndexSearcher> realtimeSearcher;
+
+
   /**
   * Return a registered {@link RefCounted}&lt;{@link SolrIndexSearcher}&gt; with
   * the reference count incremented.  It <b>must</b> be decremented when no longer needed.
@@ -988,29 +981,162 @@ public final class SolrCore implements S
   }
 
   /**
-  * Return the newest {@link RefCounted}&lt;{@link SolrIndexSearcher}&gt; with
+  * Return the newest normal {@link RefCounted}&lt;{@link SolrIndexSearcher}&gt; with
   * the reference count incremented.  It <b>must</b> be decremented when no longer needed.
   * If no searcher is currently open, then if openNew==true a new searcher will be opened,
   * or null is returned if openNew==false.
   */
   public RefCounted<SolrIndexSearcher> getNewestSearcher(boolean openNew) {
     synchronized (searcherLock) {
-      if (_searchers.isEmpty()) {
-        if (!openNew) return null;
-        // Not currently implemented since simply calling getSearcher during inform()
-        // can result in a deadlock.  Right now, solr always opens a searcher first
-        // before calling inform() anyway, so this should never happen.
-        throw new UnsupportedOperationException();
-      }
-      RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
-      newest.incref();
-      return newest;
+      if (!_searchers.isEmpty()) {
+        RefCounted<SolrIndexSearcher> newest = _searchers.getLast();
+        newest.incref();
+        return newest;
+      }
     }
+
+    return openNew ? getRealtimeSearcher() : null;
   }
 
+
+  /** Gets the latest real-time searcher w/o forcing open a new searcher if one already exists.
+   * The reference count will be incremented.
+   */
+  public RefCounted<SolrIndexSearcher> getRealtimeSearcher() {
+    synchronized (searcherLock) {
+      if (realtimeSearcher != null) {
+        realtimeSearcher.incref();
+        return realtimeSearcher;
+      }
+    }
+
+    // use the openSearcherLock to prevent multiple threads from trying to open at once
+    openSearcherLock.lock();
+    try {
+
+      // try again
+      synchronized (searcherLock) {
+        if (realtimeSearcher != null) {
+          realtimeSearcher.incref();
+          return realtimeSearcher;
+        }
+      }
+
+      // force a new searcher open
+      return openNewSearcher(true, true);
+    } finally {
+      openSearcherLock.unlock();
+    }
+  }
+
+
   public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher) throws IOException {
     return getSearcher(forceNew, returnSearcher, waitSearcher, false);
   }
+
+
+  /** Opens a new searcher and returns a RefCounted<SolrIndexSearcher> with its reference incremented.
+   *
+   * "realtime" means that we need to open quickly for a realtime view of the index, hence don't do any
+   * autowarming and add to the _realtimeSearchers queue rather than the _searchers queue (so it won't
+   * be used for autowarming by a future normal searcher).  A "realtime" searcher will currently never
+   * become "registered" (since it currently lacks caching).
+   *
+   * realtimeSearcher is updated to the latest opened searcher, regardless of the value of "realtime".
+   *
+   * This method acquires openSearcherLock - do not call with searcherLock held!
+   */
+  public RefCounted<SolrIndexSearcher> openNewSearcher(boolean updateHandlerReopens, boolean realtime) {
+    SolrIndexSearcher tmp;
+    RefCounted<SolrIndexSearcher> newestSearcher = null;
+    boolean nrt = solrConfig.reopenReaders && updateHandlerReopens;
+
+    openSearcherLock.lock();
+    try {
+      String newIndexDir = null;
+      File indexDirFile = null;
+      File newIndexDirFile = null;
+
+      // if it's not a normal near-realtime update, check that paths haven't changed.
+      if (!nrt) {
+        indexDirFile = new File(getIndexDir()).getCanonicalFile();
+        newIndexDir = getNewIndexDir();
+        newIndexDirFile = new File(newIndexDir).getCanonicalFile();
+      }
+
+      synchronized (searcherLock) {
+        newestSearcher = realtimeSearcher;
+        if (newestSearcher != null) {
+          newestSearcher.incref();      // the matching decref is in the finally block
+        }
+      }
+
+      if (newestSearcher != null && solrConfig.reopenReaders
+          && (nrt || indexDirFile.equals(newIndexDirFile))) {
+
+        IndexReader newReader;
+        IndexReader currentReader = newestSearcher.get().getIndexReader();
+
+        if (updateHandlerReopens) {
+          // SolrCore.verbose("start reopen from",previousSearcher,"writer=",writer);
+          IndexWriter writer = getUpdateHandler().getSolrCoreState().getIndexWriter(this);
+          newReader = IndexReader.openIfChanged(currentReader, writer, true);
+
+        } else {
+          // verbose("start reopen without writer, reader=", currentReader);
+          newReader = IndexReader.openIfChanged(currentReader);
+          // verbose("reopen result", newReader);
+        }
+
+        if (newReader == null) {
+          // if this is a request for a realtime searcher, just return the same searcher if there haven't been any changes.
+          if (realtime) {
+            newestSearcher.incref();
+            return newestSearcher;
+          }
+
+          currentReader.incRef();
+          newReader = currentReader;
+        }
+
+       // for now, turn off caches if this is for a realtime reader (caches take a little while to instantiate)
+        tmp = new SolrIndexSearcher(this, schema, (realtime ? "realtime":"main"), newReader, true, !realtime, true, directoryFactory);
+
+      } else {
+        // verbose("non-reopen START:");
+        tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
+        // verbose("non-reopen DONE: searcher=",tmp);
+      }
+
+      List<RefCounted<SolrIndexSearcher>> searcherList = realtime ? _realtimeSearchers : _searchers;
+      RefCounted<SolrIndexSearcher> newSearcher = newHolder(tmp, searcherList);    // refcount now at 1
+
+      // Increment reference again for "realtimeSearcher" variable.  It should be at 2 after.
+      // When it's decremented by both the caller of this method, and by realtimeSearcher being replaced,
+      // it will be closed.
+      newSearcher.incref();
+
+      synchronized (searcherLock) {
+        if (realtimeSearcher != null) {
+          realtimeSearcher.decref();
+        }
+        realtimeSearcher = newSearcher;
+        searcherList.add(realtimeSearcher);
+      }
+
+      return newSearcher;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error opening new searcher", e);
+    }
+    finally {
+      openSearcherLock.unlock();
+      if (newestSearcher != null) {
+        newestSearcher.decref();
+      }
+    }
+
+  }
   
   /**
    * Get a {@link SolrIndexSearcher} or start the process of creating a new one.
@@ -1106,80 +1232,28 @@ public final class SolrCore implements S
       }
     }
 
-    // open the index synchronously
-    // if this fails, we need to decrement onDeckSearchers again.
-    SolrIndexSearcher tmp;
-    RefCounted<SolrIndexSearcher> newestSearcher = null;
+    // a signal to decrement onDeckSearchers if something goes wrong.
+    final boolean[] decrementOnDeckCount=new boolean[]{true};
+    RefCounted<SolrIndexSearcher> currSearcherHolder = null;     // searcher we are autowarming from
+    RefCounted<SolrIndexSearcher> searchHolder = null;
+    boolean success = false;
 
+    openSearcherLock.lock();
     try {
-      newestSearcher = getNewestSearcher(false);
-      String newIndexDir = getNewIndexDir();
-      File indexDirFile = new File(getIndexDir()).getCanonicalFile();
-      File newIndexDirFile = new File(newIndexDir).getCanonicalFile();
-      
-      if (newestSearcher != null && solrConfig.reopenReaders
-          && indexDirFile.equals(newIndexDirFile)) {
-
-        if (updateHandlerReopens) {
-          
-          tmp = getUpdateHandler().reopenSearcher(newestSearcher.get());
-        } else {
-          
-          IndexReader currentReader = newestSearcher.get().getIndexReader();
-          IndexReader newReader;
-          
-          // verbose("start reopen without writer, reader=", currentReader);
-          newReader = IndexReader.openIfChanged(currentReader);
-          // verbose("reopen result", newReader);
-
-
-          if (newReader == null) {
-            currentReader.incRef();
-            newReader = currentReader;
-          }
-          
-          tmp = new SolrIndexSearcher(this, schema, "main", newReader, true, true, true, directoryFactory);
-        }
-
-
-      } else {
-        // verbose("non-reopen START:");
-        tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().mainIndexConfig, "main", true, directoryFactory);
-        // verbose("non-reopen DONE: searcher=",tmp);
+      searchHolder = openNewSearcher(updateHandlerReopens, false);
+       // the searchHolder will be incremented once already (and it will eventually be assigned to _searcher when registered)
+       // increment it again if we are going to return it to the caller.
+      if (returnSearcher) {
+        searchHolder.incref();
       }
-    } catch (Throwable th) {
-      synchronized(searcherLock) {
-        onDeckSearchers--;
-        // notify another waiter to continue... it may succeed
-        // and wake any others.
-        searcherLock.notify();
-      }
-      // need to close the searcher here??? we shouldn't have to.
-      throw new RuntimeException(th);
-    } finally {
-      if (newestSearcher != null) {
-        newestSearcher.decref();
-      }
-    }
-    
-    final SolrIndexSearcher newSearcher=tmp;
 
-    RefCounted<SolrIndexSearcher> currSearcherHolder=null;
-    final RefCounted<SolrIndexSearcher> newSearchHolder=newHolder(newSearcher);
 
-    if (returnSearcher) newSearchHolder.incref();
+      final RefCounted<SolrIndexSearcher> newSearchHolder = searchHolder;
+      final SolrIndexSearcher newSearcher = newSearchHolder.get();
 
-    // a signal to decrement onDeckSearchers if something goes wrong.
-    final boolean[] decrementOnDeckCount=new boolean[1];
-    decrementOnDeckCount[0]=true;
-
-    try {
 
       boolean alreadyRegistered = false;
       synchronized (searcherLock) {
-        _searchers.add(newSearchHolder);
-        // verbose("added searcher ",newSearchHolder.get()," to _searchers");
-
         if (_searcher == null) {
           // if there isn't a current searcher then we may
           // want to register this one before warming is complete instead of waiting.
@@ -1198,174 +1272,133 @@ public final class SolrCore implements S
 
       final SolrIndexSearcher currSearcher = currSearcherHolder==null ? null : currSearcherHolder.get();
 
-      //
-      // Note! if we registered the new searcher (but didn't increment it's
-      // reference count because returnSearcher==false, it's possible for
-      // someone else to register another searcher, and thus cause newSearcher
-      // to close while we are warming.
-      //
-      // Should we protect against that by incrementing the reference count?
-      // Maybe we should just let it fail?   After all, if returnSearcher==false
-      // and newSearcher has been de-registered, what's the point of continuing?
-      //
-
       Future future=null;
 
       // warm the new searcher based on the current searcher.
       // should this go before the other event handlers or after?
       if (currSearcher != null) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        newSearcher.warm(currSearcher);
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      }
-                      return null;
-                    }
-                  }
-          );
-        } catch(Exception e) {
-          // if submit fails, newSearchHolder does not get decref'd
-          if (newSearchHolder != null) {
-            newSearchHolder.decref();
-            if (returnSearcher) {
-              newSearchHolder.decref();
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  newSearcher.warm(currSearcher);
+                } catch (Throwable e) {
+                  SolrException.log(log,e);
+                }
+                return null;
+              }
             }
-          }
-          throw e;
-        }
+        );
       }
-      
+
       if (currSearcher==null && firstSearcherListeners.size() > 0) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        for (SolrEventListener listener : firstSearcherListeners) {
-                          listener.newSearcher(newSearcher,null);
-                        }
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      }
-                      return null;
-                    }
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  for (SolrEventListener listener : firstSearcherListeners) {
+                    listener.newSearcher(newSearcher,null);
                   }
-          );
-        } catch(Exception e) {
-          // if submit fails, newSearchHolder does not get decref'd
-          if (newSearchHolder != null) {
-            newSearchHolder.decref();
-            if (returnSearcher) {
-              newSearchHolder.decref();
+                } catch (Throwable e) {
+                  SolrException.log(log,null,e);
+                }
+                return null;
+              }
             }
-          }
-          throw e;
-        }
+        );
       }
 
       if (currSearcher!=null && newSearcherListeners.size() > 0) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        for (SolrEventListener listener : newSearcherListeners) {
-                          listener.newSearcher(newSearcher, currSearcher);
-                        }
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      }
-                      return null;
-                    }
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  for (SolrEventListener listener : newSearcherListeners) {
+                    listener.newSearcher(newSearcher, currSearcher);
                   }
-          );
-      } catch(Exception e) {
-        // if submit fails, newSearchHolder does not get decref'd
-        if (newSearchHolder != null) {
-          newSearchHolder.decref();
-          if (returnSearcher) {
-            newSearchHolder.decref();
-          }
-        }
-        throw e;
-      }
+                } catch (Throwable e) {
+                  SolrException.log(log,null,e);
+                }
+                return null;
+              }
+            }
+        );
       }
 
       // WARNING: this code assumes a single threaded executor (that all tasks
       // queued will finish first).
       final RefCounted<SolrIndexSearcher> currSearcherHolderF = currSearcherHolder;
       if (!alreadyRegistered) {
-        try {
-          future = searcherExecutor.submit(
-                  new Callable() {
-                    public Object call() throws Exception {
-                      try {
-                        // signal that we no longer need to decrement
-                        // the count *before* registering the searcher since
-                        // registerSearcher will decrement even if it errors.
-                        decrementOnDeckCount[0]=false;
-                        registerSearcher(newSearchHolder);
-                      } catch (Throwable e) {
-                        SolrException.log(log, null, e);
-                      } finally {
-                        // we are all done with the old searcher we used
-                        // for warming...
-                        if (currSearcherHolderF!=null) currSearcherHolderF.decref();
-                      }
-                      return null;
-                    }
-                  }
-          );
-        } catch(Exception e) {
-          // if submit fails, newSearchHolder does not get decref'd
-          if (newSearchHolder != null) {
-            newSearchHolder.decref();
-            if (returnSearcher) {
-              newSearchHolder.decref();
+        future = searcherExecutor.submit(
+            new Callable() {
+              public Object call() throws Exception {
+                try {
+                  // registerSearcher will decrement onDeckSearchers and
+                  // do a notify, even if it fails.
+                  registerSearcher(newSearchHolder);
+                } catch (Throwable e) {
+                  SolrException.log(log, e);
+                } finally {
+                  // we are all done with the old searcher we used
+                  // for warming...
+                  if (currSearcherHolderF!=null) currSearcherHolderF.decref();
+                }
+                return null;
+              }
             }
-          }
-          throw e;
-        }
+        );
       }
 
       if (waitSearcher != null) {
         waitSearcher[0] = future;
       }
 
+      success = true;
+
       // Return the searcher as the warming tasks run in parallel
       // callers may wait on the waitSearcher future returned.
       return returnSearcher ? newSearchHolder : null;
 
     } catch (Exception e) {
-      SolrException.log(log, null, e);
-      if (currSearcherHolder != null) currSearcherHolder.decref();
+      if (e instanceof SolrException) throw (SolrException)e;
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } finally {
 
-      synchronized (searcherLock) {
-        if (decrementOnDeckCount[0]) {
+      if (!success) {
+        synchronized (searcherLock) {
           onDeckSearchers--;
+
+          if (onDeckSearchers < 0) {
+            // sanity check... should never happen
+            log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
+            onDeckSearchers=0; // try and recover
+          }
+          // if we failed, we need to wake up at least one waiter to continue the process
+          searcherLock.notify();
         }
-        if (onDeckSearchers < 0) {
-          // sanity check... should never happen
-          log.error(logid+"ERROR!!! onDeckSearchers after decrement=" + onDeckSearchers);
-          onDeckSearchers=0; // try and recover
+
+        if (currSearcherHolder != null) {
+          currSearcherHolder.decref();
+        }
+
+        if (searchHolder != null) {
+          searchHolder.decref();      // decrement 1 for _searcher (searchHolder will never become _searcher now)
+          if (returnSearcher) {
+            searchHolder.decref();    // decrement 1 because we won't be returning the searcher to the user
+          }
         }
-        // if we failed, we need to wake up at least one waiter to continue the process
-        searcherLock.notify();
       }
 
-      // since the indexreader was already opened, assume we can continue on
-      // even though we got an exception.
-      return returnSearcher ? newSearchHolder : null;
+      // we want to do this after we decrement onDeckSearchers so another thread
+      // doesn't increment first and throw a false warning.
+      openSearcherLock.unlock();
+
     }
 
   }
 
 
-  private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher) {
+  private RefCounted<SolrIndexSearcher> newHolder(SolrIndexSearcher newSearcher, final List<RefCounted<SolrIndexSearcher>> searcherList) {
     RefCounted<SolrIndexSearcher> holder = new RefCounted<SolrIndexSearcher>(newSearcher) {
       @Override
       public void close() {
@@ -1377,11 +1410,13 @@ public final class SolrCore implements S
             // This relies on the RefCounted class allowing close() to be called every
             // time the counter hits zero.
             if (refcount.get() > 0) return;
-            _searchers.remove(this);
+            searcherList.remove(this);
           }
           resource.close();
-        } catch (IOException e) {
-          log.error("Error closing searcher:" + SolrException.toStr(e));
+        } catch (Throwable e) {
+          // do not allow decref() operations to fail since they are typically called in finally blocks
+          // and throwing another exception would be very unexpected.
+          SolrException.log(log, "Error closing searcher:", e);
         }
       }
     };
@@ -1389,6 +1424,9 @@ public final class SolrCore implements S
     return holder;
   }
 
+  public boolean isReloaded() {
+    return isReloaded;
+  }
 
   // Take control of newSearcherHolder (which should have a reference count of at
   // least 1 already.  If the caller wishes to use the newSearcherHolder directly
@@ -1424,6 +1462,7 @@ public final class SolrCore implements S
         log.info(logid+"Registered new searcher " + newSearcher);
 
       } catch (Throwable e) {
+        // an exception in register() shouldn't be fatal.
         log(e);
       } finally {
         // wake up anyone waiting for a searcher
@@ -1439,9 +1478,13 @@ public final class SolrCore implements S
   public void closeSearcher() {
     log.info(logid+"Closing main searcher on request.");
     synchronized (searcherLock) {
+      if (realtimeSearcher != null) {
+        realtimeSearcher.decref();
+        realtimeSearcher = null;
+      }
       if (_searcher != null) {
         _searcher.decref();   // dec refcount for this._searcher
-        _searcher=null; // isClosed() does check this
+        _searcher = null; // isClosed() does check this
         infoRegistry.remove("currentSearcher");
       }
     }
@@ -1471,7 +1514,7 @@ public final class SolrCore implements S
     
     handler.handleRequest(req,rsp);
     setResponseHeaderValues(handler,req,rsp);
-    
+
     if (log.isInfoEnabled()) {
       StringBuilder sb = new StringBuilder(logid);
       for (int i=0; i<toLog.size(); i++) {
@@ -1526,7 +1569,7 @@ public final class SolrCore implements S
 
 
   final public static void log(Throwable e) {
-    SolrException.log(log, null, e);
+    SolrException.log(log,null,e);
   }
 
   
@@ -1580,8 +1623,10 @@ public final class SolrCore implements S
         }
         log.info("created "+info.name+": " + info.className);
       } catch (Exception ex) {
-          throw new SolrException
+          SolrException e = new SolrException
             (SolrException.ErrorCode.SERVER_ERROR, "QueryResponseWriter init failure", ex);
+          SolrException.log(log,null,e);
+          throw e;
       }
     }
 
@@ -1896,10 +1941,6 @@ public final class SolrCore implements S
       return getWrappedWriter().getContentType(request, response);
     }
   }
-
-  public boolean isReloaded() {
-    return isReloaded;
-  }
 }