Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/06 17:24:01 UTC

[07/15] lucene-solr git commit: fix more nocommits; add separate test that deleteAll can replicate

fix more nocommits; add separate test that deleteAll can replicate


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/022540e8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/022540e8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/022540e8

Branch: refs/heads/jira/lucene-5438-nrt-replication
Commit: 022540e8c23e57046f1b54a9cd485dab5ff4b563
Parents: 3389068
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jan 27 06:00:12 2016 -0500
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Jan 27 06:00:12 2016 -0500

----------------------------------------------------------------------
 .../nrt/PreCopyMergedSegmentWarmer.java         |   18 -
 .../lucene/replicator/nrt/PrimaryNode.java      |   18 -
 .../lucene/replicator/nrt/ReplicaNode.java      |   25 +-
 .../lucene/replicator/nrt/NodeProcess.java      |   18 +-
 .../replicator/nrt/SimplePrimaryNode.java       |   53 +-
 .../replicator/nrt/SimpleReplicaNode.java       |   30 +-
 .../lucene/replicator/nrt/SimpleServer.java     |   38 +-
 .../replicator/nrt/TestNRTReplication.java      | 1090 ++--------------
 .../nrt/TestStressNRTReplication.java           | 1160 ++++++++++++++++++
 9 files changed, 1359 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
index 1918ede..77f23ab 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
@@ -23,30 +23,14 @@ package org.apache.lucene.replicator.nrt;
  *  flushed segment sizes, not merged segments. */
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.replicator.nrt.CopyJob.OnceDone;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.OutputStreamDataOutput;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.ThreadInterruptedException;
 
 // TODO: or ... replica node can do merging locally?  tricky to keep things in sync, when one node merges more slowly than others...
 
@@ -71,8 +55,6 @@ class PreCopyMergedSegmentWarmer extends IndexReaderWarmer {
       filesMetaData.put(fileName, metaData);
     }
 
-    // nocommit if one replica is very slow then it dos's all other replicas?
-
     primary.preCopyMergedSegmentFiles(info, filesMetaData);
     primary.message(String.format(Locale.ROOT, "top: done warm merge " + info + ": took %.3f sec, %.1f MB", (System.nanoTime()-startNS)/1000000000., info.sizeInBytes()/1024/1024.));
     primary.finishedMergedFiles.addAll(filesMetaData.keySet());
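
The warmer's job here is to push a freshly merged segment's files out to all replicas before the merge commits, so NRT reopens never block on copying large merged files. A minimal sketch of that flow, assuming a readLocalFileMetaData helper on the primary (not shown in this hunk); preCopyMergedSegmentFiles and finishedMergedFiles are the members used above:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.SegmentCommitInfo;
    import org.apache.lucene.index.SegmentReader;

    // Sketch only: gather per-file metadata for the merged segment, then block
    // until all current replicas have pre-copied the files (or dropped out).
    class SketchPreCopyWarmer extends IndexReaderWarmer {
      private final PrimaryNode primary;

      SketchPreCopyWarmer(PrimaryNode primary) {
        this.primary = primary;
      }

      @Override
      public void warm(LeafReader reader) throws IOException {
        SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo();
        Map<String,FileMetaData> filesMetaData = new HashMap<>();
        for (String fileName : info.files()) {
          // Assumed helper: reads length/checksum/header for one local file:
          filesMetaData.put(fileName, primary.readLocalFileMetaData(fileName));
        }
        primary.preCopyMergedSegmentFiles(info, filesMetaData);
        primary.finishedMergedFiles.addAll(filesMetaData.keySet());
      }
    }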

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
index 183f16f..ccd8848 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -17,43 +17,26 @@ package org.apache.lucene.replicator.nrt;
  * limitations under the License.
  */
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.lucene.codecs.CodecUtil;
-import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.LogMergePolicy;
-import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.StandardDirectoryReader;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.RAMFile;
 import org.apache.lucene.store.RAMOutputStream;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.PrintStreamInfoStream;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /*
@@ -178,7 +161,6 @@ public abstract class PrimaryNode extends Node {
     // on xlog replay we are replaying more ops than necessary.
     commitData.put(VERSION_KEY, Long.toString(copyState.version));
     message("top: commit commitData=" + commitData);
-    // nocommit this is now an NRT-visible change!  make test where nothing is indexing and confirm we don't do silly commit + refresh loop forever!
     writer.setCommitData(commitData, false);
     writer.commit();
   }
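
Since the last NRT version is now stored in the commit's userData, a restarting node can recover exactly which point-in-time its index reflects. A hedged sketch of the read-back side; VERSION_KEY is the constant used in the commit() above, and returning -1 for a missing key is an illustrative choice:

    import java.io.IOException;

    import org.apache.lucene.index.SegmentInfos;
    import org.apache.lucene.store.Directory;

    // Sketch only: read the version written by commit() from the latest
    // commit point's userData.
    static long readLastCommittedVersion(Directory dir) throws IOException {
      SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
      String version = infos.getUserData().get(VERSION_KEY);
      return version == null ? -1 : Long.parseLong(version);
    }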

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
index af142d5..713c6f1 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@ -17,14 +17,7 @@ package org.apache.lucene.replicator.nrt;
  * limitations under the License.
  */
 
-import java.io.EOFException;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -34,15 +27,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.SegmentInfos;
@@ -52,24 +41,12 @@ import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.BufferedChecksumIndexInput;
-import org.apache.lucene.store.ByteArrayDataInput;
 import org.apache.lucene.store.ByteArrayIndexInput;
-import org.apache.lucene.store.ChecksumIndexInput;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.InputStreamDataInput;
 import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.store.OutputStreamDataOutput;
-import org.apache.lucene.store.RAMOutputStream;
-import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.ThreadInterruptedException;
 
 /** Replica node that pulls index changes from the primary node by copying newly flushed or merged index files */
 
@@ -544,7 +521,7 @@ abstract class ReplicaNode extends Node {
       synchronized (mergeCopyJobs) {
         for (CopyJob mergeJob : mergeCopyJobs) {
           if (mergeJob.getFileNames().contains(fileName)) {
-            // nocommit can't we .transferAndCancel?
+            // TODO: we could maybe transferAndCancel here?  except CopyJob can't transferAndCancel more than one currently
             message("top: now cancel merge copy job=" + mergeJob + ": file " + fileName + " is now being copied via NRT point");
             mergeJob.cancel("newNRTPoint is copying over the same file", null);
           }
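
The TODO above asks whether the files a merge copy job has already pulled could be handed to the new NRT-point job instead of being re-copied after the cancel. A purely illustrative sketch of that idea; newNRTPointCopyJob is hypothetical, and transferAndCancel(prevJob) is assumed to cancel the old job while adopting its finished files (the TODO notes CopyJob cannot yet do this for more than one job):

    import java.io.IOException;
    import java.util.Iterator;

    // Hypothetical alternative to the cancel above:
    void adoptOverlappingMergeCopies(String fileName, CopyJob newNRTPointCopyJob) throws IOException {
      synchronized (mergeCopyJobs) {
        Iterator<CopyJob> it = mergeCopyJobs.iterator();
        while (it.hasNext()) {
          CopyJob mergeJob = it.next();
          if (mergeJob.getFileNames().contains(fileName)) {
            // Take over whatever mergeJob already copied, then drop it:
            newNRTPointCopyJob.transferAndCancel(mergeJob);
            it.remove();
          }
        }
      }
    }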

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
index 4e29508..9de2c04 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -19,18 +19,9 @@ package org.apache.lucene.replicator.nrt;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.document.Document;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.OutputStreamDataOutput;
 
 /** Parent JVM holds this "wrapper" to refer to each child JVM.  This is roughly equivalent to, e.g., a client-side "sugar" API. */
 class NodeProcess implements Closeable {
@@ -234,5 +225,14 @@ class NodeProcess implements Closeable {
     c.flush();
     c.in.readByte();
   }
+
+  public void deleteAllDocuments(Connection c) throws IOException {
+    if (isPrimary == false) {
+      throw new IllegalStateException("only primary can index");
+    }
+    c.out.writeByte(SimplePrimaryNode.CMD_DELETE_ALL_DOCS);
+    c.flush();
+    c.in.readByte();
+  }
 }
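
The new deleteAllDocuments follows the same request/ack shape as the other indexing commands: one command byte, a flush, then a single ack byte. A hedged usage sketch from the test side; putting the connection into the indexing loop with CMD_INDEXING first is inferred from SimplePrimaryNode's handler below:

    // Sketch only: wipe the primary's index, then publish a new NRT point so
    // replicas copy over the (now empty) index. primary is a NodeProcess.
    try (Connection c = new Connection(primary.tcpPort)) {
      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
      primary.deleteAllDocuments(c);   // CMD_DELETE_ALL_DOCS + wait for ack
      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE);
      c.flush();
      c.in.readByte();
    }
    long newVersion = primary.flush(); // replicas will sync to this version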
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
index 18e77ef..0afd1b4 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -43,17 +43,16 @@ import org.apache.lucene.document.FieldType;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexOptions;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LogMergePolicy;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SegmentCommitInfo;
-import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.DataInput;
@@ -61,8 +60,6 @@ import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
@@ -74,6 +71,8 @@ class SimplePrimaryNode extends PrimaryNode {
 
   final int tcpPort;
 
+  final Random random;
+
   // These are updated by parent test process whenever replicas change:
   int[] replicaTCPPorts;
   int[] replicaIDs;
@@ -81,6 +80,10 @@ class SimplePrimaryNode extends PrimaryNode {
   // So we only flip a bit once per file name:
   final Set<String> bitFlipped = Collections.synchronizedSet(new HashSet<>());
 
+  final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
+
+  final boolean doFlipBitsDuringCopy;
+
   static class MergePreCopy {
     final List<Connection> connections = Collections.synchronizedList(new ArrayList<>());
     final Map<String,FileMetaData> files;
@@ -109,11 +112,12 @@ class SimplePrimaryNode extends PrimaryNode {
     }
   }
 
-  final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
-
-  public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
+  public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory,
+                           boolean doFlipBitsDuringCopy) throws IOException {
     super(initWriter(id, random, indexPath), id, primaryGen, forcePrimaryVersion, searcherFactory);
     this.tcpPort = tcpPort;
+    this.random = new Random(random.nextLong());
+    this.doFlipBitsDuringCopy = doFlipBitsDuringCopy;
   }
 
   /** Records currently alive replicas. */
@@ -187,7 +191,7 @@ class SimplePrimaryNode extends PrimaryNode {
       long startNS = System.nanoTime();
       long lastWarnNS = startNS;
 
-      // TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replicas to finish copying?
+      // TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replica(s) to finish copying?
       while (preCopy.finished() == false) {
         try {
           Thread.sleep(10);
@@ -241,7 +245,16 @@ class SimplePrimaryNode extends PrimaryNode {
                 message("top: warning: replica socket=" + c.s + " for segment=" + info + " seems to be dead; closing files=" + files.keySet());
                 IOUtils.closeWhileHandlingException(c);
                 it.remove();
+                done = true;
               }
+
+              if (done == false && random.nextInt(1000) == 17) {
+                message("top: warning: now randomly dropping replica from merge warming; files=" + files.keySet());
+                IOUtils.closeWhileHandlingException(c);
+                it.remove();
+                done = true;
+              }
+
             } catch (Throwable t) {
               message("top: ignore exception trying to read byte during warm for segment=" + info + " to replica socket=" + c.s + ": " + t + " files=" + files.keySet());
               IOUtils.closeWhileHandlingException(c);
@@ -368,7 +381,7 @@ class SimplePrimaryNode extends PrimaryNode {
           while (upto < len) {
             int chunk = (int) Math.min(buffer.length, (len-upto));
             in.readBytes(buffer, 0, chunk);
-            if (TestNRTReplication.DO_BIT_FLIPS_DURING_COPY) {
+            if (doFlipBitsDuringCopy) {
               if (random.nextInt(3000) == 17 && bitFlipped.contains(fileName) == false) {
                 bitFlipped.add(fileName);
                 message("file " + fileName + " to R" + replicaID + ": now randomly flipping a bit at byte=" + upto);
@@ -435,6 +448,10 @@ class SimplePrimaryNode extends PrimaryNode {
         handleDeleteDocument(in, out);
         out.writeByte((byte) 1);
         bos.flush();
+      } else if (cmd == CMD_DELETE_ALL_DOCS) {
+        writer.deleteAll();
+        out.writeByte((byte) 1);
+        bos.flush();
       } else if (cmd == CMD_INDEXING_DONE) {
         out.writeByte((byte) 1);
         bos.flush();
@@ -508,6 +525,7 @@ class SimplePrimaryNode extends PrimaryNode {
   static final byte CMD_MARKER_SEARCH = 3;
   static final byte CMD_COMMIT = 4;
   static final byte CMD_CLOSE = 5;
+  static final byte CMD_SEARCH_ALL = 21;
 
   // Send (to primary) the list of currently running replicas:
   static final byte CMD_SET_REPLICAS = 16;
@@ -518,6 +536,7 @@ class SimplePrimaryNode extends PrimaryNode {
   static final byte CMD_UPDATE_DOC = 7;
   static final byte CMD_DELETE_DOC = 8;
   static final byte CMD_INDEXING_DONE = 19;
+  static final byte CMD_DELETE_ALL_DOCS = 22;
 
   // Sent by replica to primary when replica first starts up, so primary can add it to any warming merges:
   static final byte CMD_NEW_REPLICA = 20;
@@ -579,6 +598,22 @@ class SimplePrimaryNode extends PrimaryNode {
         }
         continue outer;
 
+      case CMD_SEARCH_ALL:
+        {
+          Thread.currentThread().setName("search all");
+          IndexSearcher searcher = mgr.acquire();
+          try {
+            long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+            int hitCount = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
+            //message("version=" + version + " searcher=" + searcher);
+            out.writeVLong(version);
+            out.writeVInt(hitCount);
+          } finally {
+            mgr.release(searcher);
+          }
+        }
+        continue outer;
+
       case CMD_MARKER_SEARCH:
         {
           Thread.currentThread().setName("msearch");
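
The new doFlipBitsDuringCopy flag injects deliberate corruption while a file streams to a replica, exercising checksum verification on the receiving side. A condensed sketch of the injection in the hunk above, using Set.add's boolean return in place of the separate contains() check:

    // Corrupt exactly one bit, at most once per file name, in the chunk
    // currently being sent; the replica should detect this via Lucene's
    // per-file checksums and retry the copy.
    if (doFlipBitsDuringCopy && random.nextInt(3000) == 17 && bitFlipped.add(fileName)) {
      int byteIndex = random.nextInt(chunk);   // position within this chunk
      int bitIndex = random.nextInt(8);
      buffer[byteIndex] ^= (byte) (1 << bitIndex);
    }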

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
index 8667df1..27a5547 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
@@ -24,19 +24,16 @@ import java.io.InputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.file.Path;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.DataInput;
@@ -45,7 +42,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
 import org.apache.lucene.store.RateLimitedIndexOutput;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.LuceneTestCase;
@@ -182,7 +178,7 @@ class SimpleReplicaNode extends ReplicaNode {
         break;
 
       case SimplePrimaryNode.CMD_GET_SEARCHING_VERSION:
-        // nocommit this is hacky:
+        // This is called when primary has crashed and we need to elect a new primary from all the still running replicas:
 
         // Tricky: if a sync is just finishing up, i.e. managed to finish copying all files just before we crashed primary, and is now
         // in the process of opening a new reader, we need to wait for it, to be sure we really pick the most current replica:
@@ -190,7 +186,7 @@ class SimpleReplicaNode extends ReplicaNode {
           message("top: getSearchingVersion: now wait for finish sync");
           // TODO: use immediate concurrency instead of polling:
           while (isCopying() && stop.get() == false) {
-            Thread.sleep(50);
+            Thread.sleep(10);
             message("top: curNRTCopy=" + curNRTCopy);
           }
           message("top: getSearchingVersion: done wait for finish sync");
@@ -212,6 +208,24 @@ class SimpleReplicaNode extends ReplicaNode {
             //node.message("version=" + version + " searcher=" + searcher);
             out.writeVLong(version);
             out.writeVInt(hitCount);
+            bos.flush();
+          } finally {
+            mgr.release(searcher);
+          }
+        }
+        continue outer;
+
+      case SimplePrimaryNode.CMD_SEARCH_ALL:
+        {
+          Thread.currentThread().setName("search all");
+          IndexSearcher searcher = mgr.acquire();
+          try {
+            long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+            int hitCount = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
+            //node.message("version=" + version + " searcher=" + searcher);
+            out.writeVLong(version);
+            out.writeVInt(hitCount);
+            bos.flush();
           } finally {
             mgr.release(searcher);
           }
@@ -227,6 +241,7 @@ class SimpleReplicaNode extends ReplicaNode {
             int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
             out.writeVLong(version);
             out.writeVInt(hitCount);
+            bos.flush();
           } finally {
             mgr.release(searcher);
           }
@@ -290,6 +305,7 @@ class SimpleReplicaNode extends ReplicaNode {
       default:
         throw new IllegalArgumentException("unrecognized cmd=" + cmd);
       }
+      System.out.println("NOW FLUSH");
       bos.flush();
 
       break;
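
CMD_GET_SEARCHING_VERSION exists so the parent test can elect the most current replica after a primary crash; waiting out an in-flight copy above is what makes the reported version trustworthy. A sketch of the election pass, following the promoteReplica logic visible in the TestNRTReplication diff below:

    // Poll every live replica for its searching version and promote the max;
    // promoting a stale replica risks file-name reuse with different contents.
    NodeProcess replicaToPromote = null;
    long maxSearchingVersion = -1;
    for (NodeProcess node : nodes) {
      if (node != null && node.isPrimary == false) {
        long searchingVersion = node.getSearchingVersion();
        if (searchingVersion > maxSearchingVersion) {
          maxSearchingVersion = searchingVersion;
          replicaToPromote = node;
        }
      }
    }
    // replicaToPromote (if non-null) is then committed, shut down, and
    // restarted as the new primary.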

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
index 7a257de..5b04721 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
@@ -19,61 +19,32 @@ package org.apache.lucene.replicator.nrt;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.lang.reflect.Method;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.file.Path;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.lucene.analysis.MockAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FieldType;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexOptions;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
 import org.apache.lucene.store.OutputStreamDataOutput;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
 import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 
 /** Child process with silly naive TCP socket server to handle
@@ -262,8 +233,9 @@ public class SimpleServer extends LuceneTestCase {
     long primaryGen = Long.parseLong(System.getProperty("tests.nrtreplication.primaryGen"));
     Node.globalStartNS = Long.parseLong(System.getProperty("tests.nrtreplication.startNS"));
 
-    boolean doRandomCrash = isPrimary ? TestNRTReplication.DO_CRASH_PRIMARY : TestNRTReplication.DO_CRASH_REPLICA;
-    boolean doRandomClose = isPrimary ? false : TestNRTReplication.DO_CLOSE_REPLICA;
+    boolean doRandomCrash = "true".equals(System.getProperty("tests.nrtreplication.doRandomCrash"));
+    boolean doRandomClose = "true".equals(System.getProperty("tests.nrtreplication.doRandomClose"));
+    boolean doFlipBitsDuringCopy = "true".equals(System.getProperty("tests.nrtreplication.doFlipBitsDuringCopy"));
 
     // Create server socket that we listen for incoming requests on:
     try (final ServerSocket ss = new ServerSocket(0)) {
@@ -272,7 +244,7 @@ public class SimpleServer extends LuceneTestCase {
       System.out.println("\nPORT: " + tcpPort);
       final Node node;
       if (isPrimary) {
-        node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null);
+        node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null, doFlipBitsDuringCopy);
         System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
       } else {
         node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
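
With the static TestNRTReplication flags gone, each test now chooses per-run behavior by passing system properties when it spawns the child JVM. A minimal sketch of the launching side; the property names match the parsing above, the values are just examples, and "true".equals(...) means an absent property reads as false:

    import java.util.ArrayList;
    import java.util.List;

    // Example flags a parent test could add to the child JVM's command line:
    List<String> cmd = new ArrayList<>();
    cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
    cmd.add("-Dtests.nrtreplication.doRandomClose=true");
    cmd.add("-Dtests.nrtreplication.doFlipBitsDuringCopy=false");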

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 5a073ff..755b234 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@ -18,499 +18,39 @@ package org.apache.lucene.replicator.nrt;
  */
 
 import java.io.BufferedReader;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.UnsupportedEncodingException;
-import java.io.Writer;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.charset.MalformedInputException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
-import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.ConcurrentMergeScheduler;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
-import org.apache.lucene.store.OutputStreamDataOutput;
-import org.apache.lucene.store.RateLimiter;
-import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
 import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.TestUtil;
-import org.apache.lucene.util.ThreadInterruptedException;
 
 import com.carrotsearch.randomizedtesting.SeedUtils;
 
-// nocommit sometimes have one replica be really slow at copying / have random pauses (fake GC) / etc.
-
-// nocommit randomly p.destroy() one replica?
-
-/*
-  TODO
-    - why do we do the "rename temp to actual" all at the end...?  what really does that buy us?
-    - replica should also track maxSegmentName its seen, and tap into inflateGens if it's later promoted to primary?
-    - test should not print scary exceptions and then succeed!
-    - since all nodes are local, we could have a different test only impl that just does local file copies instead of via tcp...
-    - are the pre-copied-completed-merged files not being cleared in primary?
-      - hmm the logic isn't right today?  a replica may skip pulling a given copy state, that recorded the finished merged segments?
-    - beast & fix bugs
-    - graceful cluster restart
-    - better translog integration
-    - get "graceful primary shutdown" working
-    - there is still some global state we rely on for "correctness", e.g. lastPrimaryVersion
-    - clean up how version is persisted in commit data
-    - why am i not using hashes here?  how does ES use them?
-    - get all other "single shard" functions working too: this cluster should "act like" a single shard
-      - SLM
-      - controlled nrt reopen thread / returning long gen on write
-      - live field values
-      - add indexes
-    - make cluster level APIs to search, index, that deal w/ primary failover, etc.
-    - must prune xlog
-      - refuse to start primary unless we have quorum
-    - later
-      - if we named index files using segment's ID we wouldn't have file name conflicts after primary crash / rollback?
-      - back pressure on indexing if replicas can't keep up?
-      - get xlog working on top?  needs to be checkpointed, so we can correlate IW ops to NRT reader version and prune xlog based on commit
-        quorum
-        - maybe fix IW to return "gen" or "seq id" or "segment name" or something?
-      - replica can copy files from other replicas too / use multicast / rsync / something
-      - each replica could also pre-open a SegmentReader after pre-copy when warming a merge
-      - we can pre-copy newly flushed files too, for cases where reopen rate is low vs IW's flushing because RAM buffer is full
-      - opto: pre-copy files as they are written; if they will become CFS, we can build CFS on the replica?
-      - what about multiple commit points?
-      - fix primary to init directly from an open replica, instead of having to commit/close the replica first
-*/
-
-// Tricky cases:
-//   - we are pre-copying a merge, then replica starts up part way through, so it misses that pre-copy and must do it on next nrt point
-//   - a down replica starts up, but it's "from the future" vs the current primary, and must copy over file names with different contents
-//     but referenced by its latest commit point, so it must fully remove that commit ... which is a hazardous window
-//   - replica comes up just as the primary is crashing / moving
-//   - electing a new primary when a replica is just finishing its nrt sync: we need to wait for it so we are sure to get the "most up to
-//     date" replica
-//   - replica comes up after merged segment finished so it doesn't copy over the merged segment "promptly" (i.e. only sees it on NRT refresh)
-
-/**
- * Test case showing how to implement NRT replication.  This test spawns a sub-process per-node, running TestNRTReplicationChild.
- *
- * One node is primary, and segments are periodically flushed there, then concurrently the N replica nodes copy the new files over and open new readers, while
- * primary also opens a new reader.
- *
- * Nodes randomly crash and are restarted.  If the primary crashes, a replica is promoted.
- *
- * Merges are currently first finished on the primary and then pre-copied out to replicas with a merged segment warmer so they don't block
- * ongoing NRT reopens.  Probably replicas could do their own merging instead, but this is more complex and may not be better overall
- * (merging takes a lot of IO resources).
- *
- * Slow network is simulated with a RateLimiter.
- */
-
-// nocommit randomly delete all doc sometimes, 1) using IW.deleteAll and 2) doing it inefficiently (by query, by id)
-
 // MockRandom's .sd file has no index header/footer:
 @SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"})
 @SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
 public class TestNRTReplication extends LuceneTestCase {
 
-  // Test evilness controls:
-
-  /** Randomly crash the current primary (losing data!) and promote the "next best" replica. */
-  static final boolean DO_CRASH_PRIMARY = true;
-
-  /** Randomly crash (JVM core dumps) a replica; it will later randomly be restarted and sync itself. */
-  static final boolean DO_CRASH_REPLICA = true;
-
-  /** Randomly gracefully close a replica; it will later be restarted and sync itself. */
-  static final boolean DO_CLOSE_REPLICA = true;
-
-  /** If false, all child + parent output is interleaved into single stdout/err */
-  static final boolean SEPARATE_CHILD_OUTPUT = false;
-
-  // nocommit DO_CLOSE_PRIMARY?
-
-  /** Randomly crash whole cluster and then restart it */
-  static final boolean DO_FULL_CLUSTER_CRASH = true;
-
-  /** True if we randomly flip a bit while copying files out */
-  static final boolean DO_BIT_FLIPS_DURING_COPY = true;
-
-  /** Set to a non-null value to force exactly that many nodes; else, it's random. */
-  static final Integer NUM_NODES = null;
-
-  static final boolean DO_RANDOM_XLOG_REPLAY = false;
-
-  final AtomicBoolean failed = new AtomicBoolean();
-
-  final AtomicBoolean stop = new AtomicBoolean();
-
   /** cwd where we start each child (server) node */
   private Path childTempDir;
 
-  long primaryGen;
-
-  volatile long lastPrimaryVersion;
-
-  volatile NodeProcess primary;
-  volatile NodeProcess[] nodes;
-  volatile long[] nodeTimeStamps;
-  volatile boolean[] starting;
-  
-  Path[] indexPaths;
-
-  Path transLogPath;
-  SimpleTransLog transLog;
-  final AtomicInteger markerUpto = new AtomicInteger();
-
-  /** Maps searcher version to how many hits the query body:the matched. */
-  final Map<Long,Integer> hitCounts = new ConcurrentHashMap<>();
-
-  /** Maps searcher version to how many marker documents matched.  This should only ever grow (we never delete marker documents). */
-  final Map<Long,Integer> versionToMarker = new ConcurrentHashMap<>();
-
-  /** Maps searcher version to xlog location when refresh of this version started. */
-  final Map<Long,Long> versionToTransLogLocation = new ConcurrentHashMap<>();
-
-  public void test() throws Exception {
-
-    Node.globalStartNS = System.nanoTime();
-
-    message("change thread name from " + Thread.currentThread().getName());
-    Thread.currentThread().setName("main");
-
-    childTempDir = createTempDir("child");
-
-    // We are parent process:
-
-    // Silly bootstrapping:
-    versionToTransLogLocation.put(0L, 0L);
-    versionToTransLogLocation.put(1L, 0L);
-
-    int numNodes;
-
-    if (NUM_NODES == null) {
-      numNodes = TestUtil.nextInt(random(), 2, 10);
-    } else {
-      numNodes = NUM_NODES.intValue();
-    }
-
-    System.out.println("TEST: using " + numNodes + " nodes");
-
-    transLogPath = createTempDir("NRTReplication").resolve("translog");
-    transLog = new SimpleTransLog(transLogPath);
-
-    //state.rateLimiters = new RateLimiter[numNodes];
-    indexPaths = new Path[numNodes];
-    nodes = new NodeProcess[numNodes];
-    nodeTimeStamps = new long[numNodes];
-    Arrays.fill(nodeTimeStamps, Node.globalStartNS);
-    starting = new boolean[numNodes];
-    
-    for(int i=0;i<numNodes;i++) {
-      indexPaths[i] = createTempDir("index" + i);
-    }
-
-    Thread[] indexers = new Thread[TestUtil.nextInt(random(), 1, 3)];
-    System.out.println("TEST: launch " + indexers.length + " indexer threads");
-    for(int i=0;i<indexers.length;i++) {
-      indexers[i] = new IndexThread();
-      indexers[i].setName("indexer" + i);
-      indexers[i].setDaemon(true);
-      indexers[i].start();
-    }
-
-    Thread[] searchers = new Thread[TestUtil.nextInt(random(), 1, 3)];
-    System.out.println("TEST: launch " + searchers.length + " searcher threads");
-    for(int i=0;i<searchers.length;i++) {
-      searchers[i] = new SearchThread();
-      searchers[i].setName("searcher" + i);
-      searchers[i].setDaemon(true);
-      searchers[i].start();
-    }
-
-    Thread restarter = new RestartThread();
-    restarter.setName("restarter");
-    restarter.setDaemon(true);
-    restarter.start();
-
-    int runTimeSec;
-    if (TEST_NIGHTLY) {
-      runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 120, 240);
-    } else {
-      runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 45, 120);
-    }
-
-    System.out.println("TEST: will run for " + runTimeSec + " sec");
-
-    long endTime = System.nanoTime() + runTimeSec*1000000000L;
-
-    sendReplicasToPrimary();
-
-    while (failed.get() == false && System.nanoTime() < endTime) {
-
-      // Wait a bit:
-      Thread.sleep(TestUtil.nextInt(random(), Math.min(runTimeSec*4, 200), runTimeSec*4));
-      if (primary != null && random().nextBoolean()) {
-        message("top: now flush primary");
-        NodeProcess curPrimary = primary;
-        if (curPrimary != null) {
-
-          // Save these before we start flush:
-          long nextTransLogLoc = transLog.getNextLocation();
-          int markerUptoSav = markerUpto.get();
-
-          long result;
-          try {
-            result = primary.flush();
-          } catch (Throwable t) {
-            message("top: flush failed; skipping: " + t.getMessage());
-            result = -1;
-          }
-          if (result > 0) {
-            // There were changes
-            lastPrimaryVersion = result;
-            addTransLogLoc(lastPrimaryVersion, nextTransLogLoc);
-            addVersionMarker(lastPrimaryVersion, markerUptoSav);
-          }
-        }
-      }
-
-      StringBuilder sb = new StringBuilder();
-      int liveCount = 0;
-      for(int i=0;i<nodes.length;i++) {
-        NodeProcess node = nodes[i];
-        if (node != null) {
-          if (sb.length() != 0) {
-            sb.append(" ");
-          }
-          liveCount++;
-          if (node.isPrimary) {
-            sb.append('P');
-          } else {
-            sb.append('R');
-          }
-          sb.append(i);
-        }
-      }
-
-      message("PG=" + (primary == null ? "X" : primaryGen) + " " + liveCount + " (of " + nodes.length + ") nodes running: " + sb);
-
-      // Commit a random node, primary or replica
-
-      {
-        NodeProcess node = nodes[random().nextInt(nodes.length)];
-        if (node != null) {
-          // TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters.
-          // maybe we somehow allow IW to commit a specific sis (the one we just flushed)?
-          message("top: now commit node=" + node);
-          node.commitAsync();
-        }
-      }
-    }
-
-    message("TEST: top: test done, now close");
-    stop.set(true);
-    for(Thread thread : indexers) {
-      thread.join();
-    }
-    for(Thread thread : searchers) {
-      thread.join();
-    }
-    restarter.join();
-
-    // Close replicas before primary so we cancel any in-progres replications:
-    System.out.println("TEST: top: now close replicas");
-    List<Closeable> toClose = new ArrayList<>();
-    for(NodeProcess node : nodes) {
-      if (node != primary && node != null) {
-        toClose.add(node);
-      }
-    }
-    IOUtils.close(toClose);
-    IOUtils.close(primary);
-    IOUtils.close(transLog);
-
-    if (failed.get() == false) {
-      message("TEST: top: now checkIndex");    
-      for(Path path : indexPaths) {
-        message("TEST: check " + path);
-        MockDirectoryWrapper dir = newMockFSDirectory(path);
-        // Just too slow otherwise
-        dir.setCrossCheckTermVectorsOnClose(false);
-        dir.close();
-      }
-    } else {
-      message("TEST: failed; skip checkIndex");
-    }
-  }
-
-  private boolean anyNodesStarting() {
-    for(int id=0;id<nodes.length;id++) {
-      if (starting[id]) {
-        return true;
-      }
-    }
-
-    return false;
-  }
-
-  /** Picks a replica and promotes it as new primary. */
-  private void promoteReplica() throws IOException {
-    message("top: primary crashed; now pick replica to promote");
-    long maxSearchingVersion = -1;
-    NodeProcess replicaToPromote = null;
-
-    // We must promote the most current replica, because otherwise file name reuse can cause a replication to fail when it needs to copy
-    // over a file currently held open for searching.  This also minimizes recovery work since the most current replica means less xlog
-    // replay to catch up:
-    for (NodeProcess node : nodes) {
-      if (node != null) {
-        message("ask " + node + " for its current searching version");
-        long searchingVersion = node.getSearchingVersion();
-        message(node + " has searchingVersion=" + searchingVersion);
-        if (searchingVersion > maxSearchingVersion) {
-          maxSearchingVersion = searchingVersion;
-          replicaToPromote = node;
-        }
-      }
-    }
-
-    if (replicaToPromote == null) {
-      message("top: no replicas running; skipping primary promotion");
-      return;
-    }
-
-    message("top: promote " + replicaToPromote + " version=" + maxSearchingVersion + "; now commit");
-    if (replicaToPromote.commit() == false) {
-      message("top: commit failed; skipping primary promotion");
-      return;
-    }
-
-    message("top: now shutdown " + replicaToPromote);
-    if (replicaToPromote.shutdown() == false) {
-      message("top: shutdown failed for R" + replicaToPromote.id + "; skipping primary promotion");
-      return;
-    }
-
-    int id = replicaToPromote.id;
-    message("top: now startPrimary " + replicaToPromote);
-    startPrimary(replicaToPromote.id);
-  }
-
-  void startPrimary(int id) throws IOException {
-    message(id + ": top: startPrimary lastPrimaryVersion=" + lastPrimaryVersion);
-    assert nodes[id] == null;
-
-    // Force version of new primary to advance beyond where old primary was, so we never re-use versions.  It may have
-    // already advanced beyond newVersion, e.g. if it flushed new segments while during xlog replay:
-
-    // First start node as primary (it opens an IndexWriter) but do not publish it for searching until we replay xlog:
-    NodeProcess newPrimary = startNode(id, indexPaths[id], true, lastPrimaryVersion+1);
-    if (newPrimary == null) {
-      message("top: newPrimary failed to start; abort");
-      return;
-    }
-
-    // Get xlog location that this node was guaranteed to already have indexed through; this may replay some ops already indexed but it's OK
-    // because the ops are idempotent: we updateDocument (by docid) on replay even for original addDocument:
-    Long startTransLogLoc;
-    Integer markerCount;
-    if (newPrimary.initCommitVersion == 0) {
-      startTransLogLoc = 0L;
-      markerCount = 0;
-    } else {
-      startTransLogLoc = versionToTransLogLocation.get(newPrimary.initCommitVersion);
-      markerCount = versionToMarker.get(newPrimary.initCommitVersion);
-    }
-    assert startTransLogLoc != null: "newPrimary.initCommitVersion=" + newPrimary.initCommitVersion + " is missing from versionToTransLogLocation: keys=" + versionToTransLogLocation.keySet();
-    assert markerCount != null: "newPrimary.initCommitVersion=" + newPrimary.initCommitVersion + " is missing from versionToMarker: keys=" + versionToMarker.keySet();
-
-    // When the primary starts, the userData in its latest commit point tells us which version it had indexed up to, so we know where to
-    // replay from in the xlog.  However, we forcefuly advance the version, and then IW on init (or maybe getReader) also adds 1 to it.
-    // Since we publish the primary in this state (before xlog replay is done), a replica can start up at this point and pull this version,
-    // and possibly later be chosen as a primary, causing problems if the version is known recorded in the translog map.  So we record it
-    // here:
-
-    addTransLogLoc(newPrimary.initInfosVersion, startTransLogLoc);
-    addVersionMarker(newPrimary.initInfosVersion, markerCount);
-
-    assert newPrimary.initInfosVersion >= lastPrimaryVersion;
-    message("top: now change lastPrimaryVersion from " + lastPrimaryVersion + " to " + newPrimary.initInfosVersion);
-    lastPrimaryVersion = newPrimary.initInfosVersion;
-
-    // Publish new primary, before replaying xlog.  This means other indexing ops can come in at the same time as we catch up indexing
-    // previous ops.  Effectively, we have "forked" the indexing ops, by rolling back in time a bit, and replaying old indexing ops (from
-    // translog) concurrently with new incoming ops.
-    nodes[id] = newPrimary;
-    primary = newPrimary;
-
-    sendReplicasToPrimary();
-
-    long nextTransLogLoc = transLog.getNextLocation();
-    int nextMarkerUpto = markerUpto.get();
-    message("top: replay trans log " + startTransLogLoc + " (version=" + newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)");
-    try {
-      transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc);
-    } catch (IOException ioe) {
-      message("top: replay xlog failed; abort");
-      return;
-    }
-    message("top: done replay trans log");
-  }
-
   final AtomicLong nodeStartCounter = new AtomicLong();
 
-  final Set<Integer> crashingNodes = Collections.synchronizedSet(new HashSet<>());
-
   /** Launches a child "server" (separate JVM), which is either primary or replica node */
-  NodeProcess startNode(final int id, Path indexPath, boolean isPrimary, long forcePrimaryVersion) throws IOException {
-    nodeTimeStamps[id] = System.nanoTime();
+  NodeProcess startNode(int primaryTCPPort, final int id, Path indexPath, boolean isPrimary, long forcePrimaryVersion) throws IOException {
     List<String> cmd = new ArrayList<>();
 
-    NodeProcess curPrimary = primary;
-
     cmd.add(System.getProperty("java.home") 
         + System.getProperty("file.separator")
         + "bin"
@@ -518,12 +58,13 @@ public class TestNRTReplication extends LuceneTestCase {
         + "java");
     cmd.add("-Xmx512m");
 
-    if (curPrimary != null) {
-      cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + curPrimary.tcpPort);
+    if (primaryTCPPort != -1) {
+      cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + primaryTCPPort);
     } else if (isPrimary == false) {
       // We cannot start a replica when there is no primary:
       return null;
     }
+    cmd.add("-Dtests.nrtreplication.closeorcrash=false");
 
     cmd.add("-Dtests.nrtreplication.node=true");
     cmd.add("-Dtests.nrtreplication.nodeid=" + id);
@@ -534,7 +75,7 @@ public class TestNRTReplication extends LuceneTestCase {
       cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + forcePrimaryVersion);
     }
 
-    long myPrimaryGen = primaryGen;
+    long myPrimaryGen = 0;
     cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen);
 
     // Mixin our own counter because this is called from a fresh thread which means the seed otherwise isn't changing each time we spawn a
@@ -548,17 +89,6 @@ public class TestNRTReplication extends LuceneTestCase {
     cmd.add("org.junit.runner.JUnitCore");
     cmd.add(getClass().getName().replace(getClass().getSimpleName(), "SimpleServer"));
 
-    Writer childLog;
-
-    if (SEPARATE_CHILD_OUTPUT) {
-      Path childOut = childTempDir.resolve(id + ".log");
-      message("logging to " + childOut);
-      childLog = Files.newBufferedWriter(childOut, StandardCharsets.UTF_8, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
-      childLog.write("\n\nSTART NEW CHILD:\n");
-    } else {
-      childLog = null;
-    }
-
     message("child process command: " + cmd);
     ProcessBuilder pb = new ProcessBuilder(cmd);
     pb.redirectErrorStream(true);
@@ -592,41 +122,11 @@ public class TestNRTReplication extends LuceneTestCase {
           throw new RuntimeException(ie);
         }
         message("exit value=" + p.exitValue());
-
-        // Hackity hack, in case primary crashed/closed and we haven't noticed (reaped the process) yet:
-        if (isPrimary == false) {
-          if (sawExistingSegmentsFile) {
-            // This means MDW's virus checker blocked us from deleting segments_N that we must delete in order to start ... just return null
-            // and retry again later:
-            message("failed to remove segments_N; skipping");
-            return null;
-          }
-          for(int i=0;i<10;i++) {
-            if (primaryGen != myPrimaryGen || primary == null) {
-              // OK: primary crashed while we were trying to start, so it's expected/allowed that we could not start the replica:
-              message("primary crashed/closed while replica R" + id + " tried to start; skipping");
-              return null;
-            } else {
-              try {
-                Thread.sleep(10);
-              } catch (InterruptedException ie) {
-                throw new ThreadInterruptedException(ie);
-              }
-            }
-          }
-        }
-
-        // Should fail the test:
         message("top: now fail test replica R" + id + " failed to start");
-        failed.set(true);
         throw new RuntimeException("replica R" + id + " failed to start");
       }
 
-      if (childLog != null) {
-        childLog.write(l);
-        childLog.write("\n");
-        childLog.flush();
-      } else if (logTimeStart.matcher(l).matches()) {
+      if (logTimeStart.matcher(l).matches()) {
         // Already a well-formed log output:
         System.out.println(l);
       } else {
@@ -650,7 +150,7 @@ public class TestNRTReplication extends LuceneTestCase {
 
     final boolean finalWillCrash = willCrash;
 
-    // Baby sits the child process, pulling its stdout and printing to our stdout, calling nodeClosed once it exits:
+    // Babysits the child process, pulling its stdout and printing to our stdout:
     Thread pumper = ThreadPumper.start(
                                        new Runnable() {
                                          @Override
@@ -665,503 +165,147 @@ public class TestNRTReplication extends LuceneTestCase {
                                            message("done wait for process " + p);
                                            int exitValue = p.exitValue();
                                            message("exit value=" + exitValue + " willCrash=" + finalWillCrash);
-                                           if (childLog != null) {
-                                             try {
-                                               childLog.write("process done; exitValue=" + exitValue + "\n");
-                                               childLog.close();
-                                             } catch (IOException ioe) {
-                                               throw new RuntimeException(ioe);
-                                             }
-                                           }
-                                           if (exitValue != 0 && finalWillCrash == false && crashingNodes.remove(id) == false) {
+                                           if (exitValue != 0) {
                                              // should fail test
-                                             failed.set(true);
-                                             if (childLog != null) {
-                                               throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue + "; see " + childLog + " for details");
-                                             } else {
-                                               throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue);
-                                             }
+                                             throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue);
                                            }
-                                           nodeClosed(id);
                                          }
-                                       }, r, System.out, childLog);
+                                       }, r, System.out, null);
     pumper.setName("pump" + id);
 
     message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion);
     return new NodeProcess(p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion);
   }
 
-  private void nodeClosed(int id) {
-    NodeProcess oldNode = nodes[id];
-    if (primary != null && oldNode == primary) {
-      message("top: " + primary + ": primary process finished");
-      primary = null;
-      primaryGen++;
-    } else {
-      message("top: " + oldNode + ": replica process finished");
+  public void testReplicateDeleteAllDocuments() throws Exception {
+
+    Node.globalStartNS = System.nanoTime();
+    childTempDir = createTempDir("child");
+
+    message("change thread name from " + Thread.currentThread().getName());
+    Thread.currentThread().setName("main");
+    
+    Path primaryPath = createTempDir("primary");
+    NodeProcess primary = startNode(-1, 0, primaryPath, true, -1);
+
+    Path replicaPath = createTempDir("replica");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, false, -1);
+
+    // Tell primary current replicas:
+    try (Connection c = new Connection(primary.tcpPort)) {
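+      // Wire format: CMD_SET_REPLICAS, then the replica count, then (id, tcpPort) for each replica: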
+      c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
+      c.out.writeVInt(1);
+      c.out.writeVInt(replica.id);
+      c.out.writeVInt(replica.tcpPort);
+      c.flush();
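+      // Read the one-byte response acknowledging the new replica set: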
+      c.in.readByte();
     }
-    if (oldNode != null) {
-      oldNode.isOpen = false;
+
+    // Index 10 docs into primary:
+    LineFileDocs docs = new LineFileDocs(random());
+    Connection primaryC = new Connection(primary.tcpPort);
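+    // Switch this connection into indexing mode; the documents below are streamed over it: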
+    primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+    for(int i=0;i<10;i++) {
+      Document doc = docs.nextDoc();
+      primary.addOrUpdateDocument(primaryC, doc, false);
     }
-    nodes[id] = null;
-    nodeTimeStamps[id] = System.nanoTime();
 
-    sendReplicasToPrimary();
-  }
+    // Nothing in replica index yet:
+    Connection replicaC = new Connection(replica.tcpPort);
+    replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+    replicaC.flush();
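+    // The response is the replica's searcher version (vlong) followed by the hit count (vint):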
+    long version1 = replicaC.in.readVLong();
+    assertEquals(0L, version1);
+    int hitCount = replicaC.in.readVInt();
+    assertEquals(0, hitCount);
 
-  /** Sends currently alive replicas to primary, which uses this to know who to notify when it does a refresh */
-  private void sendReplicasToPrimary() {
-    NodeProcess curPrimary = primary;
-    if (curPrimary != null) {
-      List<NodeProcess> replicas = new ArrayList<>();
-      for (NodeProcess node : nodes) {
-        if (node != null && node.isPrimary == false) {
-          replicas.add(node);
-        }
-      }
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush();
+    assertTrue(primaryVersion1 > 0);
 
-      message("top: send " + replicas.size() + " replicas to primary");
+    long version2;
 
-      try (Connection c = new Connection(curPrimary.tcpPort)) {
-        c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
-        c.out.writeVInt(replicas.size());        
-        for(NodeProcess replica : replicas) {
-          c.out.writeVInt(replica.id);
-          c.out.writeVInt(replica.tcpPort);
-        }
-        c.flush();
-        c.in.readByte();
-      } catch (Throwable t) {
-        message("top: ignore exc sending replicas to primary: " + t);
+    // Wait for replica to show the change:
+    while (true) {
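+      // Poll the replica every 10 ms until the pushed refresh becomes visible: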
+      replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+      replicaC.flush();
+      version2 = replicaC.in.readVLong();
+      hitCount = replicaC.in.readVInt();
+      if (hitCount == 10) {
+        assertTrue(version2 > version1);
+        // good!
+        break;
       }
+      Thread.sleep(10);
     }
-  }
 
-  void addVersionMarker(long version, int count) {
-    //System.out.println("ADD VERSION MARKER version=" + version + " count=" + count);
-    if (versionToMarker.containsKey(version)) {
-      int curCount = versionToMarker.get(version);
-      if (curCount != count) {
-        message("top: wrong marker count version=" + version + " count=" + count + " curCount=" + curCount);
-        throw new IllegalStateException("version=" + version + " count=" + count + " curCount=" + curCount);
+    // Delete all docs from primary, randomly choosing per-doc deletes or a single deleteAll:
+    if (random().nextBoolean()) {
+      // Inefficiently:
+      for(int id=0;id<10;id++) {
+        primary.deleteDocument(primaryC, Integer.toString(id));
       }
     } else {
-      message("top: record marker count: version=" + version + " count=" + count);
-      versionToMarker.put(version, count);
+      // Efficiently:
+      primary.deleteAllDocuments(primaryC);
     }
-  }
-
-  void addTransLogLoc(long version, long loc) {
-    message("top: record transLogLoc: version=" + version + " loc=" + loc);
-    versionToTransLogLocation.put(version, loc);
-  }
-
-  // Periodically wakes up and starts up any down nodes:
-  private class RestartThread extends Thread {
-    @Override
-    public void run() {
-
-      List<Thread> startupThreads = Collections.synchronizedList(new ArrayList<>());
-
-      try {
-        while (stop.get() == false) {
-          Thread.sleep(TestUtil.nextInt(random(), 50, 500));
-          message("top: restarter cycle");
-
-          // Randomly crash full cluster:
-          if (DO_FULL_CLUSTER_CRASH && random().nextInt(50) == 17) {
-            message("top: full cluster crash");
-            for(int i=0;i<nodes.length;i++) {
-              if (starting[i]) {
-                message("N" + i + ": top: wait for startup so we can crash...");
-                while (starting[i]) {
-                  Thread.sleep(10);
-                }
-                message("N" + i + ": top: done wait for startup");
-              }
-              NodeProcess node = nodes[i];
-              if (node != null) {
-                crashingNodes.add(i);
-                message("top: N" + node.id + ": top: now crash node");
-                node.crash();
-                message("top: N" + node.id + ": top: done crash node");
-              }
-            }
-          }
-
-          List<Integer> downNodes = new ArrayList<>();
-          StringBuilder b = new StringBuilder();
-          long nowNS = System.nanoTime();
-          for(int i=0;i<nodes.length;i++) {
-            b.append(' ');
-            double sec = (nowNS - nodeTimeStamps[i])/1000000000.0;
-            String prefix;
-            if (nodes[i] == null) {
-              downNodes.add(i);
-              if (starting[i]) {
-                prefix = "s";
-              } else {
-                prefix = "x";
-              }
-            } else {
-              prefix = "";
-            }
-            if (primary != null && nodes[i] == primary) {
-              prefix += "p";
-            }
-            b.append(String.format(Locale.ROOT, "%s%d(%.1fs)", prefix, i, sec));
-          }
-          message("node status" + b.toString());
-          message("downNodes=" + downNodes);
-
-          // If primary is down, promote a replica:
-          if (primary == null) {
-            if (anyNodesStarting()) {
-              message("top: skip promote replica: nodes are still starting");
-              continue;
-            }
-            promoteReplica();
-          }
-
-          // Randomly start up a down a replica:
-
-          // Stop or start a replica
-          if (downNodes.isEmpty() == false) {
-            int idx = downNodes.get(random().nextInt(downNodes.size()));
-            if (starting[idx] == false) {
-              if (primary == null) {
-                if (downNodes.size() == nodes.length) {
-                  // Cold start: entire cluster is down, start this node up as the new primary
-                  message("N" + idx + ": top: cold start as primary");
-                  startPrimary(idx);
-                }
-              } else if (random().nextDouble() < ((double) downNodes.size())/nodes.length) {
-                // Start up replica:
-                starting[idx] = true;
-                message("N" + idx + ": top: start up: launch thread");
-                Thread t = new Thread() {
-                    @Override
-                    public void run() {
-                      try {
-                        message("N" + idx + ": top: start up thread");
-                        nodes[idx] = startNode(idx, indexPaths[idx], false, -1);
-                        sendReplicasToPrimary();
-                      } catch (Throwable t) {
-                        failed.set(true);
-                        stop.set(true);
-                        throw new RuntimeException(t);
-                      } finally {
-                        starting[idx] = false;
-                        startupThreads.remove(Thread.currentThread());
-                      }
-                    }
-                  };
-                t.setName("start R" + idx);
-                t.start();
-                startupThreads.add(t);
-              }
-            } else {
-              message("node " + idx + " still starting");
-            }
-          }
-        }
 
-        System.out.println("Restarter: now stop: join " + startupThreads.size() + " startup threads");
-
-        while (startupThreads.size() > 0) {
-          Thread.sleep(10);
-        }
+    // Replica still shows 10 docs because the primary has not refreshed since the deletes:
+    replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+    replicaC.flush();
+    long version3 = replicaC.in.readVLong();
+    assertEquals(version2, version3);
+    hitCount = replicaC.in.readVInt();
+    assertEquals(10, hitCount);
+    
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion2 = primary.flush();
+    assertTrue(primaryVersion2 > primaryVersion1);
 
-      } catch (Throwable t) {
-        failed.set(true);
-        stop.set(true);
-        throw new RuntimeException(t);
+    // Wait for replica to show the change:
+    long version4;
+    while (true) {
+      replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+      replicaC.flush();
+      version4 = replicaC.in.readVLong();
+      hitCount = replicaC.in.readVInt();
+      if (hitCount == 0) {
+        assertTrue(version4 > version3);
+        // good!
+        break;
       }
+      Thread.sleep(10);
     }
-  }
-
-  /** Randomly picks a node and runs a search against it */
-  private class SearchThread extends Thread {
-
-    @Override
-    public void run() {
-      // Maps version to number of hits for silly 'the' TermQuery:
-      Query theQuery = new TermQuery(new Term("body", "the"));
-
-      // Persists connections
-      Map<Integer,Connection> connections = new HashMap<>();
-
-      while (stop.get() == false) {
-        NodeProcess node = nodes[random().nextInt(nodes.length)];
-        if (node == null || node.isOpen == false) {
-          continue;
-        }
-
-        if (node.lock.tryLock() == false) {
-          // Node is in the process of closing or crashing or something
-          continue;
-        }
-
-        try {
-
-          Thread.currentThread().setName("Searcher node=" + node);
-
-          //System.out.println("S: cycle; conns=" + connections);
-
-          Connection c = connections.get(node.id);
-
-          long version;
-          try {
-            if (c == null) {
-              //System.out.println("S: new connection " + node.id + " " + Thread.currentThread().getName());
-              c = new Connection(node.tcpPort);
-              connections.put(node.id, c);
-            } else {
-              //System.out.println("S: reuse connection " + node.id + " " + Thread.currentThread().getName());
-            }
-
-            c.out.writeByte(SimplePrimaryNode.CMD_SEARCH);
-            c.flush();
-
-            while (c.sockIn.available() == 0) {
-              if (stop.get()) {
-                break;
-              }
-              if (node.isOpen == false) {
-                throw new IOException("node closed");
-              }
-              Thread.sleep(1);
-            }
-            version = c.in.readVLong();
-
-            while (c.sockIn.available() == 0) {
-              if (stop.get()) {
-                break;
-              }
-              if (node.isOpen == false) {
-                throw new IOException("node closed");
-              }
-              Thread.sleep(1);
-            }
-            int hitCount = c.in.readVInt();
-
-            Integer oldHitCount = hitCounts.get(version);
-
-            // TODO: we never prune this map...
-            if (oldHitCount == null) {
-              hitCounts.put(version, hitCount);
-              message("top: searcher: record search hitCount version=" + version + " hitCount=" + hitCount + " node=" + node);
-            } else {
-              // Just ensure that all nodes show the same hit count for
-              // the same version, i.e. they really are replicas of one another:
-              if (oldHitCount.intValue() != hitCount) {
-                failed.set(true);
-                stop.set(true);
-                message("top: searcher: wrong version hitCount: version=" + version + " oldHitCount=" + oldHitCount.intValue() + " hitCount=" + hitCount);
-                fail("version=" + version + " oldHitCount=" + oldHitCount.intValue() + " hitCount=" + hitCount);
-              }
-            }
-          } catch (IOException ioe) {
-            //message("top: searcher: ignore exc talking to node " + node + ": " + ioe);
-            //ioe.printStackTrace(System.out);
-            IOUtils.closeWhileHandlingException(c);
-            connections.remove(node.id);
-            continue;
-          }
-
-          // This can be null if we got the new primary after crash and that primary is still catching up (replaying xlog):
-          Integer expectedAtLeastHitCount = versionToMarker.get(version);
-
-          if (expectedAtLeastHitCount != null && expectedAtLeastHitCount > 0 && random().nextInt(10) == 7) {
-            try {
-              c.out.writeByte(SimplePrimaryNode.CMD_MARKER_SEARCH);
-              c.flush();
-              while (c.sockIn.available() == 0) {
-                if (stop.get()) {
-                  break;
-                }
-                if (node.isOpen == false) {
-                  throw new IOException("node died");
-                }
-                Thread.sleep(1);
-              }
-
-              version = c.in.readVLong();
 
-              while (c.sockIn.available() == 0) {
-                if (stop.get()) {
-                  break;
-                }
-                if (node.isOpen == false) {
-                  throw new IOException("node died");
-                }
-                Thread.sleep(1);
-              }
-
-              int hitCount = c.in.readVInt();
-
-              // Look for data loss: make sure all marker docs are visible:
-            
-              if (hitCount < expectedAtLeastHitCount) {
-
-                String failMessage = "node=" + node + ": documents were lost version=" + version + " hitCount=" + hitCount + " vs expectedAtLeastHitCount=" + expectedAtLeastHitCount;
-                message(failMessage);
-                failed.set(true);
-                stop.set(true);
-                fail(failMessage);
-              }
-            } catch (IOException ioe) {
-              //message("top: searcher: ignore exc talking to node " + node + ": " + ioe);
-              //throw new RuntimeException(ioe);
-              //ioe.printStackTrace(System.out);
-              IOUtils.closeWhileHandlingException(c);
-              connections.remove(node.id);
-              continue;
-            }
-          }
+    // Index 10 docs again:
+    for(int i=0;i<10;i++) {
+      Document doc = docs.nextDoc();
+      primary.addOrUpdateDocument(primaryC, doc, false);
+    }
 
-          Thread.sleep(10);
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion3 = primary.flush();
+    assertTrue(primaryVersion3 > primaryVersion2);
 
-        } catch (Throwable t) {
-          failed.set(true);
-          stop.set(true);
-          throw new RuntimeException(t);
-        } finally {
-          node.lock.unlock();
-        }
+    // Wait for replica to show the change:
+    while (true) {
+      replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+      replicaC.flush();
+      long version5 = replicaC.in.readVLong();
+      hitCount = replicaC.in.readVInt();
+      if (hitCount == 10) {
+        assertTrue(version5 > version4);
+        // good!
+        break;
       }
-      System.out.println("Searcher: now stop");
-      IOUtils.closeWhileHandlingException(connections.values());
+      Thread.sleep(10);
     }
-  }
-
-  private class IndexThread extends Thread {
-
-    @Override
-    public void run() {
-
-      try {
-        LineFileDocs docs = new LineFileDocs(random());
-        int docCount = 0;
-
-        // How often we do an update/delete vs add:
-        double updatePct = random().nextDouble();
-
-        // Varies how many docs/sec we index:
-        int sleepChance = TestUtil.nextInt(random(), 4, 100);
 
-        message("top: indexer: updatePct=" + updatePct + " sleepChance=" + sleepChance);
+    replicaC.close();
+    primaryC.close();
 
-        long lastTransLogLoc = transLog.getNextLocation();
-        
-        NodeProcess curPrimary = null;
-        Connection c = null;
-
-        while (stop.get() == false) {
-
-          try {
-            while (stop.get() == false && curPrimary == null) {
-              Thread.sleep(10);
-              curPrimary = primary;
-              if (curPrimary != null) {
-                c = new Connection(curPrimary.tcpPort);
-                c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
-                break;
-              }
-            }
-
-            if (stop.get()) {
-              break;
-            }
-
-            Thread.currentThread().setName("indexer p" + curPrimary.id);
-
-            if (random().nextInt(10) == 7) {
-              // We use the marker docs to check for data loss in search thread:
-              Document doc = new Document();
-              int id = markerUpto.getAndIncrement();
-              String idString = "m"+id;
-              doc.add(newStringField("docid", idString, Field.Store.YES));
-              doc.add(newStringField("marker", "marker", Field.Store.YES));
-              curPrimary.addOrUpdateDocument(c, doc, false);
-              transLog.addDocument(idString, doc);
-              message("index marker=" + idString + "; translog is " + Node.bytesToString(Files.size(transLogPath)));
-            }
-
-            if (docCount > 0 && random().nextDouble() < updatePct) {
-              int randomID = random().nextInt(docCount);
-              String randomIDString = Integer.toString(randomID);
-              if (random().nextBoolean()) {
-                // Replace previous doc
-                Document doc = docs.nextDoc();
-                ((Field) doc.getField("docid")).setStringValue(randomIDString);
-                curPrimary.addOrUpdateDocument(c, doc, true);
-                transLog.updateDocument(randomIDString, doc);
-              } else {
-                // Delete previous doc
-                curPrimary.deleteDocument(c, randomIDString);
-                transLog.deleteDocuments(randomIDString);
-              }
-            } else {
-              // Add new doc:
-              Document doc = docs.nextDoc();
-              String idString = Integer.toString(docCount++);
-              ((Field) doc.getField("docid")).setStringValue(idString);
-              curPrimary.addOrUpdateDocument(c, doc, false);
-              transLog.addDocument(idString, doc);
-
-              if (DO_RANDOM_XLOG_REPLAY && random().nextInt(10) == 7) {
-                long curLoc = transLog.getNextLocation();
-                // randomly replay chunks of translog just to test replay:
-                message("now randomly replay translog from " + lastTransLogLoc + " to " + curLoc);
-                transLog.replay(curPrimary, lastTransLogLoc, curLoc);
-                lastTransLogLoc = curLoc;
-              }
-            }
-          } catch (IOException se) {
-            // Assume primary crashed
-            message("top: indexer lost connection to primary");
-            try {
-              c.close();
-            } catch (Throwable t) {
-            }
-            curPrimary = null;
-            c = null;
-          }
-
-          if (random().nextInt(sleepChance) == 0) {
-            Thread.sleep(1);
-          }
-
-          if (random().nextInt(100) == 17) {
-            System.out.println("Indexer: now pause for a bit...");
-            Thread.sleep(TestUtil.nextInt(random(), 500, 2000));
-            System.out.println("Indexer: done pause for a bit...");
-          }
-        }
-        if (curPrimary != null) {
-          try {
-            c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE);
-            c.flush();
-            c.in.readByte();
-          } catch (IOException se) {
-            // Assume primary crashed
-            message("top: indexer lost connection to primary");
-            try {
-              c.close();
-            } catch (Throwable t) {
-            }
-            curPrimary = null;
-            c = null;
-          }
-        }
-        System.out.println("Indexer: now stop");
-      } catch (Throwable t) {
-        failed.set(true);
-        stop.set(true);
-        throw new RuntimeException(t);
-      }
-    }
+    replica.close();
+    primary.close();
   }
 
   static void message(String message) {