Posted to commits@lucene.apache.org by no...@apache.org on 2016/08/22 07:02:25 UTC

lucene-solr:master: SOLR-9310: PeerSync fails on a node restart due to IndexFingerPrint mismatch

Repository: lucene-solr
Updated Branches:
  refs/heads/master 364a29c0a -> c2e769450


SOLR-9310: PeerSync fails on a node restart due to IndexFingerPrint mismatch


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c2e76945
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c2e76945
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c2e76945

Branch: refs/heads/master
Commit: c2e769450fac21a8f98e818b4783d7dca14cffb8
Parents: 364a29c
Author: Noble Paul <no...@apache.org>
Authored: Mon Aug 22 12:32:13 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Mon Aug 22 12:32:13 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +-
 .../handler/component/RealTimeGetComponent.java |  23 +-
 .../apache/solr/search/SolrIndexSearcher.java   |  16 +-
 .../java/org/apache/solr/update/PeerSync.java   |  27 +-
 .../java/org/apache/solr/update/UpdateLog.java  |   7 +-
 .../processor/DistributedUpdateProcessor.java   |  10 +-
 .../solr/collection1/conf/solrconfig-tlog.xml   |   2 +-
 .../solr/cloud/PeerSyncReplicationTest.java     | 364 +++++++++++++++++++
 8 files changed, 433 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5194726..8ccce7e 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -64,8 +64,10 @@ Apache ZooKeeper 3.4.6
 Jetty 9.3.8.v20160314
 
 
-(No Changes)
+Bug Fixes
+----------------------
 
+* SOLR-9310: PeerSync fails on a node restart due to IndexFingerPrint mismatch (Pushkar Raste, noble)
 
 ==================  6.2.0 ==================
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 88dbc9d..5d8654c 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -627,13 +627,21 @@ public class RealTimeGetComponent extends SearchComponent
     UpdateLog ulog = req.getCore().getUpdateHandler().getUpdateLog();
     if (ulog == null) return;
 
-    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
-      rb.rsp.add("versions", recentUpdates.getVersions(nVersions));
-    }
-
+    // get all available versions by default
+    long maxVersion = Long.MAX_VALUE;
+    // get the fingerprint first, as it causes a soft commit
+    // and avoids a mismatch if documents are being actively indexed, especially during PeerSync
     if (doFingerprint) {
       IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Long.MAX_VALUE);
       rb.rsp.add("fingerprint", fingerprint.toObject());
+      // if the fingerprint is calculated, it makes sense to return only those versions
+      // that were used in computing the fingerprint
+      maxVersion = fingerprint.getMaxVersionEncountered();
+    }
+
+    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
+      List<Long> versions = recentUpdates.getVersions(nVersions, maxVersion);
+      rb.rsp.add("versions", versions);
     }
   }
 
@@ -687,6 +695,13 @@ public class RealTimeGetComponent extends SearchComponent
           .collect(Collectors.toList());
     }
 
+    // find the fingerprint for the max version for which updates are requested
+    boolean doFingerprint = params.getBool("fingerprint", false);
+    if (doFingerprint) {
+      long maxVersionForUpdate = Collections.min(versions, PeerSync.absComparator);
+      IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Math.abs(maxVersionForUpdate));
+      rb.rsp.add("fingerprint", fingerprint.toObject());
+    }
 
     List<Object> updates = new ArrayList<>(versions.size());
 

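The hunks above reorder the response so the fingerprint is computed before the recent versions are collected, and the version list is then capped at the fingerprint's max version, so both sides of a PeerSync describe the same point in time. A minimal sketch of that capping step, in plain Java with illustrative names rather than Solr's API:

import java.util.List;
import java.util.stream.Collectors;

class VersionReporter {
  // keep only versions at or below the boundary reported by the fingerprint,
  // returning at most n of them (same filtering as getVersions(n, maxVersion) below)
  static List<Long> versionsUpTo(List<Long> recentVersions, long maxVersion, int n) {
    return recentVersions.stream()
        .filter(v -> v <= maxVersion)
        .limit(n)
        .collect(Collectors.toList());
  }
}
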
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
index 0f480c6..de4d6bf 100644
--- a/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
+++ b/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
@@ -147,7 +147,7 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
   private final String path;
   private boolean releaseDirectory;
 
-  private volatile IndexFingerprint fingerprint;
+  private volatile Map<Long,IndexFingerprint> maxVersionFingerprintCache = new HashMap<>();
   private final Object fingerprintLock = new Object();
 
   private static DirectoryReader getReader(SolrCore core, SolrIndexConfig config, DirectoryFactory directoryFactory,
@@ -2380,9 +2380,17 @@ public class SolrIndexSearcher extends IndexSearcher implements Closeable, SolrI
    **/
   public IndexFingerprint getIndexFingerprint(long maxVersion) throws IOException {
     // possibly expensive, so prevent more than one thread from calculating it for this searcher
-    synchronized (fingerprintLock) {
-      if (fingerprint == null) {
-        fingerprint = IndexFingerprint.getFingerprint(this, maxVersion);
+    
+    // TODO: if updates arrive out of order, would a cached fingerprint still be valid?
+    // Caching the fingerprint may itself lead to more problems.
+    IndexFingerprint fingerprint = maxVersionFingerprintCache.get(maxVersion);
+    if (fingerprint == null) {
+      synchronized (fingerprintLock) {
+        if (maxVersionFingerprintCache.get(maxVersion) == null) {
+          log.info("Fingerprint for max version: {} not found in cache", maxVersion);
+          maxVersionFingerprintCache.put(maxVersion, IndexFingerprint.getFingerprint(this, maxVersion));
+        }
+        fingerprint = maxVersionFingerprintCache.get(maxVersion);
       }
     }
 

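The change above replaces the single cached fingerprint with a map keyed by the requested max version, computed at most once per key under fingerprintLock. A hedged sketch of the same compute-once-per-key intent using a ConcurrentHashMap (illustrative only, not what SolrIndexSearcher actually uses):

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

class PerVersionCache<F> {
  private final ConcurrentHashMap<Long, F> cache = new ConcurrentHashMap<>();

  // compute the fingerprint for a given maxVersion at most once; concurrent
  // callers for the same key wait for the first computation to finish
  F getOrCompute(long maxVersion, LongFunction<F> compute) {
    return cache.computeIfAbsent(maxVersion, k -> compute.apply(k));
  }
}
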
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 79f5ac9..1eb69a5 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -564,9 +564,14 @@ public class PeerSync  {
   private boolean compareFingerprint(SyncShardRequest sreq) {
     if (sreq.fingerprint == null) return true;
     try {
-      IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
-      int cmp = IndexFingerprint.compare(ourFingerprint, sreq.fingerprint);
-      log.info("Fingerprint comparison: " + cmp);
+      // check our fingerprint only up to the max version in the other fingerprint.
+      // Otherwise, with missed updates (see the missed-update test in PeerSyncTest), ourFingerprint won't match otherFingerprint
+      IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, sreq.fingerprint.getMaxVersionSpecified());
+      int cmp = IndexFingerprint.compare(sreq.fingerprint, ourFingerprint);
+      log.info("Fingerprint comparison: {}" , cmp);
+      if(cmp != 0) {
+        log.info("Other fingerprint: {}, Our fingerprint: {}", sreq.fingerprint , ourFingerprint);
+      }
       return cmp == 0;  // currently, we only check for equality...
     } catch(IOException e){
       log.error(msg() + "Error getting index fingerprint", e);
@@ -588,6 +593,12 @@ public class PeerSync  {
     sreq.params.set("distrib", false);
     sreq.params.set("getUpdates", versionsAndRanges);
     sreq.params.set("onlyIfActive", onlyIfActive);
+    
+    // the fingerprint should really be requested only for the max version we are requesting updates for.
+    // If updates are coming in while the node is coming up after a restart, the node would already have
+    // buffered some of them; a fingerprint requested along with the versions would reflect the versions
+    // in our buffer as well and would definitely cause a mismatch
+    sreq.params.set("fingerprint", doFingerprint);
     sreq.responses.clear();  // needs to be zeroed for correct correlation to occur
 
     shardHandler.submit(sreq, sreq.shards[0], sreq.params);
@@ -602,9 +613,17 @@ public class PeerSync  {
 
     SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
     if (updates.size() < sreq.totalRequestedUpdates) {
-      log.error(msg() + " Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
+      log.error(msg() + " Requested " + sreq.totalRequestedUpdates + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
       return false;
     }
+    
+    // overwrite the fingerprint we saved in 'handleVersions()'
+    Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
+
+    if (fingerprint != null) {
+      sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
+    }
+
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());

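The PeerSync changes above make both replicas agree on a version boundary: our fingerprint is computed only up to the max version the other replica reported, and the fingerprint sent back with requested updates covers only the max requested version. A small sketch of the comparison rule, using a stand-in interface rather than Solr's IndexFingerprint API:

class FingerprintCheck {
  // stand-in for IndexFingerprint: maxVersion() is the boundary the fingerprint
  // was computed up to, hashAt(v) a digest of all updates with version <= v
  interface Fingerprint {
    long maxVersion();
    long hashAt(long upToVersion);
  }

  // compare both replicas at the boundary the remote side reported, so updates
  // buffered locally after that point cannot cause a spurious mismatch
  static boolean inSync(Fingerprint ours, Fingerprint theirs) {
    long boundary = theirs.maxVersion();
    return ours.hashAt(boundary) == theirs.hashAt(boundary);
  }
}
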
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 2f55e40..ba25acf 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -943,11 +943,16 @@ public class UpdateLog implements PluginInfoInitialized {
       }
     }
 
-    public List<Long> getVersions(int n) {
+    public List<Long> getVersions(int n) {
+      return getVersions(n, Long.MAX_VALUE);
+    }
+
+    public List<Long> getVersions(int n, long maxVersion) {
       List<Long> ret = new ArrayList<>(n);
 
       for (List<Update> singleList : updateList) {
         for (Update ptr : singleList) {
+          if (ptr.version > maxVersion) continue;
           ret.add(ptr.version);
           if (--n <= 0) return ret;
         }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 9b0e4dc..f19352d 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -1027,7 +1027,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
             // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
             if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
-                && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+                && !isReplayOrPeersync) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
@@ -1055,7 +1055,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             // The leader forwarded us this update.
             cmd.setVersion(versionOnUpdate);
 
-            if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            if (ulog.getState() != UpdateLog.State.ACTIVE && !isReplayOrPeersync) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
               ulog.add(cmd);
@@ -1429,7 +1429,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         } else {
           cmd.setVersion(-versionOnUpdate);
 
-          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+          if (ulog.getState() != UpdateLog.State.ACTIVE && !isReplayOrPeersync) {
             // we're not in an active state, and this update isn't from a replay, so buffer it.
             cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
             ulog.deleteByQuery(cmd);
@@ -1542,7 +1542,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
             // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
             if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
-                && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+                && !isReplayOrPeersync) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               log.info("Leader logic applied but update log is buffering: " + cmd.getId());
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
@@ -1567,7 +1567,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           } else {
             cmd.setVersion(-versionOnUpdate);
 
-            if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            if (ulog.getState() != UpdateLog.State.ACTIVE && !isReplayOrPeersync) {
               // we're not in an active state, and this update isn't from a replay, so buffer it.
               cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
               ulog.delete(cmd);

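These hunks widen the bypass condition for buffering: previously only updates carrying the REPLAY flag skipped buffering when the update log was not active; with isReplayOrPeersync, updates applied via PeerSync skip it as well. The decision, reduced to a sketch with illustrative names:

class BufferingDecision {
  enum LogState { ACTIVE, BUFFERING, REPLAYING }

  // buffer an update only when the update log is not active and the update did
  // not arrive via log replay or PeerSync
  static boolean shouldBuffer(LogState ulogState, boolean isReplayOrPeerSync) {
    return ulogState != LogState.ACTIVE && !isReplayOrPeerSync;
  }
}
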
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml
index c6e2f95..315d1be 100644
--- a/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-tlog.xml
@@ -48,7 +48,7 @@
   </requestHandler>
   
   <peerSync>
-    <useRangeVersions>${solr.peerSync.useRangeVersions:false}</useRangeVersions>
+    <useRangeVersions>${solr.peerSync.useRangeVersions:true}</useRangeVersions>
   </peerSync>
 
   <updateHandler class="solr.DirectUpdateHandler2">

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c2e76945/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
new file mode 100644
index 0000000..d813177
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -0,0 +1,364 @@
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.handler.ReplicationHandler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test PeerSync when a node restarts and documents are indexed while the node was down.
+ * 
+ * This test is modeled after SyncSliceTest
+ */
+@Slow
+public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private boolean success = false;
+  int docId = 0;
+
+  List<CloudJettyRunner> nodesDown = new ArrayList<>();
+
+  @Override
+  public void distribTearDown() throws Exception {
+    if (!success) {
+      printLayoutOnTearDown = true;
+    }
+    System.clearProperty("solr.directoryFactory");
+    System.clearProperty("solr.ulog.numRecordsToKeep");
+    System.clearProperty("tests.zk.violationReportAction");
+    super.distribTearDown();
+  }
+
+  public PeerSyncReplicationTest() {
+    super();
+    sliceCount = 1;
+    fixShardCount(3);
+  }
+
+  protected String getCloudSolrConfig() {
+    return "solrconfig-tlog.xml";
+  }
+
+  @Override
+  public void distribSetUp() throws Exception {
+    // tlog gets deleted after node restarts if we use CachingDirectoryFactory.
+    // make sure that tlog stays intact after we restart a node
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    System.setProperty("solr.ulog.numRecordsToKeep", "1000");
+    System.setProperty("tests.zk.violationReportAction", LimitViolationAction.IGNORE.toString());
+    super.distribSetUp();
+  }
+
+  @Test
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    waitForThingsToLevelOut(30);
+
+    del("*:*");
+
+    // index enough docs and commit to establish frame of reference for PeerSync
+    for (int i = 0; i < 100; i++) {
+      indexDoc(id, docId, i1, 50, tlong, 50, t1,
+          "document number " + docId++);
+    }
+    commit();
+    waitForThingsToLevelOut(30);
+
+    try {
+      checkShardConsistency(false, true);
+
+      long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+      assertEquals(docId, cloudClientDocs);
+
+      CloudJettyRunner initialLeaderJetty = shardToLeaderJetty.get("shard1");
+      List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
+      CloudJettyRunner neverLeader = otherJetties.get(otherJetties.size() - 1);
+      otherJetties.remove(neverLeader);
+
+      // first shutdown a node that will never be a leader
+      forceNodeFailures(Arrays.asList(neverLeader));
+
+      // node failure and recovery via PeerSync
+      log.info("Forcing PeerSync");
+      CloudJettyRunner nodePeerSynced = forceNodeFailureAndDoPeerSync(false);
+
+      // add a few more docs
+      indexDoc(id, docId, i1, 50, tlong, 50, t1,
+          "document number " + docId++);
+      indexDoc(id, docId, i1, 50, tlong, 50, t1,
+          "document number " + docId++);
+      commit();
+
+      cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+      assertEquals(docId, cloudClientDocs);
+
+      // now shut down all other nodes except for 'nodePeerSynced'
+      otherJetties.remove(nodePeerSynced);
+      forceNodeFailures(otherJetties);
+      waitForThingsToLevelOut(30);
+      checkShardConsistency(false, true);
+
+      // now shutdown the original leader
+      log.info("Now shutting down initial leader");
+      forceNodeFailures(Arrays.asList(initialLeaderJetty));
+      log.info("Updating mappings from zk");
+      Thread.sleep(15000); // sleep for a while for leader to change ...
+      updateMappingsFromZk(jettys, clients, true);
+      assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
+
+      // bring up the node that was down all along, and let it PeerSync from the node that was forced to PeerSync
+      bringUpDeadNodeAndEnsureNoReplication(shardToLeaderJetty.get("shard1"), neverLeader, false);
+      waitTillNodesActive();
+
+      checkShardConsistency(false, true);
+
+      
+      // bring back all the nodes including the initial leader
+      // (commented out, as it reports 'Maximum concurrent create/delete watches above limit' violations and thread leaks)
+      /*for(int i = 0 ; i < nodesDown.size(); i++) {
+        bringUpDeadNodeAndEnsureNoReplication(shardToLeaderJetty.get("shard1"), neverLeader, false);
+      }
+      checkShardConsistency(false, true);*/
+
+      // make sure leader has not changed after bringing initial leader back
+      assertEquals(nodePeerSynced, shardToLeaderJetty.get("shard1"));
+      success = true;
+    } finally {
+      System.clearProperty("solr.disableFingerprint");
+    }
+  }
+
+  
+  private Future<Void> indexInBackground(int numDocs) {
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    Future<Void> f = (Future<Void>) executorService.submit(new Callable<Void>() {
+
+      @Override
+      public Void call() throws Exception {
+          for (int i = 0; i < numDocs; i++) {
+            indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId++);
+            // slow down adds, to get documents indexed while in PeerSync
+            Thread.sleep(100);
+          }
+        return null;
+      }
+    });
+
+    return f;
+  }
+   
+
+  private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
+    for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
+      chaosMonkey.killJetty(replicaToShutDown);
+      waitForNoShardInconsistency();
+    }
+
+    int totalDown = 0;
+
+    Set<CloudJettyRunner> jetties = new HashSet<>();
+    jetties.addAll(shardToJetty.get("shard1"));
+
+    if (replicasToShutDown != null) {
+      jetties.removeAll(replicasToShutDown);
+      totalDown += replicasToShutDown.size();
+    }
+
+    jetties.removeAll(nodesDown);
+    totalDown += nodesDown.size();
+
+    assertEquals(getShardCount() - totalDown, jetties.size());
+
+    nodesDown.addAll(replicasToShutDown);
+
+    Thread.sleep(3000);
+  }
+  
+  
+
+  private CloudJettyRunner forceNodeFailureAndDoPeerSync(boolean disableFingerprint)
+      throws Exception {
+    // kill non leader - new leader could have all the docs or be missing one
+    CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");
+
+    List<CloudJettyRunner> nonLeaderJetties = getOtherAvailableJetties(leaderJetty);
+    CloudJettyRunner replicaToShutDown = nonLeaderJetties.get(random().nextInt(nonLeaderJetties.size())); // random non leader node
+
+    forceNodeFailures(Arrays.asList(replicaToShutDown));
+
+    // two docs need to be sync'd back when replica restarts
+    indexDoc(id, docId, i1, 50, tlong, 50, t1,
+        "document number " + docId++);
+    indexDoc(id, docId, i1, 50, tlong, 50, t1,
+        "document number " + docId++);
+    commit();
+
+    bringUpDeadNodeAndEnsureNoReplication(leaderJetty, replicaToShutDown, disableFingerprint);
+
+    return replicaToShutDown;
+  }
+  
+  
+
+  private void bringUpDeadNodeAndEnsureNoReplication(CloudJettyRunner leaderJetty, CloudJettyRunner nodeToBringUp,
+      boolean disableFingerprint) throws Exception {
+    // disable fingerprint check if needed
+    System.setProperty("solr.disableFingerprint", String.valueOf(disableFingerprint));
+
+    long numRequestsBefore = (Long) leaderJetty.jetty
+        .getCoreContainer()
+        .getCores()
+        .iterator()
+        .next()
+        .getRequestHandler(ReplicationHandler.PATH)
+        .getStatistics().get("requests");
+
+    indexInBackground(50);
+    
+    // bring back dead node and ensure it recovers
+    ChaosMonkey.start(nodeToBringUp.jetty);
+    
+    nodesDown.remove(nodeToBringUp);
+
+    waitTillNodesActive();
+    waitForThingsToLevelOut(30);
+
+    Set<CloudJettyRunner> jetties = new HashSet<>();
+    jetties.addAll(shardToJetty.get("shard1"));
+    jetties.removeAll(nodesDown);
+    assertEquals(getShardCount() - nodesDown.size(), jetties.size());
+
+    long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+    assertEquals(docId, cloudClientDocs);
+
+    long numRequestsAfter = (Long) leaderJetty.jetty
+        .getCoreContainer()
+        .getCores()
+        .iterator()
+        .next()
+        .getRequestHandler(ReplicationHandler.PATH)
+        .getStatistics().get("requests");
+
+    assertEquals("PeerSync failed. Had to fail back to replication", numRequestsBefore, numRequestsAfter);
+  }
+
+  
+  
+  private void waitTillNodesActive() throws Exception {
+    for (int i = 0; i < 60; i++) {
+      Thread.sleep(3000);
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection collection1 = clusterState.getCollection("collection1");
+      Slice slice = collection1.getSlice("shard1");
+      Collection<Replica> replicas = slice.getReplicas();
+      boolean allActive = true;
+
+      Collection<Replica> replicasToCheck =
+          replicas.stream().filter(r -> nodesDown.contains(r.getName()))
+              .collect(Collectors.toList());
+
+      for (Replica replica : replicasToCheck) {
+        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+          allActive = false;
+          break;
+        }
+      }
+      if (allActive) {
+        return;
+      }
+    }
+    printLayout();
+    fail("timeout waiting to see all nodes active");
+  }
+  
+  
+
+  private List<CloudJettyRunner> getOtherAvailableJetties(CloudJettyRunner leader) {
+    List<CloudJettyRunner> candidates = new ArrayList<>();
+    candidates.addAll(shardToJetty.get("shard1"));
+
+    if (leader != null) {
+      candidates.remove(leader);
+    }
+
+    candidates.removeAll(nodesDown);
+
+    return candidates;
+  }
+
+  
+  
+  protected void indexDoc(Object... fields) throws IOException,
+      SolrServerException {
+    SolrInputDocument doc = new SolrInputDocument();
+
+    addFields(doc, fields);
+    addFields(doc, "rnd_s", RandomStringUtils.random(new Random().nextInt(100) + 100));
+
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.add(doc);
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    ureq.setParams(params);
+    ureq.process(cloudClient);
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+
+}