You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/02/19 04:57:44 UTC
svn commit: r1290938 - in
/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update: PeerSync.java
UpdateLog.java
Author: yonik
Date: Sun Feb 19 03:57:43 2012
New Revision: 1290938
URL: http://svn.apache.org/viewvc?rev=1290938&view=rev
Log:
SOLR-3126: restore old deletes via tlog so peersync won't reorder
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1290938&r1=1290937&r2=1290938&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Sun Feb 19 03:57:43 2012
@@ -56,6 +56,7 @@ public class PeerSync {
private List<String> replicas;
private int nUpdates;
+ private int maxUpdates; // maximum number of updates to request before failing
private UpdateHandler uhandler;
private UpdateLog ulog;
@@ -119,6 +120,7 @@ public class PeerSync {
public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
this.replicas = replicas;
this.nUpdates = nUpdates;
+ this.maxUpdates = nUpdates;
uhandler = core.getUpdateHandler();
ulog = uhandler.getUpdateLog();
@@ -271,6 +273,8 @@ public class PeerSync {
if (otherVersions.size() == 0) {
return true;
}
+
+ boolean completeList = otherVersions.size() < nUpdates; // do we have their complete list of updates?
Collections.sort(otherVersions, absComparator);
@@ -295,7 +299,7 @@ public class PeerSync {
List<Long> toRequest = new ArrayList<Long>();
for (Long otherVersion : otherVersions) {
// stop when the entries get old enough that reorders may lead us to see updates we don't need
- if (Math.abs(otherVersion) < ourLowThreshold) break;
+ if (!completeList && Math.abs(otherVersion) < ourLowThreshold) break;
if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
// we either have this update, or already requested it
@@ -307,11 +311,15 @@ public class PeerSync {
}
sreq.requestedUpdates = toRequest;
-
+
if (toRequest.isEmpty()) {
// we had (or already requested) all the updates referenced by the replica
return true;
}
+
+ if (toRequest.size() > maxRequests) {
+ return false;
+ }
return requestUpdates(srsp, toRequest);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1290938&r1=1290937&r2=1290938&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Sun Feb 19 03:57:43 2012
@@ -111,6 +111,7 @@ public class UpdateLog implements Plugin
private volatile UpdateHandler uhandler; // a core reload can change this reference!
private volatile boolean cancelApplyBufferUpdate;
+ List<Long> startingVersions;
public static class LogPtr {
@@ -174,11 +175,28 @@ public class UpdateLog implements Plugin
newestLogOnStartup = oldLog;
versionInfo = new VersionInfo(uhandler, 256);
+
+ UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates();
+ try {
+ startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep);
+ // populate recent deletes list (since we can't get that info from the index)
+ for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) {
+ DeleteUpdate du = startingRecentUpdates.deleteList.get(i);
+ oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
+ }
+ } finally {
+ startingRecentUpdates.close();
+ }
+
}
public File getLogDir() {
return tlogDir;
}
+
+ public List<Long> getStartingVersions() {
+ return startingVersions;
+ }
/* Takes over ownership of the log, keeping it until no longer needed
and then decrementing it's reference and dropping it.
@@ -623,13 +641,24 @@ public class UpdateLog implements Plugin
TransactionLog log;
long version;
long pointer;
- }
+ }
+
+ static class DeleteUpdate {
+ long version;
+ byte[] id;
+
+ public DeleteUpdate(long version, byte[] id) {
+ this.version = version;
+ this.id = id;
+ }
+ }
public class RecentUpdates {
Deque<TransactionLog> logList; // newest first
List<List<Update>> updateList;
HashMap<Long, Update> updates;
List<Update> deleteByQueryList;
+ List<DeleteUpdate> deleteList;
public List<Long> getVersions(int n) {
@@ -664,10 +693,12 @@ public class UpdateLog implements Plugin
return result;
}
+
private void update() {
int numUpdates = 0;
updateList = new ArrayList<List<Update>>(logList.size());
deleteByQueryList = new ArrayList<Update>();
+ deleteList = new ArrayList<DeleteUpdate>();
updates = new HashMap<Long,Update>(numRecordsToKeep);
for (TransactionLog oldLog : logList) {
@@ -703,6 +734,8 @@ public class UpdateLog implements Plugin
if (oper == UpdateLog.DELETE_BY_QUERY) {
deleteByQueryList.add(update);
+ } else if (oper == UpdateLog.DELETE) {
+ deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
}
break;