You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/02/19 04:57:44 UTC

svn commit: r1290938 - in /lucene/dev/trunk/solr/core/src/java/org/apache/solr/update: PeerSync.java UpdateLog.java

Author: yonik
Date: Sun Feb 19 03:57:43 2012
New Revision: 1290938

URL: http://svn.apache.org/viewvc?rev=1290938&view=rev
Log:
SOLR-3126: restore old deletes via tlog so peersync won't reorder

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1290938&r1=1290937&r2=1290938&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Sun Feb 19 03:57:43 2012
@@ -56,6 +56,7 @@ public class PeerSync  {
 
   private List<String> replicas;
   private int nUpdates;
+  private int maxUpdates;  // maximum number of updates to request before failing
 
   private UpdateHandler uhandler;
   private UpdateLog ulog;
@@ -119,6 +120,7 @@ public class PeerSync  {
   public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
     this.replicas = replicas;
     this.nUpdates = nUpdates;
+    this.maxUpdates = nUpdates;
 
     uhandler = core.getUpdateHandler();
     ulog = uhandler.getUpdateLog();
@@ -271,6 +273,8 @@ public class PeerSync  {
     if (otherVersions.size() == 0) {
       return true;
     }
+    
+    boolean completeList = otherVersions.size() < nUpdates;  // do we have their complete list of updates?
 
     Collections.sort(otherVersions, absComparator);
 
@@ -295,7 +299,7 @@ public class PeerSync  {
     List<Long> toRequest = new ArrayList<Long>();
     for (Long otherVersion : otherVersions) {
       // stop when the entries get old enough that reorders may lead us to see updates we don't need
-      if (Math.abs(otherVersion) < ourLowThreshold) break;
+      if (!completeList && Math.abs(otherVersion) < ourLowThreshold) break;
 
       if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
         // we either have this update, or already requested it
@@ -307,11 +311,15 @@ public class PeerSync  {
     }
 
     sreq.requestedUpdates = toRequest;
-
+    
     if (toRequest.isEmpty()) {
       // we had (or already requested) all the updates referenced by the replica
       return true;
     }
+    
+    if (toRequest.size() > maxUpdates) {
+      return false;
+    }
 
     return requestUpdates(srsp, toRequest);
   }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1290938&r1=1290937&r2=1290938&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Sun Feb 19 03:57:43 2012
@@ -111,6 +111,7 @@ public class UpdateLog implements Plugin
 
   private volatile UpdateHandler uhandler;    // a core reload can change this reference!
   private volatile boolean cancelApplyBufferUpdate;
+  List<Long> startingVersions;
 
 
   public static class LogPtr {
@@ -174,11 +175,28 @@ public class UpdateLog implements Plugin
     newestLogOnStartup = oldLog;
 
     versionInfo = new VersionInfo(uhandler, 256);
+
+    UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates();
+    try {
+      startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep);
+      // populate recent deletes list (since we can't get that info from the index)
+      for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) {
+        DeleteUpdate du = startingRecentUpdates.deleteList.get(i);
+        oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
+      }
+    } finally {
+      startingRecentUpdates.close();
+    }
+
   }
   
   public File getLogDir() {
     return tlogDir;
   }
+  
+  public List<Long> getStartingVersions() {
+    return startingVersions;
+  }
 
   /* Takes over ownership of the log, keeping it until no longer needed
      and then decrementing it's reference and dropping it.
@@ -623,13 +641,24 @@ public class UpdateLog implements Plugin
     TransactionLog log;
     long version;
     long pointer;
-  } 
+  }
+
+  static class DeleteUpdate {
+    long version;
+    byte[] id;
+
+    public DeleteUpdate(long version, byte[] id) {
+      this.version = version;
+      this.id = id;
+    }
+  }
   
   public class RecentUpdates {
     Deque<TransactionLog> logList;    // newest first
     List<List<Update>> updateList;
     HashMap<Long, Update> updates;
     List<Update> deleteByQueryList;
+    List<DeleteUpdate> deleteList;
 
 
     public List<Long> getVersions(int n) {
@@ -664,10 +693,12 @@ public class UpdateLog implements Plugin
       return result;
     }
 
+
     private void update() {
       int numUpdates = 0;
       updateList = new ArrayList<List<Update>>(logList.size());
       deleteByQueryList = new ArrayList<Update>();
+      deleteList = new ArrayList<DeleteUpdate>();
       updates = new HashMap<Long,Update>(numRecordsToKeep);
 
       for (TransactionLog oldLog : logList) {
@@ -703,6 +734,8 @@ public class UpdateLog implements Plugin
                   
                   if (oper == UpdateLog.DELETE_BY_QUERY) {
                     deleteByQueryList.add(update);
+                  } else if (oper == UpdateLog.DELETE) {
+                    deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
                   }
                   
                   break;