You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/05 18:40:40 UTC

[53/61] [abbrv] lucene-solr git commit: SOLR-8586: add index fingerprinting and use it in peersync

SOLR-8586: add index fingerprinting and use it in peersync


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/629767be
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/629767be
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/629767be

Branch: refs/heads/lucene-6835
Commit: 629767be0686d39995f2afc1f1f267f9d1a68cef
Parents: eee6fa0
Author: yonik <yo...@apache.org>
Authored: Thu Feb 4 14:54:08 2016 -0500
Committer: yonik <yo...@apache.org>
Committed: Thu Feb 4 14:55:13 2016 -0500

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   5 +
 .../org/apache/solr/cloud/SyncStrategy.java     |   2 +-
 .../handler/component/RealTimeGetComponent.java |   8 +
 .../apache/solr/update/IndexFingerprint.java    | 232 +++++++++++++++++++
 .../java/org/apache/solr/update/PeerSync.java   |  78 +++++--
 .../java/org/apache/solr/update/UpdateLog.java  |   4 +-
 .../org/apache/solr/update/PeerSyncTest.java    |  23 ++
 7 files changed, 334 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ef22a2f..c396d39 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -344,6 +344,10 @@ New Features
 * SOLR-8560: Added RequestStatusState enum which can be used when comparing states of 
   asynchronous requests. (Shai Erera)
 
+* SOLR-8586: added index fingerprint, a hash over all versions currently in the index.
+  PeerSync now uses this to check if replicas are in sync. (yonik)
+
+
 Bug Fixes
 ----------------------
 
@@ -440,6 +444,7 @@ Bug Fixes
 
 * SOLR-8607: The Schema API refuses to add new fields that match existing dynamic fields.
   (Jan Høydahl, Steve Rowe)
+
   
 Optimizations
 ----------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index 7a16598..d811f5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -173,7 +173,7 @@ public class SyncStrategy {
     // if we can't reach a replica for sync, we still consider the overall sync a success
     // TODO: as an assurance, we should still try and tell the sync nodes that we couldn't reach
     // to recover once more?
-    PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive);
+    PeerSync peerSync = new PeerSync(core, syncWith, core.getUpdateHandler().getUpdateLog().getNumRecordsToKeep(), true, true, peerSyncOnlyWithActive, false);
     return peerSync.sync();
   }
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 2e1f729..2f71b3e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -66,6 +66,7 @@ import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.search.SolrReturnFields;
 import org.apache.solr.search.SyntaxError;
 import org.apache.solr.update.DocumentBuilder;
+import org.apache.solr.update.IndexFingerprint;
 import org.apache.solr.update.PeerSync;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.RefCounted;
@@ -596,6 +597,8 @@ public class RealTimeGetComponent extends SearchComponent
     int nVersions = params.getInt("getVersions", -1);
     if (nVersions == -1) return;
 
+    boolean doFingerprint = params.getBool("fingerprint", false);
+
     String sync = params.get("sync");
     if (sync != null) {
       processSync(rb, nVersions, sync);
@@ -608,6 +611,11 @@ public class RealTimeGetComponent extends SearchComponent
     try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
       rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
     }
+
+    if (doFingerprint) {
+      IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
+      rb.rsp.add("fingerprint", fingerprint.toObject());
+    }
   }
 
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
new file mode 100644
index 0000000..c73b57b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/IndexFingerprint.java
@@ -0,0 +1,232 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/** @lucene.internal */
+public class IndexFingerprint {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private long maxVersionSpecified;
+  private long maxVersionEncountered;
+  private long maxInHash;
+  private long versionsHash;
+  private long numVersions;
+  private long numDocs;
+  private long maxDoc;
+
+  public long getMaxVersionSpecified() {
+    return maxVersionSpecified;
+  }
+
+  public long getMaxVersionEncountered() {
+    return maxVersionEncountered;
+  }
+
+  public long getMaxInHash() {
+    return maxInHash;
+  }
+
+  public long getVersionsHash() {
+    return versionsHash;
+  }
+
+  public long getNumVersions() {
+    return numVersions;
+  }
+
+  public long getNumDocs() {
+    return numDocs;
+  }
+
+  public long getMaxDoc() {
+    return maxDoc;
+  }
+
+  /** Opens a new realtime searcher and returns its fingerprint */
+  public static IndexFingerprint getFingerprint(SolrCore core, long maxVersion) throws IOException {
+    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
+    RefCounted<SolrIndexSearcher> newestSearcher = core.getUpdateHandler().getUpdateLog().uhandler.core.getRealtimeSearcher();
+    try {
+      return getFingerprint(newestSearcher.get(), maxVersion);
+    } finally {
+      if (newestSearcher != null) {
+        newestSearcher.decref();
+      }
+    }
+  }
+
+  public static IndexFingerprint getFingerprint(SolrIndexSearcher searcher, long maxVersion) throws IOException {
+    long start = System.currentTimeMillis();
+
+    SchemaField versionField = VersionInfo.getAndCheckVersionField(searcher.getSchema());
+
+    IndexFingerprint f = new IndexFingerprint();
+    f.maxVersionSpecified = maxVersion;
+    f.maxDoc = searcher.maxDoc();
+
+    // TODO: this could be parallelized, or even cached per-segment if performance becomes an issue
+    ValueSource vs = versionField.getType().getValueSource(versionField, null);
+    Map funcContext = ValueSource.newContext(searcher);
+    vs.createWeight(funcContext, searcher);
+    for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
+      int maxDoc = ctx.reader().maxDoc();
+      f.numDocs += ctx.reader().numDocs();
+      Bits liveDocs = ctx.reader().getLiveDocs();
+      FunctionValues fv = vs.getValues(funcContext, ctx);
+      for (int doc = 0; doc < maxDoc; doc++) {
+        if (liveDocs != null && !liveDocs.get(doc)) continue;
+        long v = fv.longVal(doc);
+        f.maxVersionEncountered = Math.max(v, f.maxVersionEncountered);
+        if (v <= f.maxVersionSpecified) {
+          f.maxInHash = Math.max(v, f.maxInHash);
+          f.versionsHash += Hash.fmix64(v);
+          f.numVersions++;
+        }
+      }
+    }
+
+    long end = System.currentTimeMillis();
+    log.info("IndexFingerprint millis:" + (end-start) + " result:" + f);
+
+    return f;
+  }
+
+  /** returns 0 for equal, negative if f1 is less recent than f2, positive if more recent */
+  public static int compare(IndexFingerprint f1, IndexFingerprint f2) {
+    int cmp;
+
+    // NOTE: some may want number of docs in index to take precedence over highest version (add-only systems for sure)
+
+    // if we're comparing all of the versions in the index, then go by the highest encountered.
+    if (f1.maxVersionSpecified == Long.MAX_VALUE) {
+      cmp = Long.compare(f1.maxVersionEncountered, f2.maxVersionEncountered);
+      if (cmp != 0) return cmp;
+    }
+
+    // Go by the highest version under the requested max.
+    cmp = Long.compare(f1.maxInHash, f2.maxInHash);
+    if (cmp != 0) return cmp;
+
+    // go by who has the most versions included in the hash (numVersions)
+    cmp = Long.compare(f1.numVersions, f2.numVersions);
+    if (cmp != 0) return cmp;
+
+    // both have same number of documents, so go by hash
+    cmp = Long.compare(f1.versionsHash, f2.versionsHash);
+    return cmp;
+  }
+
+  /**
+   * Create a generic object suitable for serializing with ResponseWriters
+   */
+  public Object toObject() {
+    Map<String,Object> map = new LinkedHashMap<>();
+    map.put("maxVersionSpecified", maxVersionSpecified);
+    map.put("maxVersionEncountered", maxVersionEncountered);
+    map.put("maxInHash", maxInHash);
+    map.put("versionsHash", versionsHash);
+    map.put("numVersions", numVersions);
+    map.put("numDocs", numDocs);
+    map.put("maxDoc", maxDoc);
+    return map;
+  }
+
+  private static long getLong(Object o, String key, long def) {
+    long v = def;
+
+    Object oval = null;
+    if (o instanceof Map) {
+      oval = ((Map)o).get(key);
+    } else if (o instanceof NamedList) {
+      oval = ((NamedList)o).get(key);
+    }
+
+    return oval != null ? ((Number)oval).longValue() : def;
+  }
+
+  /**
+   * Create an IndexFingerprint object from a deserialized generic object (Map or NamedList)
+   */
+  public static IndexFingerprint fromObject(Object o) {
+    IndexFingerprint f = new IndexFingerprint();
+    f.maxVersionSpecified = getLong(o, "maxVersionSpecified", Long.MAX_VALUE);
+    f.maxVersionEncountered = getLong(o, "maxVersionEncountered", -1);
+    f.maxInHash = getLong(o, "maxInHash", -1);
+    f.versionsHash = getLong(o, "versionsHash", -1);
+    f.numVersions = getLong(o, "numVersions", -1);
+    f.numDocs = getLong(o, "numDocs", -1);
+    f.maxDoc = getLong(o, "maxDoc", -1);
+    return f;
+  }
+
+  @Override
+  public String toString() {
+    return toObject().toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index dacb470..ea71783 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -68,6 +68,7 @@ public class PeerSync  {
   private UpdateLog ulog;
   private HttpShardHandlerFactory shardHandlerFactory;
   private ShardHandler shardHandler;
+  private List<SyncShardRequest> requests = new ArrayList<>();
 
   private List<Long> startingVersions;
 
@@ -76,8 +77,10 @@ public class PeerSync  {
   private Set<Long> requestedUpdateSet;
   private long ourLowThreshold;  // 20th percentile
   private long ourHighThreshold; // 80th percentile
+  private long ourHighest;  // currently just used for logging/debugging purposes
   private final boolean cantReachIsSuccess;
   private final boolean getNoVersionsIsSuccess;
+  private final boolean doFingerprint;
   private final HttpClient client;
   private final boolean onlyIfActive;
   private SolrCore core;
@@ -116,6 +119,8 @@ public class PeerSync  {
 
   private static class SyncShardRequest extends ShardRequest {
     List<Long> reportedVersions;
+    IndexFingerprint fingerprint;
+    boolean doFingerprintComparison;
     List<Long> requestedUpdates;
     Exception updateException;
   }
@@ -125,16 +130,17 @@ public class PeerSync  {
   }
   
   public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
-    this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false);
+    this(core, replicas, nUpdates, cantReachIsSuccess, getNoVersionsIsSuccess, false, true);
   }
   
-  public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive) {
+  public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess, boolean onlyIfActive, boolean doFingerprint) {
     this.core = core;
     this.replicas = replicas;
     this.nUpdates = nUpdates;
     this.maxUpdates = nUpdates;
     this.cantReachIsSuccess = cantReachIsSuccess;
     this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
+    this.doFingerprint = doFingerprint;
     this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
     this.onlyIfActive = onlyIfActive;
     
@@ -169,9 +175,8 @@ public class PeerSync  {
     return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
   }
 
-  /** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates
-   *  when considering the last N updates between it and its peers.
-   *  A commit is not performed.
+  /** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates.
+   * It does not mean that the remote replica is in sync with us.
    */
   public boolean sync() {
     if (ulog == null) {
@@ -210,7 +215,7 @@ public class PeerSync  {
         
         ourLowThreshold = percentile(startingVersions, 0.8f);
         ourHighThreshold = percentile(startingVersions, 0.2f);
-        
+
         // now make sure that the starting updates overlap our updates
         // there shouldn't be reorders, so any overlap will do.
         
@@ -231,6 +236,7 @@ public class PeerSync  {
         }
         
         ourUpdates = newList;
+        Collections.sort(ourUpdates, absComparator);
       } else {
         
         if (ourUpdates.size() > 0) {
@@ -243,9 +249,10 @@ public class PeerSync  {
           return false;
         }
       }
-      
+
+      ourHighest = ourUpdates.get(0);
       ourUpdateSet = new HashSet<>(ourUpdates);
-      requestedUpdateSet = new HashSet<>(ourUpdates);
+      requestedUpdateSet = new HashSet<>();
       
       for (;;) {
         ShardResponse srsp = shardHandler.takeCompletedOrError();
@@ -257,9 +264,18 @@ public class PeerSync  {
           return false;
         }
       }
-      
-      log.info(msg() + "DONE. sync succeeded");
-      return true;
+
+      // finish up any comparisons with other shards that we deferred
+      boolean success = true;
+      for (SyncShardRequest sreq : requests) {
+        if (sreq.doFingerprintComparison) {
+          success = compareFingerprint(sreq);
+          if (!success) break;
+        }
+      }
+
+      log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
+      return success;
     } finally {
       MDCLoggingContext.clear();
     }
@@ -267,6 +283,7 @@ public class PeerSync  {
   
   private void requestVersions(String replica) {
     SyncShardRequest sreq = new SyncShardRequest();
+    requests.add(sreq);
     sreq.purpose = 1;
     sreq.shards = new String[]{replica};
     sreq.actualShards = sreq.shards;
@@ -274,6 +291,7 @@ public class PeerSync  {
     sreq.params.set("qt","/get");
     sreq.params.set("distrib",false);
     sreq.params.set("getVersions",nUpdates);
+    sreq.params.set("fingerprint",doFingerprint);
     shardHandler.submit(sreq, replica, sreq.params);
   }
 
@@ -355,7 +373,12 @@ public class PeerSync  {
     SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
     sreq.reportedVersions =  otherVersions;
 
-    log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
+    Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
+
+    log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] + " fingerprint:" + fingerprint );
+    if (fingerprint != null) {
+      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
+    }
 
     if (otherVersions.size() == 0) {
       return getNoVersionsIsSuccess; 
@@ -371,13 +394,14 @@ public class PeerSync  {
     
     long otherHigh = percentile(otherVersions, .2f);
     long otherLow = percentile(otherVersions, .8f);
+    long otherHighest = otherVersions.get(0);
 
     if (ourHighThreshold < otherLow) {
       // Small overlap between version windows and ours is older
       // This means that we might miss updates if we attempted to use this method.
       // Since there exists just one replica that is so much newer, we must
       // fail the sync.
-      log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
+      log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow + " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
       return false;
     }
 
@@ -385,7 +409,10 @@ public class PeerSync  {
       // Small overlap between windows and ours is newer.
       // Using this list to sync would result in requesting/replaying results we don't need
       // and possibly bringing deleted docs back to life.
-      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
+
+      // Because our versions are newer, IndexFingerprint with the remote would not match us.
+      // We return true on our side, but the remote peersync with us should fail.
       return true;
     }
     
@@ -408,9 +435,15 @@ public class PeerSync  {
     sreq.requestedUpdates = toRequest;
     
     if (toRequest.isEmpty()) {
-      log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
+      log.info(msg() + " No additional versions requested. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh+ " ourHighest=" + ourHighest + " otherHighest=" + otherHighest);
 
       // we had (or already requested) all the updates referenced by the replica
+
+      // If we requested updates from another replica, we can't compare fingerprints yet with this replica; we need to defer
+      if (doFingerprint) {
+        sreq.doFingerprintComparison = true;
+      }
+
       return true;
     }
     
@@ -422,6 +455,19 @@ public class PeerSync  {
     return requestUpdates(srsp, toRequest);
   }
 
+  private boolean compareFingerprint(SyncShardRequest sreq) {
+    if (sreq.fingerprint == null) return true;
+    try {
+      IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
+      int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
+      log.info("Fingerprint comparison: " + cmp);
+      return cmp == 0;  // currently, we only check for equality...
+    } catch(IOException e){
+      log.error(msg() + "Error getting index fingerprint", e);
+      return false;
+    }
+  }
+
   private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
     String replica = srsp.getShardRequest().shards[0];
 
@@ -556,7 +602,7 @@ public class PeerSync  {
       }
     }
 
-    return true;
+    return compareFingerprint(sreq);
   }
 
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index c63d807..78c30b9 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -505,7 +505,9 @@ public class UpdateLog implements PluginInfoInitialized {
     }
   }
 
-  /** Opens a new realtime searcher and clears the id caches */
+  /** Opens a new realtime searcher and clears the id caches.
+   * This may also be called when updates are being buffered (from PeerSync/IndexFingerprint)
+   */
   public void openRealtimeSearcher() {
     synchronized (this) {
       // We must cause a new IndexReader to be opened before anything looks at these caches again

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/629767be/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
index 8083ad0..bcaf846 100644
--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
@@ -168,6 +168,29 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
 
     assertSync(client1, numVersions, true, shardsArr[0]);
     client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
+
+    // now let's check that fingerprinting causes appropriate failures
+    v = 4000;
+    add(client0, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+    toAdd = numVersions+10;
+    for (int i=0; i<toAdd; i++) {
+      add(client0, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+      add(client1, seenLeader, sdoc("id",Integer.toString((int)v+i+1),"_version_",v+i+1));
+    }
+
+    // client0 now has an additional add beyond our window and the fingerprint should cause this to fail
+    assertSync(client1, numVersions, false, shardsArr[0]);
+
+    // let's add the missing document and verify that order doesn't matter
+    add(client1, seenLeader, sdoc("id",Integer.toString((int)v),"_version_",v));
+    assertSync(client1, numVersions, true, shardsArr[0]);
+
+    // let's do some overwrites to ensure that repeated updates and maxDoc don't matter
+    for (int i=0; i<10; i++) {
+      add(client0, seenLeader, sdoc("id", Integer.toString((int) v + i + 1), "_version_", v + i + 1));
+    }
+    assertSync(client1, numVersions, true, shardsArr[0]);
+
   }