You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by yo...@apache.org on 2008/10/24 18:50:46 UTC

svn commit: r707683 - in /lucene/solr/trunk/src/java/org/apache/solr: core/IndexDeletionPolicyWrapper.java handler/ReplicationHandler.java handler/SnapPuller.java

Author: yonik
Date: Fri Oct 24 09:50:45 2008
New Revision: 707683

URL: http://svn.apache.org/viewvc?rev=707683&view=rev
Log:
SOLR-561: thread safe fixes, commit point reservation fixes

Modified:
    lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
    lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java

Modified: lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java?rev=707683&r1=707682&r2=707683&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java Fri Oct 24 09:50:45 2008
@@ -19,10 +19,10 @@
  * @see org.apache.lucene.index.IndexDeletionPolicy
  */
 public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
-  private IndexDeletionPolicy deletionPolicy;
-  private Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
-  private Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
-  private IndexCommit latestCommit;
+  private final IndexDeletionPolicy deletionPolicy;
+  private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
+  private final Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
+  private volatile IndexCommit latestCommit;
 
   public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) {
     this.deletionPolicy = deletionPolicy;
@@ -51,7 +51,20 @@
    * @param reserveTime  time in milliseconds for which the commit point is to be reserved
    */
   public void setReserveDuration(Long indexVersion, long reserveTime) {
-      reserves.put(indexVersion, System.currentTimeMillis() + reserveTime);
+    long timeToSet = System.currentTimeMillis() + reserveTime;
+    for(;;) {
+      Long previousTime = reserves.put(indexVersion, timeToSet);
+
+      // this is the common success case: the older time didn't exist, or
+      // came before the new time.
+      if (previousTime == null || previousTime <= timeToSet) break;
+
+      // At this point, we overwrote a longer reservation, so we want to restore the older one.
+      // the problem is that an even longer reservation may come in concurrently
+      // and we don't want to overwrite that one too.  We simply keep retrying in a loop
+      // with the maximum time value we have seen.
+      timeToSet = previousTime;      
+    }
   }
 
   private void cleanReserves() {

Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=707683&r1=707682&r2=707683&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Fri Oct 24 09:50:45 2008
@@ -96,7 +96,7 @@
 
   private Integer reserveCommitDuration = SnapPuller.readInterval("00:00:10");
 
-  private IndexCommit indexCommitPoint;
+  private volatile IndexCommit indexCommitPoint;
 
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     rsp.setHttpCaching(false);
@@ -107,9 +107,10 @@
       return;
     }
     if (command.equals(CMD_INDEX_VERSION)) {
-      if (indexCommitPoint != null) {
-        rsp.add(CMD_INDEX_VERSION, indexCommitPoint.getVersion());
-        rsp.add(GENERATION, indexCommitPoint.getGeneration());
+      IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
+      if (commitPoint != null) {
+        rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
+        rsp.add(GENERATION, commitPoint.getGeneration());
       } else {
         // must never happen
         rsp.add(CMD_INDEX_VERSION, 0L);
@@ -201,9 +202,8 @@
   void doSnapPull() {
     if (!isSlave)
       return;
-    if (snapPullLock.isLocked())
+    if (!snapPullLock.tryLock())
       return;
-    snapPullLock.lock();
     try {
       snapPuller.fetchLatestIndex(core);
     } catch (Exception e) {
@@ -214,7 +214,6 @@
   }
 
   boolean isReplicating() {
-    boolean b = snapPullLock.isLocked();
     return snapPullLock.isLocked();
   }
 
@@ -445,9 +444,10 @@
     long[] versionAndGeneration = getIndexVersion();
     details.add(CMD_INDEX_VERSION, versionAndGeneration[0]);
     details.add(GENERATION, versionAndGeneration[1]);
-    if (isMaster && indexCommitPoint != null) {
-      details.add("replicatable" + CMD_INDEX_VERSION, indexCommitPoint.getVersion());
-      details.add("replicatable" + GENERATION, indexCommitPoint.getGeneration());
+    IndexCommit commit = indexCommitPoint;  // make a copy so it won't change
+    if (isMaster && commit != null) {
+      details.add("replicatable" + CMD_INDEX_VERSION, commit.getVersion());
+      details.add("replicatable" + GENERATION, commit.getGeneration());
     }
 
     if (isSlave) {

Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java?rev=707683&r1=707682&r2=707683&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java Fri Oct 24 09:50:45 2008
@@ -58,35 +58,35 @@
 public class SnapPuller {
   private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
 
-  private String masterUrl;
+  private final String masterUrl;
 
-  private ReplicationHandler replicationHandler;
+  private final ReplicationHandler replicationHandler;
 
-  private Integer pollInterval;
+  private final Integer pollInterval;
 
   private String pollIntervalStr;
 
   private ScheduledExecutorService executorService;
 
-  private long executorStartTime;
+  private volatile long executorStartTime;
 
-  private long replicationStartTime;
+  private volatile long replicationStartTime;
 
-  private SolrCore solrCore;
+  private final SolrCore solrCore;
 
-  private List<Map<String, Object>> filesToDownload;
+  private volatile List<Map<String, Object>> filesToDownload;
 
-  private List<Map<String, Object>> confFilesToDownload;
+  private volatile List<Map<String, Object>> confFilesToDownload;
 
-  private List<Map<String, Object>> filesDownloaded;
+  private volatile List<Map<String, Object>> filesDownloaded;
 
-  private List<Map<String, Object>> confFilesDownloaded;
+  private volatile List<Map<String, Object>> confFilesDownloaded;
 
-  private Map<String, Object> currentFile;
+  private volatile Map<String, Object> currentFile;
 
-  private FileFetcher fileFetcher;
+  private volatile FileFetcher fileFetcher;
 
-  private boolean stop = false;
+  private volatile boolean stop = false;
 
   /**
    * Disable the timer task for polling
@@ -214,7 +214,7 @@
       IndexCommit commit;
       RefCounted<SolrIndexSearcher> searcherRefCounted = null;
       try {
-        searcherRefCounted = core.getSearcher();
+        searcherRefCounted = core.getNewestSearcher(false);
         commit = searcherRefCounted.get().getReader().getIndexCommit();
       } finally {
         if (searcherRefCounted != null)
@@ -587,6 +587,7 @@
   List<Map<String, Object>> getConfFilesDownloaded() {
     //make a copy first because it can be null later
     List<Map<String, Object>> tmp = confFilesDownloaded;
+    // NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
     return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
   }