You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/08/23 16:56:22 UTC

lucene-solr:branch_5_5: SOLR-9310: PeerSync fails on a node restart due to IndexFingerPrint mismatch

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_5_5 c5a868f1d -> 0f2064bf5


SOLR-9310: PeerSync fails on a node restart due to IndexFingerPrint mismatch


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0f2064bf
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0f2064bf
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0f2064bf

Branch: refs/heads/branch_5_5
Commit: 0f2064bf5829b97ccfd0be0a63ae52950666ac43
Parents: c5a868f
Author: Noble Paul <no...@apache.org>
Authored: Tue Aug 23 22:26:11 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Tue Aug 23 22:26:11 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../handler/component/RealTimeGetComponent.java |  23 +-
 .../apache/solr/search/SolrIndexSearcher.java   | 177 +++++----
 .../java/org/apache/solr/update/PeerSync.java   |  29 +-
 .../java/org/apache/solr/update/UpdateLog.java  |   7 +-
 .../processor/DistributedUpdateProcessor.java   |  30 +-
 .../solr/cloud/PeerSyncReplicationTest.java     | 363 +++++++++++++++++++
 7 files changed, 528 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f2064bf/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9a83f95..48b74d7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -33,6 +33,8 @@ Bug Fixes
 
 * SOLR-9334: CloudSolrClient.collectionStateCache is unbounded (noble)
 
+* SOLR-9310: PeerSync fails on a node restart due to IndexFingerPrint mismatch (Pushkar Raste, noble)
+
 ======================= 5.5.2 =======================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f2064bf/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index b30c786..dc237de 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -557,14 +559,15 @@ public class RealTimeGetComponent extends SearchComponent
     UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
     if (ulog == null) return;
 
-    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
-      rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
-    }
-
     if (doFingerprint) {
       IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
       rb.rsp.add("fingerprint", fingerprint.toObject());
     }
+
+    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
+      List<Long> versions = recentUpdates.getVersions(nVersions);
+      rb.rsp.add("versions", versions);
+    }
   }
 
   
@@ -610,6 +613,18 @@ public class RealTimeGetComponent extends SearchComponent
 
     List<String> versions = StrUtils.splitSmart(versionsStr, ",", true);
 
+    // find fingerprint for max version for which updates are requested
+    boolean doFingerprint = params.getBool("fingerprint", false);
+    if (doFingerprint) {
+      String maxVersionForUpdate = Collections.min(versions, new Comparator<String>() {
+        @Override
+        public int compare(String s1, String s2) {
+          return PeerSync.absComparator.compare(Long.parseLong(s1), Long.parseLong(s2));
+        }
+      });
+      IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Math.abs(Long.parseLong(maxVersionForUpdate)));
+      rb.rsp.add("fingerprint", fingerprint.toObject());
+    }
 
     List<Object> updates = new ArrayList<>(versions.size());
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f2064bf/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
index eea5f94..6e7240b 100644
--- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
+++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -31,7 +31,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -79,6 +81,7 @@ import org.apache.solr.schema.TrieIntField;
 import org.apache.solr.schema.TrieLongField;
 import org.apache.solr.search.facet.UnInvertedField;
 import org.apache.solr.search.stats.StatsSource;
+import org.apache.solr.update.IndexFingerprint;
 import org.apache.solr.update.SolrIndexConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +120,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   private final int queryResultMaxDocsCached;
   private final boolean useFilterForSortedQuery;
   public final boolean enableLazyFieldLoading;
-  
+
   private final boolean cachingEnabled;
   private final SolrCache<Query,DocSet> filterCache;
   private final SolrCache<QueryResultKey,DocList> queryResultCache;
@@ -131,7 +134,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   // list of all caches associated with this searcher.
   private final SolrCache[] cacheList;
   private static final SolrCache[] noCaches = new SolrCache[0];
-  
+
   private final FieldInfos fieldInfos;
   // TODO: do we need this separate set of field names? we can just use the fieldinfos?
   private final Collection<String> fieldNames;
@@ -155,11 +158,13 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   private final LeafReader leafReader;
   // only for addIndexes etc (no fieldcache)
   private final DirectoryReader rawReader;
-  
+
   private String path;
   private final boolean reserveDirectory;
-  private boolean createdDirectory; 
-  
+  private boolean createdDirectory;
+  private final Map<Long, IndexFingerprint> maxVersionFingerprintCache = new ConcurrentHashMap<>();
+
+
   private static DirectoryReader getReader(SolrCore core, SolrIndexConfig config, DirectoryFactory directoryFactory, String path) throws IOException {
     DirectoryReader reader = null;
     Directory dir = directoryFactory.get(path, DirContext.DEFAULT, config.lockType);
@@ -171,25 +176,25 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     }
     return reader;
   }
-  
+
   // TODO: wrap elsewhere and return a "map" from the schema that overrides get() ?
   // this reader supports reopen
   private static DirectoryReader wrapReader(SolrCore core, DirectoryReader reader) throws IOException {
     assert reader != null;
     return ExitableDirectoryReader.wrap
-        (UninvertingReader.wrap(reader, core.getLatestSchema().getUninversionMap(reader)), 
+        (UninvertingReader.wrap(reader, core.getLatestSchema().getUninversionMap(reader)),
          SolrQueryTimeoutImpl.getInstance());
   }
 
   /**
-   * Builds the necessary collector chain (via delegate wrapping) and executes the query 
-   * against it.  This method takes into consideration both the explicitly provided collector 
-   * and postFilter as well as any needed collector wrappers for dealing with options 
+   * Builds the necessary collector chain (via delegate wrapping) and executes the query
+   * against it.  This method takes into consideration both the explicitly provided collector
+   * and postFilter as well as any needed collector wrappers for dealing with options
    * specified in the QueryCommand.
    */
   private void buildAndRunCollectorChain(QueryResult qr, Query query,
       Collector collector, QueryCommand cmd, DelegatingCollector postFilter) throws IOException {
-    
+
     final boolean terminateEarly = cmd.getTerminateEarly();
     if (terminateEarly) {
       collector = new EarlyTerminatingCollector(collector, cmd.getLen());
@@ -199,7 +204,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     if( timeAllowed > 0 ) {
       collector = new TimeLimitingCollector(collector, TimeLimitingCollector.getGlobalCounter(), timeAllowed);
     }
-    
+
     if (postFilter != null) {
       postFilter.setLastDelegate(collector);
       collector = postFilter;
@@ -220,7 +225,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
       ((DelegatingCollector) collector).finish();
     }
   }
-  
+
   public SolrIndexSearcher(SolrCore core, String path, IndexSchema schema, SolrIndexConfig config, String name,
                            boolean enableCache, DirectoryFactory directoryFactory) throws IOException {
     // we don't need to reserve the directory because we get it from the factory
@@ -248,9 +253,9 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
       core.getDeletionPolicy().saveCommitPoint(
           reader.getIndexCommit().getGeneration());
     }
-    
+
     Directory dir = getIndexReader().directory();
-    
+
     this.reserveDirectory = reserveDirectory;
     if (reserveDirectory) {
       // keep the directory from being released while we use it
@@ -265,7 +270,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     queryResultMaxDocsCached = solrConfig.queryResultMaxDocsCached;
     useFilterForSortedQuery = solrConfig.useFilterForSortedQuery;
     enableLazyFieldLoading = solrConfig.enableLazyFieldLoading;
-    
+
     cachingEnabled=enableCache;
     if (cachingEnabled) {
       ArrayList<SolrCache> clist = new ArrayList<>();
@@ -329,14 +334,14 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     // We already have our own filter cache
     setQueryCache(null);
 
-    // do this at the end since an exception in the constructor means we won't close    
+    // do this at the end since an exception in the constructor means we won't close
     numOpens.incrementAndGet();
   }
-  
+
   /*
    * Override these two methods to provide a way to use global collection stats.
    */
-  @Override 
+  @Override
   public TermStatistics termStatistics(Term term, TermContext context) throws IOException {
     SolrRequestInfo reqInfo = SolrRequestInfo.getRequestInfo();
     if (reqInfo != null) {
@@ -348,7 +353,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     }
     return localTermStatistics(term, context);
   }
-  
+
   @Override
   public CollectionStatistics collectionStatistics(String field)
       throws IOException {
@@ -362,11 +367,11 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     }
     return localCollectionStatistics(field);
   }
-  
+
   public TermStatistics localTermStatistics(Term term, TermContext context) throws IOException {
     return super.termStatistics(term, context);
   }
-  
+
   public CollectionStatistics localCollectionStatistics(String field) throws IOException {
     return super.collectionStatistics(field);
   }
@@ -389,24 +394,24 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   public final int maxDoc() {
     return reader.maxDoc();
   }
-  
+
   public final int docFreq(Term term) throws IOException {
     return reader.docFreq(term);
   }
-  
+
   public final LeafReader getLeafReader() {
     return leafReader;
   }
-  
+
   /** Raw reader (no fieldcaches etc). Useful for operations like addIndexes */
   public final DirectoryReader getRawReader() {
     return rawReader;
   }
-  
+
   @Override
   public final DirectoryReader getIndexReader() {
     assert reader == super.getIndexReader();
-    return reader; 
+    return reader;
   }
 
   /** Register sub-objects such as caches
@@ -448,7 +453,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     // super.close();
     // can't use super.close() since it just calls reader.close() and that may only be called once
     // per reader (even if incRef() was previously called).
-    
+
     long cpg = reader.getIndexCommit().getGeneration();
     try {
       if (closeReader) rawReader.decRef();
@@ -470,15 +475,15 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     if (createdDirectory) {
       directoryFactory.release(getIndexReader().directory());
     }
-   
-    
+
+
     // do this at the end so it only gets done if there are no exceptions
     numCloses.incrementAndGet();
   }
 
   /** Direct access to the IndexSchema for use with this searcher */
   public IndexSchema getSchema() { return schema; }
-  
+
   /**
    * Returns a collection of all field names the index reader knows about.
    */
@@ -603,7 +608,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
 
 
   /* ********************** Document retrieval *************************/
-   
+
   /* Future optimizations (yonik)
    *
    * If no cache is present:
@@ -695,7 +700,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
 
   /** Visit a document's fields using a {@link StoredFieldVisitor}
    *  This method does not currently add to the Solr document cache.
-   * 
+   *
    * @see IndexReader#document(int, StoredFieldVisitor) */
   @Override
   public void doc(int n, StoredFieldVisitor visitor) throws IOException {
@@ -708,7 +713,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     }
     getIndexReader().document(n, visitor);
   }
-  
+
   /** Executes a stored field visitor against a hit from the document cache */
   private void visitFromCached(Document document, StoredFieldVisitor visitor) throws IOException {
     for (IndexableField f : document) {
@@ -749,12 +754,12 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
    * Retrieve the {@link Document} instance corresponding to the document id.
    * <p>
    * Note: The document will have all fields accessible, but if a field
-   * filter is provided, only the provided fields will be loaded (the 
+   * filter is provided, only the provided fields will be loaded (the
    * remainder will be available lazily).
    */
   @Override
   public Document doc(int i, Set<String> fields) throws IOException {
-    
+
     Document d;
     if (documentCache != null) {
       d = documentCache.get(i);
@@ -859,7 +864,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   }
 
   /**
-   * Takes a list of docs (the doc ids actually), and reads them into an array 
+   * Takes a list of docs (the doc ids actually), and reads them into an array
    * of Documents.
    */
   public void readDocs(Document[] docs, DocList ids) throws IOException {
@@ -945,7 +950,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
 
       final Terms terms = reader.terms(field);
       if (terms == null) continue;
-      
+
       TermsEnum te = terms.iterator();
       if (te.seekExact(idBytes)) {
         PostingsEnum docs = te.postings(null, PostingsEnum.NONE);
@@ -1225,8 +1230,8 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
           }
           continue;
         }
-      } 
-      
+      }
+
       if (filterCache == null) {
         // there is no cache: don't pull bitsets
         if (notCached == null) notCached = new ArrayList<>(sets.length-end);
@@ -1342,7 +1347,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
         if (sub.postingsEnum == null) continue;
         int base = sub.slice.start;
         int docid;
-        
+
         if (largestPossible > docs.length) {
           if (fbs == null) fbs = new FixedBitSet(maxDoc());
           while ((docid = sub.postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
@@ -1373,7 +1378,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     DocSet result;
     if (fbs != null) {
       for (int i=0; i<upto; i++) {
-        fbs.set(docs[i]);  
+        fbs.set(docs[i]);
       }
       bitsSet += upto;
       result = new BitDocSet(fbs, bitsSet);
@@ -1384,7 +1389,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     if (useCache) {
       filterCache.put(key, result);
     }
-    
+
     return result;
   }
 
@@ -1465,7 +1470,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
 
 
   /**
-   * Returns documents matching both <code>query</code> and the 
+   * Returns documents matching both <code>query</code> and the
    * intersection of the <code>filterList</code>, sorted by <code>sort</code>.
    * <p>
    * This method is cache aware and may retrieve <code>filter</code> from
@@ -1657,28 +1662,28 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   }
 
   /**
-   * Helper method for extracting the {@link FieldDoc} sort values from a 
-   * {@link TopFieldDocs} when available and making the appropriate call to 
+   * Helper method for extracting the {@link FieldDoc} sort values from a
+   * {@link TopFieldDocs} when available and making the appropriate call to
    * {@link QueryResult#setNextCursorMark} when applicable.
    *
    * @param qr <code>QueryResult</code> to modify
    * @param qc <code>QueryCommand</code> for context of method
-   * @param topDocs May or may not be a <code>TopFieldDocs</code> 
+   * @param topDocs May or may not be a <code>TopFieldDocs</code>
    */
-  private void populateNextCursorMarkFromTopDocs(QueryResult qr, QueryCommand qc, 
+  private void populateNextCursorMarkFromTopDocs(QueryResult qr, QueryCommand qc,
                                                  TopDocs topDocs) {
     // TODO: would be nice to rename & generalize this method for non-cursor cases...
     // ...would be handy to reuse the ScoreDoc/FieldDoc sort vals directly in distrib sort
     // ...but that has non-trivial queryResultCache implications
     // See: SOLR-5595
-    
+
     if (null == qc.getCursorMark()) {
       // nothing to do, short circuit out
       return;
     }
 
     final CursorMark lastCursorMark = qc.getCursorMark();
-    
+
     // if we have a cursor, then we have a sort that at minimum involves uniqueKey..
     // so we must have a TopFieldDocs containing FieldDoc[]
     assert topDocs instanceof TopFieldDocs : "TopFieldDocs cursor constraint violated";
@@ -1691,7 +1696,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     } else {
       ScoreDoc lastDoc = scoreDocs[scoreDocs.length-1];
       assert lastDoc instanceof FieldDoc : "FieldDoc cursor constraint violated";
-      
+
       List<Object> lastFields = Arrays.<Object>asList(((FieldDoc)lastDoc).fields);
       CursorMark nextCursorMark = lastCursorMark.createNext(lastFields);
       assert null != nextCursorMark : "null nextCursorMark";
@@ -1700,11 +1705,11 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   }
 
   /**
-   * Helper method for inspecting QueryCommand and creating the appropriate 
+   * Helper method for inspecting QueryCommand and creating the appropriate
    * {@link TopDocsCollector}
    *
    * @param len the number of docs to return
-   * @param cmd The Command whose properties should determine the type of 
+   * @param cmd The Command whose properties should determine the type of
    *        TopDocsCollector to use.
    */
   private TopDocsCollector buildTopDocsCollector(int len, QueryCommand cmd) throws IOException {
@@ -1729,7 +1734,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
       final boolean fillFields = (null != cursor);
       final FieldDoc searchAfter = (null != cursor ? cursor.getSearchAfterFieldDoc() : null);
       return TopFieldCollector.create(weightedSort, len, searchAfter,
-                                      fillFields, needScores, needScores); 
+                                      fillFields, needScores, needScores);
     }
   }
 
@@ -1745,7 +1750,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     float[] scores;
 
     boolean needScores = (cmd.getFlags() & GET_SCORES) != 0;
-    
+
     Query query = QueryUtils.makeQueryable(cmd.getQuery());
 
     ProcessedFilter pf = getProcessedFilter(cmd.getFilter(), cmd.getFilterList());
@@ -1766,7 +1771,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
           public void collect(int doc) {
             numHits[0]++;
           }
-          
+
           @Override
           public boolean needsScores() {
             return false;
@@ -1783,16 +1788,16 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
           public void collect(int doc) throws IOException {
             numHits[0]++;
             float score = scorer.score();
-            if (score > topscore[0]) topscore[0]=score;            
+            if (score > topscore[0]) topscore[0]=score;
           }
-          
+
           @Override
           public boolean needsScores() {
             return true;
           }
         };
       }
-      
+
       buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);
 
       nDocsReturned=0;
@@ -1861,29 +1866,29 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
          collector = setCollector;
        } else {
          final Collector topScoreCollector = new SimpleCollector() {
-          
+
            Scorer scorer;
-           
+
            @Override
           public void setScorer(Scorer scorer) throws IOException {
             this.scorer = scorer;
           }
-           
+
           @Override
           public void collect(int doc) throws IOException {
             float score = scorer.score();
             if (score > topscore[0]) topscore[0] = score;
           }
-          
+
           @Override
           public boolean needsScores() {
             return true;
           }
         };
-        
+
         collector = MultiCollector.wrap(setCollector, topScoreCollector);
        }
-       
+
        buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);
 
       set = setCollector.getDocSet();
@@ -1903,7 +1908,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
 
       buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);
 
-      set = setCollector.getDocSet();      
+      set = setCollector.getDocSet();
 
       totalHits = topCollector.getTotalHits();
       assert(totalHits == set.size());
@@ -2025,13 +2030,13 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     search(qr,qc);
     return qr.getDocListAndSet();
   }
-  
+
 
   /**
-   * Returns documents matching both <code>query</code> and the intersection 
-   * of <code>filterList</code>, sorted by <code>sort</code>.  
+   * Returns documents matching both <code>query</code> and the intersection
+   * of <code>filterList</code>, sorted by <code>sort</code>.
    * Also returns the compete set of documents
-   * matching <code>query</code> and <code>filter</code> 
+   * matching <code>query</code> and <code>filter</code>
    * (regardless of <code>offset</code> and <code>len</code>).
    * <p>
    * This method is cache aware and may retrieve <code>filter</code> from
@@ -2062,10 +2067,10 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
   }
 
   /**
-   * Returns documents matching both <code>query</code> and the intersection 
-   * of <code>filterList</code>, sorted by <code>sort</code>.  
+   * Returns documents matching both <code>query</code> and the intersection
+   * of <code>filterList</code>, sorted by <code>sort</code>.
    * Also returns the compete set of documents
-   * matching <code>query</code> and <code>filter</code> 
+   * matching <code>query</code> and <code>filter</code>
    * (regardless of <code>offset</code> and <code>len</code>).
    * <p>
    * This method is cache aware and may retrieve <code>filter</code> from
@@ -2129,7 +2134,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
    * and sorted by <code>sort</code>.  Also returns the compete set of documents
    * matching <code>query</code> and <code>filter</code> (regardless of <code>offset</code> and <code>len</code>).
    * <p>
-   * This method is cache aware and may make an insertion into the cache 
+   * This method is cache aware and may make an insertion into the cache
    * as a result of this call.
    * <p>
    * FUTURE: The returned DocList may be retrieved from a cache.
@@ -2191,7 +2196,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
       }
       leafCollector.collect(doc-base);
     }
-    
+
     TopDocs topDocs = topCollector.topDocs(0, nDocs);
 
     int nDocsReturned = topDocs.scoreDocs.length;
@@ -2224,7 +2229,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
       return a==absQ ? b.intersectionSize(positiveA) : b.andNotSize(positiveA);
     } else {
       // If there isn't a cache, then do a single filtered query
-      // NOTE: we cannot use FilteredQuery, because BitDocSet assumes it will never 
+      // NOTE: we cannot use FilteredQuery, because BitDocSet assumes it will never
       // have deleted documents, but UninvertedField's doNegative has sets with deleted docs
       TotalHitCountCollector collector = new TotalHitCountCollector();
       BooleanQuery.Builder bq = new BooleanQuery.Builder();
@@ -2262,7 +2267,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
    */
   public int numDocs(Query a, Query b) throws IOException {
     Query absA = QueryUtils.getAbs(a);
-    Query absB = QueryUtils.getAbs(b);     
+    Query absB = QueryUtils.getAbs(b);
     DocSet positiveA = getPositiveDocSet(absA);
     DocSet positiveB = getPositiveDocSet(absB);
 
@@ -2284,7 +2289,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
 
 
   /**
-   * Takes a list of docs (the doc ids actually), and returns an array 
+   * Takes a list of docs (the doc ids actually), and returns an array
    * of Documents containing all of the stored fields.
    */
   public Document[] readDocs(DocList ids) throws IOException {
@@ -2373,6 +2378,22 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable,SolrIn
     return super.explain(QueryUtils.makeQueryable(query), doc);
   }
 
+  /** @lucene.internal
+   * Returns the IndexFingerprint of this searcher for versions up to {@code maxVersion}; computed once per key, then cached.
+   **/
+  public IndexFingerprint getIndexFingerprint(long maxVersion) throws IOException {
+    IndexFingerprint fingerprint = maxVersionFingerprintCache.get(maxVersion);
+    if (fingerprint != null) return fingerprint;
+    // possibly expensive, so prevent more than one thread from calculating it for this searcher
+    synchronized (maxVersionFingerprintCache) {
+      fingerprint = maxVersionFingerprintCache.get(maxVersion); // re-check by key: another thread may have computed it while we waited
+      if (fingerprint != null) return fingerprint;
+      fingerprint = IndexFingerprint.getFingerprint(this, maxVersion);
+      maxVersionFingerprintCache.put(maxVersion, fingerprint);
+      return fingerprint;
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////
   // SolrInfoMBean stuff: Statistics and Module Info
   /////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f2064bf/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index dbc0091..c2fffd6 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -86,7 +86,7 @@ public class PeerSync  {
   private SolrCore core;
 
   // comparator that sorts by absolute value, putting highest first
-  private static Comparator<Long> absComparator = new Comparator<Long>() {
+  public static Comparator<Long> absComparator = new Comparator<Long>() {
     @Override
     public int compare(Long o1, Long o2) {
       long l1 = Math.abs(o1);
@@ -140,7 +140,7 @@ public class PeerSync  {
     this.maxUpdates = nUpdates;
     this.cantReachIsSuccess = cantReachIsSuccess;
     this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
-    this.doFingerprint = doFingerprint;
+    this.doFingerprint = doFingerprint && !("true".equals(System.getProperty("solr.disableFingerprint")));
     this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
     this.onlyIfActive = onlyIfActive;
     
@@ -458,9 +458,14 @@ public class PeerSync  {
   private boolean compareFingerprint(SyncShardRequest sreq) {
     if (sreq.fingerprint == null) return true;
     try {
-      IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
-      int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
-      log.info("Fingerprint comparison: " + cmp);
+      // check our fingerprint only upto the max version in the other fingerprint.
+      // Otherwise for missed updates (look at missed update test in PeerSyncTest) ourFingerprint won't match with otherFingerprint
+      IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, sreq.fingerprint.getMaxVersionSpecified());
+      int cmp = IndexFingerprint.compare(sreq.fingerprint, ourFingerprint);
+      log.info("Fingerprint comparison: {}" , cmp);
+      if(cmp != 0) {
+        log.info("Other fingerprint: {}, Our fingerprint: {}", sreq.fingerprint , ourFingerprint);
+      }
       return cmp == 0;  // currently, we only check for equality...
     } catch(IOException e){
       log.error(msg() + "Error getting index fingerprint", e);
@@ -482,6 +487,12 @@ public class PeerSync  {
     sreq.params.set("distrib", false);
     sreq.params.set("getUpdates", StrUtils.join(toRequest, ','));
     sreq.params.set("onlyIfActive", onlyIfActive);
+
+    // fingerprint should really be requested only for the maxversion  we are requesting updates for
+    // In case updates are coming in while node is coming up after restart, node would have already
+    // buffered some of the updates. fingerprint we requested with versions would reflect versions
+    // in our buffer as well and will definitely cause a mismatch
+    sreq.params.set("fingerprint",doFingerprint);
     sreq.responses.clear();  // needs to be zeroed for correct correlation to occur
 
     shardHandler.submit(sreq, sreq.shards[0], sreq.params);
@@ -500,6 +511,14 @@ public class PeerSync  {
       return false;
     }
 
+    // overwrite fingerprint we saved in 'handleVersions()'
+    Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
+
+    if (fingerprint != null) {
+      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
+    }
+
+
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());
     params.set("peersync",true); // debugging

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f2064bf/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 2456c3e..57ee239 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -951,11 +951,16 @@ public class UpdateLog implements PluginInfoInitialized {
       }
     }
 
-    public List<Long> getVersions(int n) {
+    public  List<Long> getVersions(int n){
+      return getVersions(n, Long.MAX_VALUE);
+    }
+
+    public List<Long> getVersions(int n, long maxVersion) {
       List<Long> ret = new ArrayList<>(n);
 
       for (List<Update> singleList : updateList) {
         for (Update ptr : singleList) {
+          if(Math.abs(ptr.version) > Math.abs(maxVersion)) continue;
           ret.add(ptr.version);
           if (--n <= 0) return ret;
         }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f2064bf/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 4af8277..14c1286 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -1022,7 +1022,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
             // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
             if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
-                && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+                && isReplayOrPeersync == false) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
@@ -1050,7 +1050,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             // The leader forwarded us this update.
             cmd.setVersion(versionOnUpdate);
 
-            if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
               ulog.add(cmd);
@@ -1077,9 +1077,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             }
           }
         }
-        
+
         boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
-        
+
         SolrInputDocument clonedDoc = null;
         if (willDistrib) {
           clonedDoc = cmd.solrDoc.deepCopy();
@@ -1087,7 +1087,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
         // TODO: possibly set checkDeleteByQueries as a flag on the command?
         doLocalAdd(cmd);
-        
+
         if (willDistrib) {
           cmd.solrDoc = clonedDoc;
         }
@@ -1119,7 +1119,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     } else {
       oldDoc.remove(VERSION_FIELD);
     }
-    
+
 
     cmd.solrDoc = docMerger.merge(sdoc, oldDoc);
     return true;
@@ -1127,9 +1127,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
   @Override
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
-    
+
     assert TestInjection.injectFailUpdateRequests();
-    
+
     updateCommand = cmd;
 
     if (!cmd.isDeleteById()) {
@@ -1143,12 +1143,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     } else {
       isLeader = getNonZkLeaderAssumption(req);
     }
-    
+
     boolean dropCmd = false;
     if (!forwardToLeader) {
       dropCmd  = versionDelete(cmd);
     }
-    
+
     if (dropCmd) {
       // TODO: do we need to add anything to the response?
       return;
@@ -1241,10 +1241,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     //       - log + execute the local DBQ
     // FROM: we are a replica receiving a DBQ from our leader
     //       - log + execute the local DBQ
-    DistribPhase phase = 
+    DistribPhase phase =
     DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
 
-    DocCollection coll = zkEnabled 
+    DocCollection coll = zkEnabled
       ? zkController.getClusterState().getCollection(collection) : null;
 
     if (zkEnabled && DistribPhase.NONE == phase) {
@@ -1468,7 +1468,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     if (!zkController.getZkClient().getConnectionManager().isLikelyExpired()) {
       return;
     }
-    
+
     throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
   }
 
@@ -1524,7 +1524,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
             // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
             if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
-                && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+                && !isReplayOrPeersync) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               log.info("Leader logic applied but update log is buffering: " + cmd.getId());
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
@@ -1549,7 +1549,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           } else {
             cmd.setVersion(-versionOnUpdate);
 
-            if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
               ulog.delete(cmd);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f2064bf/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
new file mode 100644
index 0000000..458a283
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -0,0 +1,363 @@
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.handler.ReplicationHandler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test sync peer sync when a node restarts and documents are indexed when node was down.
+ *
+ * This test is modeled after SyncSliceTest
+ */
+@Slow
+public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private boolean success = false;
+  int docId = 0;
+
+  List<CloudJettyRunner> nodesDown = new ArrayList<>();
+
+  @Override
+  public void distribTearDown() throws Exception {
+    if (!success) {
+      printLayoutOnTearDown = true;
+    }
+    System.clearProperty("solr.directoryFactory");
+    System.clearProperty("solr.ulog.numRecordsToKeep");
+    System.clearProperty("tests.zk.violationReportAction");
+    super.distribTearDown();
+  }
+
+  public PeerSyncReplicationTest() {
+    super();
+    sliceCount = 1;
+    fixShardCount(3);
+  }
+
+  protected String getCloudSolrConfig() {
+    return "solrconfig-tlog.xml";
+  }
+
+  @Override
+  public void distribSetUp() throws Exception {
+    // tlog gets deleted after node restarts if we use CachingDirectoryFactory.
+    // make sure that tlog stays intact after we restart a node
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    System.setProperty("solr.ulog.numRecordsToKeep", "1000");
+    System.setProperty("tests.zk.violationReportAction", LimitViolationAction.IGNORE.toString());
+    super.distribSetUp();
+  }
+
+  @Test
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    waitForThingsToLevelOut(30);
+
+    del("*:*");
+
+    // index enough docs and commit to establish frame of reference for PeerSync
+    for (int i = 0; i < 100; i++) {
+      indexDoc(id, docId, i1, 50, tlong, 50, t1,
+          "document number " + docId++);
+    }
+    commit();
+    waitForThingsToLevelOut(30);
+
+    try {
+      checkShardConsistency(false, true);
+
+      long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+      assertEquals(docId, cloudClientDocs);
+
+      CloudJettyRunner initialLeaderJetty = shardToLeaderJetty.get("shard1");
+      List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
+      CloudJettyRunner neverLeader = otherJetties.get(otherJetties.size() - 1);
+      otherJetties.remove(neverLeader) ;
+
+      // first shutdown a node that will never be a leader
+      forceNodeFailures(Arrays.asList(neverLeader));
+
+      // node failure and recovery via PeerSync
+      log.info("Forcing PeerSync");
+      CloudJettyRunner nodePeerSynced = forceNodeFailureAndDoPeerSync(false);
+
+      // add a few more docs
+      indexDoc(id, docId, i1, 50, tlong, 50, t1,
+          "document number " + docId++);
+      indexDoc(id, docId, i1, 50, tlong, 50, t1,
+          "document number " + docId++);
+      commit();
+
+      cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+      assertEquals(docId, cloudClientDocs);
+
+      // now shutdown all other nodes except for 'nodeShutDownForFailure'
+      otherJetties.remove(nodePeerSynced);
+      forceNodeFailures(otherJetties);
+      waitForThingsToLevelOut(30);
+      checkShardConsistency(false, true);
+
+      // now shutdown the original leader
+      log.info("Now shutting down initial leader");
+      forceNodeFailures(Arrays.asList(initialLeaderJetty));
+      log.info("Updating mappings from zk");
+      Thread.sleep(15000); // sleep for a while for leader to change ...
+      updateMappingsFromZk(jettys, clients, true);
+      assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
+
+      // bring up node that was down all along, and let it PeerSync from the node that was forced to PeerSynce  
+      bringUpDeadNodeAndEnsureNoReplication(shardToLeaderJetty.get("shard1"), neverLeader, false);
+      waitTillNodesActive();
+
+      checkShardConsistency(false, true);
+
+
+      // bring back all the nodes including initial leader 
+      // (commented as reports Maximum concurrent create/delete watches above limit violation and reports thread leaks)
+      /*for(int i = 0 ; i < nodesDown.size(); i++) {
+        bringUpDeadNodeAndEnsureNoReplication(shardToLeaderJetty.get("shard1"), neverLeader, false);
+      }
+      checkShardConsistency(false, true);*/
+
+      // make sure leader has not changed after bringing initial leader back
+      assertEquals(nodePeerSynced, shardToLeaderJetty.get("shard1"));
+      success = true;
+    } finally {
+      System.clearProperty("solr.disableFingerprint");
+    }
+  }
+
+
+  private void indexInBackground(final int numDocs) {
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < numDocs; i++) {
+            PeerSyncReplicationTest.this.indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId);
+            docId++;
+            // slow down adds, to get documents indexed while in PeerSync
+            Thread.sleep(100);
+          }
+        } catch (Exception e) {
+          log.error("Error indexing doc in background", e);
+          //Throwing an error here will kill the thread
+        }
+      }
+    }, getClassName())
+        .start();
+
+
+  }
+
+
+  private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
+    for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
+      chaosMonkey.killJetty(replicaToShutDown);
+      waitForNoShardInconsistency();
+    }
+
+    int totalDown = 0;
+
+    Set<CloudJettyRunner> jetties = new HashSet<>();
+    jetties.addAll(shardToJetty.get("shard1"));
+
+    if (replicasToShutDown != null) {
+      jetties.removeAll(replicasToShutDown);
+      totalDown += replicasToShutDown.size();
+    }
+
+    jetties.removeAll(nodesDown);
+    totalDown += nodesDown.size();
+
+    assertEquals(getShardCount() - totalDown, jetties.size());
+
+    nodesDown.addAll(replicasToShutDown);
+
+    Thread.sleep(3000);
+  }
+
+
+
+  private CloudJettyRunner forceNodeFailureAndDoPeerSync(boolean disableFingerprint)
+      throws Exception {
+    // kill non leader - new leader could have all the docs or be missing one
+    CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");
+
+    List<CloudJettyRunner> nonLeaderJetties = getOtherAvailableJetties(leaderJetty);
+    CloudJettyRunner replicaToShutDown = nonLeaderJetties.get(random().nextInt(nonLeaderJetties.size())); // random non leader node
+
+    forceNodeFailures(Arrays.asList(replicaToShutDown));
+
+    // two docs need to be sync'd back when replica restarts
+    indexDoc(id, docId, i1, 50, tlong, 50, t1,
+        "document number " + docId++);
+    indexDoc(id, docId, i1, 50, tlong, 50, t1,
+        "document number " + docId++);
+    commit();
+
+    bringUpDeadNodeAndEnsureNoReplication(leaderJetty, replicaToShutDown, disableFingerprint);
+
+    return replicaToShutDown;
+  }
+
+
+
+  private void bringUpDeadNodeAndEnsureNoReplication(CloudJettyRunner leaderJetty, CloudJettyRunner nodeToBringUp,
+                                                     boolean disableFingerprint) throws Exception {
+    // disable fingerprint check if needed
+    System.setProperty("solr.disableFingerprint", String.valueOf(disableFingerprint));
+
+    long numRequestsBefore = (Long) leaderJetty.jetty
+        .getCoreContainer()
+        .getCores()
+        .iterator()
+        .next()
+        .getRequestHandler(ReplicationHandler.PATH)
+        .getStatistics().get("requests");
+
+    indexInBackground(50);
+
+    // bring back dead node and ensure it recovers
+    ChaosMonkey.start(nodeToBringUp.jetty);
+
+    nodesDown.remove(nodeToBringUp);
+
+    waitTillNodesActive();
+    waitForThingsToLevelOut(30);
+
+    Set<CloudJettyRunner> jetties = new HashSet<>();
+    jetties.addAll(shardToJetty.get("shard1"));
+    jetties.removeAll(nodesDown);
+    assertEquals(getShardCount() - nodesDown.size(), jetties.size());
+
+    long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+    assertEquals(docId, cloudClientDocs);
+
+    long numRequestsAfter = (Long) leaderJetty.jetty
+        .getCoreContainer()
+        .getCores()
+        .iterator()
+        .next()
+        .getRequestHandler(ReplicationHandler.PATH)
+        .getStatistics().get("requests");
+
+    assertEquals("PeerSync failed. Had to fail back to replication", numRequestsBefore, numRequestsAfter);
+  }
+
+
+
+  private void waitTillNodesActive() throws Exception {
+    for (int i = 0; i < 60; i++) {
+      Thread.sleep(3000);
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection collection1 = clusterState.getCollection("collection1");
+      Slice slice = collection1.getSlice("shard1");
+      Collection<Replica> replicas = slice.getReplicas();
+      boolean allActive = true;
+
+      Collection<Replica> replicasToCheck = new ArrayList<>();
+      for (Replica r : replicas) {
+        if(nodesDown.contains(r.getName())) replicasToCheck.add(r);
+      }
+
+      for (Replica replica : replicasToCheck) {
+        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+          allActive = false;
+          break;
+        }
+      }
+      if (allActive) {
+        return;
+      }
+    }
+    printLayout();
+    fail("timeout waiting to see all nodes active");
+  }
+
+
+
+  private List<CloudJettyRunner> getOtherAvailableJetties(CloudJettyRunner leader) {
+    List<CloudJettyRunner> candidates = new ArrayList<>();
+    candidates.addAll(shardToJetty.get("shard1"));
+
+    if (leader != null) {
+      candidates.remove(leader);
+    }
+
+    candidates.removeAll(nodesDown);
+
+    return candidates;
+  }
+
+
+
+  protected void indexDoc(Object... fields) throws IOException,
+      SolrServerException {
+    SolrInputDocument doc = new SolrInputDocument();
+
+    addFields(doc, fields);
+    addFields(doc, "rnd_s", RandomStringUtils.random(random().nextInt(100) + 100));
+
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.add(doc);
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    ureq.setParams(params);
+    ureq.process(cloudClient);
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+
+}