You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/05 18:40:40 UTC
[53/61] [abbrv] lucene-solr git commit: SOLR-8586: add index fingerprinting and use it in peersync
SOLR-8586: add index fingerprinting and use it in peersync
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/629767be
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/629767be
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/629767be
Branch: refs/heads/lucene-6835
Commit: 629767be0686d39995f2afc1f1f267f9d1a68cef
Parents: eee6fa0
Author: yonik <yo...@apache.org>
Authored: Thu Feb 4 14:54:08 2016 -0500
Committer: yonik <yo...@apache.org>
Committed: Thu Feb 4 14:55:13 2016 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 5 +
.../org/apache/solr/cloud/SyncStrategy.java | 2 +-
.../handler/component/RealTimeGetComponent.java | 8 +
.../apache/solr/update/IndexFingerprint.java | 232 +++++++++++++++++++
.../java/org/apache/solr/update/PeerSync.java | 78 +++++--
.../java/org/apache/solr/update/UpdateLog.java | 4 +-
.../org/apache/solr/update/PeerSyncTest.java | 23 ++
7 files changed, 334 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ef22a2f..c396d39 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -344,6 +344,10 @@ New Features
* SOLR-8560: Added RequestStatusState enum which can be used when comparing states of
asynchronous requests. (Shai Erera)
+* SOLR-8586: added index fingerprint, a hash over all versions currently in the index.
+ PeerSync now uses this to check if replicas are in sync. (yonik)
+
+
Bug Fixes
----------------------
@@ -440,6 +444,7 @@ Bug Fixes
* SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
(Jan Høydahl, Steve Rowe)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index 7a16598..d811f5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -173,7 +173,7 @@ public class SyncStrategy {
// if we can't reach a replica for sync, we still consider the overall sync a success
// TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
// to recover once more?
- PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive);
+ PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive, false);
return peerSync.sync();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 2e1f729..2f71b3e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -66,6 +66,7 @@ import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SolrReturnFields;
import org.apache.solr.search.SyntaxError;
import org.apache.solr.update.DocumentBuilder;
+import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
@@ -596,6 +597,8 @@ public class RealTimeGetComponent extends SearchComponent
int nVersions = params.getInt("getVersions", -1);
if (nVersions == -1) return;
+ boolean doFingerprint = params.getBool("fingerprint", false);
+
String sync = params.get("sync");
if (sync != null) {
processSync(rb, nVersions, sync);
@@ -608,6 +611,11 @@ public class RealTimeGetComponent extends SearchComponent
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
}
+
+ if (doFingerprint) {
+ IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
+ rb.rsp.add("fingerprint", fingerprint.toObject());
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
new file mode 100644
index 0000000..c73b57b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
@@ -0,0 +1,232 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/** @lucene.internal */
+public class IndexFingerprint {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private long maxVersionSpecified;
+ private long maxVersionEncountered;
+ private long maxInHash;
+ private long versionsHash;
+ private long numVersions;
+ private long numDocs;
+ private long maxDoc;
+
+ public long getMaxVersionSpecified() {
+ return maxVersionSpecified;
+ }
+
+ public long getMaxVersionEncountered() {
+ return maxVersionEncountered;
+ }
+
+ public long getMaxInHash() {
+ return maxInHash;
+ }
+
+ public long getVersionsHash() {
+ return versionsHash;
+ }
+
+ public long getNumVersions() {
+ return numVersions;
+ }
+
+ public long getNumDocs() {
+ return numDocs;
+ }
+
+ public long getMaxDoc() {
+ return maxDoc;
+ }
+
+ /** Opens a new realtime searcher and returns its fingerprint */
+ public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException {
+ core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
+ RefCounted<SolrIndexSearcher> newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
+ try {
+ return getFingerprint(newestSearcher.get(), maxVersion);
+ } finally {
+ if (newestSearcher != null) {
+ newestSearcher.decref();
+ }
+ }
+ }
+
+ public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException {
+ long start = System.currentTimeMillis();
+
+ SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema());
+
+ IndexFingerprint f = new IndexFingerprint();
+ f.maxVersionSpecified = maxVersion;
+ f.maxDoc = searcher.maxDoc();
+
+ // TODO: this could be parallelized, or even cached per-segment if performance becomes an issue
+ ValueSource vs = versionField.getType().getValueSource(versionField, null);
+ Map funcContext = ValueSource.newContext(searcher);
+ vs.createWeight(funcContext, searcher);
+ for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
+ int maxDoc = ctx.reader().maxDoc();
+ f.numDocs += ctx.reader().numDocs();
+ Bits liveDocs = ctx.reader().getLiveDocs();
+ FunctionValues fv = vs.getValues(funcContext, ctx);
+ for (int doc = 0; doc < maxDoc; doc++) {
+ if (liveDocs != null && !liveDocs.get(doc)) continue;
+ long v = fv.longVal(doc);
+ f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
+ if (v <= f.maxVersionSpecified) {
+ f.maxInHash = Math.max(v, f.maxInHash);
+ f.versionsHash += Hash.fmix64(v);
+ f.numVersions++;
+ }
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
+
+ return f;
+ }
+
+ /** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */
+ public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
+ int cmp;
+
+ // NOTE: some may want the number of docs in the index to take precedence over the highest version (add-only systems for sure)
+
+ // if we're comparing all of the versions in the index, then go by the highest encountered.
+ if (f1.maxVersionSpecified == Long.MAX_VALUE) {
+ cmp = Long.compare(f1.maxVersionEncountered, f2.maxVersionEncountered);
+ if (cmp != 0) return cmp;
+ }
+
+ // Go by the highest version under the requested max.
+ cmp = Long.compare(f1.maxInHash, f2.maxInHash);
+ if (cmp != 0) return cmp;
+
+ // go by who has the most documents in the index
+ cmp = Long.compare(f1.numVersions, f2.numVersions);
+ if (cmp != 0) return cmp;
+
+ // both have same number of documents, so go by hash
+ cmp = Long.compare(f1.versionsHash, f2.versionsHash);
+ return cmp;
+ }
+
+ /**
+ * Create a generic object suitable for serializing with ResponseWriters
+ */
+ public Object toObject() {
+ Map<String,Object> map = new LinkedHashMap<>();
+ map.put("maxVersionSpecified", maxVersionSpecified);
+ map.put("maxVersionEncountered", maxVersionEncountered);
+ map.put("maxInHash", maxInHash);
+ map.put("versionsHash", versionsHash);
+ map.put("numVersions", numVersions);
+ map.put("numDocs", numDocs);
+ map.put("maxDoc", maxDoc);
+ return map;
+ }
+
+ private static long getLong(Object o, String key, long def) {
+ long v = def;
+
+ Object oval = null;
+ if (o instanceof Map) {
+ oval = ((Map)o).get(key);
+ } else if (o instanceof NamedList) {
+ oval = ((NamedList)o).get(key);
+ }
+
+ return oval != null ? ((Number)oval).longValue() : def;
+ }
+
+ /**
+ * Create an IndexFingerprint object from a deserialized generic object (Map or NamedList)
+ */
+ public static IndexFingerprint fromObject(Object o) {
+ IndexFingerprint f = new IndexFingerprint();
+ f.maxVersionSpecified = getLong(o, "maxVersionSpecified", Long.MAX_VALUE);
+ f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
+ f.maxInHash = getLong(o, "maxInHash", -1);
+ f.versionsHash = getLong(o, "versionsHash", -1);
+ f.numVersions = getLong(o, "numVersions", -1);
+ f.numDocs = getLong(o, "numDocs", -1);
+ f.maxDoc = getLong(o, "maxDoc", -1);
+ return f;
+ }
+
+ @Override
+ public String toString() {
+ return toObject().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index dacb470..ea71783 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -68,6 +68,7 @@ public class PeerSync {
private UpdateLog ulog;
private HttpShardHandlerFactory shardHandlerFactory;
private ShardHandler shardHandler;
+ private List<SyncShardRequest> requests = new ArrayList<>();
private List<Long> startingVersions;
@@ -76,8 +77,10 @@ public class PeerSync {
private Set<Long> requestedUpdateSet;
private long ourLowThreshold; // 20th percentile
private long ourHighThreshold; // 80th percentile
+ private long ourHighest; // currently just used for logging/debugging purposes
private final boolean cantReachIsSuccess;
private final boolean getNoVersionsIsSuccess;
+ private final boolean doFingerprint;
private final HttpClient client;
private final boolean onlyIfActive;
private SolrCore core;
@@ -116,6 +119,8 @@ public class PeerSync {
private static class SyncShardRequest extends ShardRequest {
List<Long> reportedVersions;
+ IndexFingerprint fingerprint;
+ boolean doFingerprintComparison;
List<Long> requestedUpdates;
Exception updateException;
}
@@ -125,16 +130,17 @@ public class PeerSync {
}
public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
- this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false);
+ this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false, true);
}
- public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
+ public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive, boolean doFingerprint) {
this.core = core;
this.replicas = replicas;
this.nUpdates = nUpdates;
this.maxUpdates = nUpdates;
this.cantReachIsSuccess = cantReachIsSuccess;
this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
+ this.doFingerprint = doFingerprint;
this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
this.onlyIfActive = onlyIfActive;
@@ -169,9 +175,8 @@ public class PeerSync {
return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
}
- /** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates
- * when considering the last N updates between it and its peers.
- * A commit is not performed.
+ /** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates.
+ * It does not mean that the remote replica is in sync with us.
*/
public boolean sync() {
if (ulog == null) {
@@ -210,7 +215,7 @@ public class PeerSync {
ourLowThreshold = percentile(startingVersions, 0.8f);
ourHighThreshold = percentile(startingVersions, 0.2f);
-
+
// now make sure that the starting updates overlap our updates
// there shouldn't be reorders, so any overlap will do.
@@ -231,6 +236,7 @@ public class PeerSync {
}
ourUpdates = newList;
+ Collections.sort(ourUpdates, absComparator);
} else {
if (ourUpdates.size() > 0) {
@@ -243,9 +249,10 @@ public class PeerSync {
return false;
}
}
-
+
+ ourHighest = ourUpdates.get(0);
ourUpdateSet = new HashSet<>(ourUpdates);
- requestedUpdateSet = new HashSet<>(ourUpdates);
+ requestedUpdateSet = new HashSet<>();
for (;;) {
ShardResponse srsp = shardHandler.takeCompletedOrError();
@@ -257,9 +264,18 @@ public class PeerSync {
return false;
}
}
-
- log.info(msg() + "DONE. sync succeeded");
- return true;
+
+ // finish up any comparisons with other shards that we deferred
+ boolean success = true;
+ for (SyncShardRequest sreq : requests) {
+ if (sreq.doFingerprintComparison) {
+ success = compareFingerprint(sreq);
+ if (!success) break;
+ }
+ }
+
+ log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
+ return success;
} finally {
MDCLoggingContext.clear();
}
@@ -267,6 +283,7 @@ public class PeerSync {
private void requestVersions(String replica) {
SyncShardRequest sreq = new SyncShardRequest();
+ requests.add(sreq);
sreq.purpose = 1;
sreq.shards = new String[]{replica};
sreq.actualShards = sreq.shards;
@@ -274,6 +291,7 @@ public class PeerSync {
sreq.params.set("qt","/get");
sreq.params.set("distrib",false);
sreq.params.set("getVersions",nUpdates);
+ sreq.params.set("fingerprint",doFingerprint);
shardHandler.submit(sreq, replica, sreq.params);
}
@@ -355,7 +373,12 @@ public class PeerSync {
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
sreq.reportedVersions = otherVersions;
- log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
+ Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
+
+ log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] + " fingerprint:" + fingerprint );
+ if (fingerprint != null) {
+ sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
+ }
if (otherVersions.size() == 0) {
return getNoVersionsIsSuccess;
@@ -371,13 +394,14 @@ public class PeerSync {
long otherHigh = percentile(otherVersions, .2f);
long otherLow = percentile(otherVersions, .8f);
+ long otherHighest = otherVersions.get(0);
if (ourHighThreshold < otherLow) {
// Small overlap between version windows and ours is older
// This means that we might miss updates if we attempted to use this method.
// Since there exists just one replica that is so much newer, we must
// fail the sync.
- log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
+ log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
return false;
}
@@ -385,7 +409,10 @@ public class PeerSync {
// Small overlap between windows and ours is newer.
// Using this list to sync would result in requesting/replaying results we don't need
// and possibly bringing deleted docs back to life.
- log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+ log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
+
+ // Because our versions are newer, IndexFingerprint with the remote would not match us.
+ // We return true on our side, but the remote peersync with us should fail.
return true;
}
@@ -408,9 +435,15 @@ public class PeerSync {
sreq.requestedUpdates = toRequest;
if (toRequest.isEmpty()) {
- log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+ log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
// we had (or already requested) all the updates referenced by the replica
+
+ // If we requested updates from another replica, we can't compare fingerprints with this replica yet; we need to defer the comparison
+ if (doFingerprint) {
+ sreq.doFingerprintComparison = true;
+ }
+
return true;
}
@@ -422,6 +455,19 @@ public class PeerSync {
return requestUpdates(srsp, toRequest);
}
+ private boolean compareFingerprint(SyncShardRequest sreq) {
+ if (sreq.fingerprint == null) return true;
+ try {
+ IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
+ int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
+ log.info("Fingerprint comparison: " + cmp);
+ return cmp == 0; // currently, we only check for equality...
+ } catch(IOException e){
+ log.error(msg() + "Error getting index fingerprint", e);
+ return false;
+ }
+ }
+
private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
String replica = srsp.getShardRequest().shards[0];
@@ -556,7 +602,7 @@ public class PeerSync {
}
}
- return true;
+ return compareFingerprint(sreq);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index c63d807..78c30b9 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -505,7 +505,9 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
- /** Opens a new realtime searcher and clears the id caches */
+ /** Opens a new realtime searcher and clears the id caches.
+ * This may also be called when updates are being buffered (from PeerSync/IndexFingerprint)
+ */
public void openRealtimeSearcher() {
synchronized (this) {
// We must cause a new IndexReader to be opened before anything looks at these caches again
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
index 8083ad0..bcaf846 100644
--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
@@ -168,6 +168,29 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+
+ // now let's check that fingerprinting causes appropriate failures
+ v = 4000;
+ add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+ toAdd = numVersions+10;
+ for (int i=0; i<toAdd; i++) {
+ add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+ add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+ }
+
+ // client0 now has an additional add beyond our window and the fingerprint should cause this to fail
+ assertSync(client1, numVersions, false, shardsArr[0]);
+
+ // let's add the missing document and verify that order doesn't matter
+ add(client1, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
+ // let's do some overwrites to ensure that repeated updates and maxDoc don't matter
+ for (int i=0; i<10; i++) {
+ add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + 1), "_version_", v + i + 1));
+ }
+ assertSync(client1, numVersions, true, shardsArr[0]);
+
}