You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:42 UTC

[08/31] lucene-solr git commit: reduce some verbosity; fix a silly vInt/int disagreement in the test protocol; reduce indexing rate so translog replay can keep up vs primary crashing itself

reduce some verbosity; fix a silly vInt/int disagreement in the test protocol; reduce the indexing rate so translog replay can finish before the primary crashes itself


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8889469b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8889469b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8889469b

Branch: refs/heads/master
Commit: 8889469b8bb1445133c39069adfdb5db028dbad1
Parents: 022540e
Author: Mike McCandless <mi...@apache.org>
Authored: Tue Feb 2 06:09:57 2016 -0500
Committer: Mike McCandless <mi...@apache.org>
Committed: Tue Feb 2 06:09:57 2016 -0500

----------------------------------------------------------------------
 .../org/apache/lucene/index/SegmentInfos.java   |   2 +-
 .../lucene/replicator/nrt/PrimaryNode.java      |   7 +-
 .../lucene/replicator/nrt/ReplicaNode.java      |  26 +++--
 .../lucene/replicator/nrt/NodeProcess.java      |  22 +++-
 .../replicator/nrt/SimplePrimaryNode.java       |  55 ++++++++--
 .../replicator/nrt/SimpleReplicaNode.java       |  27 ++++-
 .../lucene/replicator/nrt/SimpleServer.java     |  20 +++-
 .../lucene/replicator/nrt/SimpleTransLog.java   |   2 +-
 .../replicator/nrt/TestNRTReplication.java      |  86 ++++++++++++++-
 .../nrt/TestStressNRTReplication.java           | 107 ++++++++++++-------
 .../lucene/replicator/nrt/ThreadPumper.java     |   6 +-
 lucene/replicator/test.cmd                      |   2 +-
 12 files changed, 285 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
index 2f8d914..046f3ea 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
@@ -879,7 +879,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
    *  segments. */
   public void changed() {
     version++;
-    System.out.println(Thread.currentThread().getName() + ": SIS.change to version=" + version);
+    //System.out.println(Thread.currentThread().getName() + ": SIS.change to version=" + version);
     //new Throwable().printStackTrace(System.out);
   }
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
index ccd8848..3cff95e 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -104,7 +104,8 @@ public abstract class PrimaryNode extends Node {
 
       IndexSearcher s = mgr.acquire();
       try {
-        message("init: marker hit count: " + s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits);
+        // TODO: this is test code specific!!
+        message("init: marker count: " + s.count(new TermQuery(new Term("marker", "marker"))));
       } finally {
         mgr.release(s);
       }
@@ -135,7 +136,7 @@ public abstract class PrimaryNode extends Node {
     if (result) {
       message("top: opened NRT reader version=" + curInfos.getVersion());
       finishedMergedFiles.removeAll(completedMergeFiles);
-      message("flushAndRefresh:  version=" + curInfos.getVersion() + " completedMergeFiles=" + completedMergeFiles + " finishedMergedFiles=" + finishedMergedFiles);
+      message("flushAndRefresh: version=" + curInfos.getVersion() + " completedMergeFiles=" + completedMergeFiles + " finishedMergedFiles=" + finishedMergedFiles);
     } else {
       message("top: no changes in flushAndRefresh; still version=" + curInfos.getVersion());
     }
@@ -208,6 +209,8 @@ public abstract class PrimaryNode extends Node {
     try {
       searcher = mgr.acquire();
       infos = ((StandardDirectoryReader) searcher.getIndexReader()).getSegmentInfos();
+      // TODO: this is test code specific!!
+      message("setCurrentInfos: marker count: " + searcher.count(new TermQuery(new Term("marker", "marker"))) + " version=" + infos.getVersion() + " searcher=" + searcher);
     } finally {
       if (searcher != null) {
         mgr.release(searcher);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
index 713c6f1..c7af429 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.SegmentInfos;
@@ -152,8 +153,6 @@ abstract class ReplicaNode extends Node {
       deleter.deleteUnknownFiles(segmentsFileName);
       message("top: done delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
 
-      // nocommit make test where index has all docs deleted (all segments dropped, so 0 segments) and is then replicated
-
       String s = infos.getUserData().get(PRIMARY_GEN_KEY);
       long myPrimaryGen;
       if (s == null) {
@@ -277,6 +276,15 @@ abstract class ReplicaNode extends Node {
       // Finally, we are open for business, since our index now "agrees" with the primary:
       mgr = new SegmentInfosSearcherManager(dir, this, infos, searcherFactory);
 
+      IndexSearcher searcher = mgr.acquire();
+      try {
+        // TODO: this is test specific:
+        int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
+        message("top: marker count=" + hitCount + " version=" + ((DirectoryReader) searcher.getIndexReader()).getVersion());
+      } finally {
+        mgr.release(searcher);
+      }
+
       // Must commit after init mgr:
       if (doCommit) {
         // Very important to commit what we just sync'd over, because we removed the pre-existing commit point above if we had to
@@ -287,9 +295,13 @@ abstract class ReplicaNode extends Node {
       message("top: done start");
       state = "idle";
     } catch (Throwable t) {
-      message("exc on start:");
-      t.printStackTrace(System.out);
-      throw new RuntimeException(t);
+      if (t.getMessage().startsWith("replica cannot start") == false) {
+        message("exc on start:");
+        t.printStackTrace(System.out);
+      } else {
+        dir.close();
+      }
+      IOUtils.reThrow(t);
     }
   }
   
@@ -418,7 +430,7 @@ abstract class ReplicaNode extends Node {
     int markerCount;
     IndexSearcher s = mgr.acquire();
     try {
-      markerCount = s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+      markerCount = s.count(new TermQuery(new Term("marker", "marker")));
     } finally {
       mgr.release(s);
     }
@@ -496,7 +508,7 @@ abstract class ReplicaNode extends Node {
     } catch (NodeCommunicationException nce) {
       // E.g. primary could crash/close when we are asking it for the copy state:
       message("top: ignoring communication exception creating CopyJob: " + nce);
-      nce.printStackTrace(System.out);
+      //nce.printStackTrace(System.out);
       if (state.equals("syncing")) {
         state = "idle";
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
index 9de2c04..dcbef87 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -19,6 +19,7 @@ package org.apache.lucene.replicator.nrt;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.document.Document;
@@ -47,7 +48,9 @@ class NodeProcess implements Closeable {
 
   volatile boolean isOpen = true;
 
-  public NodeProcess(Process p, int id, int tcpPort, Thread pumper, boolean isPrimary, long initCommitVersion, long initInfosVersion) {
+  final AtomicBoolean nodeIsClosing;
+
+  public NodeProcess(Process p, int id, int tcpPort, Thread pumper, boolean isPrimary, long initCommitVersion, long initInfosVersion, AtomicBoolean nodeIsClosing) {
     this.p = p;
     this.id = id;
     this.tcpPort = tcpPort;
@@ -55,6 +58,7 @@ class NodeProcess implements Closeable {
     this.isPrimary = isPrimary;
     this.initCommitVersion = initCommitVersion;
     this.initInfosVersion = initInfosVersion;
+    this.nodeIsClosing = nodeIsClosing;
     assert initInfosVersion >= initCommitVersion: "initInfosVersion=" + initInfosVersion + " initCommitVersion=" + initCommitVersion;
     lock = new ReentrantLock();
   }
@@ -122,10 +126,11 @@ class NodeProcess implements Closeable {
 
   /** Ask the primary node process to flush.  We send it all currently up replicas so it can notify them about the new NRT point.  Returns the newly
    *  flushed version, or a negative (current) version if there were no changes. */
-  public synchronized long flush() throws IOException {
+  public synchronized long flush(int atLeastMarkerCount) throws IOException {
     assert isPrimary;
     try (Connection c = new Connection(tcpPort)) {
       c.out.writeByte(SimplePrimaryNode.CMD_FLUSH);
+      c.out.writeVInt(atLeastMarkerCount);
       c.flush();
       c.s.shutdownOutput();
       return c.in.readLong();
@@ -218,7 +223,7 @@ class NodeProcess implements Closeable {
 
   public void deleteDocument(Connection c, String docid) throws IOException {
     if (isPrimary == false) {
-      throw new IllegalStateException("only primary can index");
+      throw new IllegalStateException("only primary can delete documents");
     }
     c.out.writeByte(SimplePrimaryNode.CMD_DELETE_DOC);
     c.out.writeString(docid);
@@ -228,11 +233,20 @@ class NodeProcess implements Closeable {
 
   public void deleteAllDocuments(Connection c) throws IOException {
     if (isPrimary == false) {
-      throw new IllegalStateException("only primary can index");
+      throw new IllegalStateException("only primary can delete documents");
     }
     c.out.writeByte(SimplePrimaryNode.CMD_DELETE_ALL_DOCS);
     c.flush();
     c.in.readByte();
   }
+
+  public void forceMerge(Connection c) throws IOException {
+    if (isPrimary == false) {
+      throw new IllegalStateException("only primary can force merge");
+    }
+    c.out.writeByte(SimplePrimaryNode.CMD_FORCE_MERGE);
+    c.flush();
+    c.in.readByte();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
index 0afd1b4..b9ecced 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -53,8 +53,10 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
@@ -272,6 +274,8 @@ class SimplePrimaryNode extends PrimaryNode {
   private void handleFlush(DataInput topIn, DataOutput topOut, BufferedOutputStream bos) throws IOException {
     Thread.currentThread().setName("flush");
 
+    int atLeastMarkerCount = topIn.readVInt();
+
     int[] replicaTCPPorts;
     int[] replicaIDs;
     synchronized (this) {
@@ -284,6 +288,8 @@ class SimplePrimaryNode extends PrimaryNode {
     if (flushAndRefresh()) {
       // Something did get flushed (there were indexing ops since the last flush):
 
+      verifyAtLeastMarkerCount(atLeastMarkerCount, null);
+
       // Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we
       // (possibly) pushed to some replicas.  Alternatively we could make this 2 separate ops?
       long version = getCopyStateVersion();
@@ -295,6 +301,7 @@ class SimplePrimaryNode extends PrimaryNode {
       for(int i=0;i<replicaIDs.length;i++) {
         int replicaID = replicaIDs[i];
         try (Connection c = new Connection(replicaTCPPorts[i])) {
+          message("send NEW_NRT_POINT to R" + replicaID + " at tcpPort=" + replicaTCPPorts[i]);
           c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
           c.out.writeVLong(version);
           c.out.writeInt(tcpPort);
@@ -452,6 +459,10 @@ class SimplePrimaryNode extends PrimaryNode {
         writer.deleteAll();
         out.writeByte((byte) 1);
         bos.flush();
+      } else if (cmd == CMD_FORCE_MERGE) {
+        writer.forceMerge(1);
+        out.writeByte((byte) 1);
+        bos.flush();
       } else if (cmd == CMD_INDEXING_DONE) {
         out.writeByte((byte) 1);
         bos.flush();
@@ -480,7 +491,6 @@ class SimplePrimaryNode extends PrimaryNode {
         throw new IllegalArgumentException("unhandled field name " + name);
       }
     }
-
     writer.addDocument(doc);
   }
 
@@ -537,6 +547,7 @@ class SimplePrimaryNode extends PrimaryNode {
   static final byte CMD_DELETE_DOC = 8;
   static final byte CMD_INDEXING_DONE = 19;
   static final byte CMD_DELETE_ALL_DOCS = 22;
+  static final byte CMD_FORCE_MERGE = 23;
 
   // Sent by replica to primary when replica first starts up, so primary can add it to any warming merges:
   static final byte CMD_NEW_REPLICA = 20;
@@ -617,15 +628,8 @@ class SimplePrimaryNode extends PrimaryNode {
       case CMD_MARKER_SEARCH:
         {
           Thread.currentThread().setName("msearch");
-          IndexSearcher searcher = mgr.acquire();
-          try {
-            long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
-            int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
-            out.writeVLong(version);
-            out.writeVInt(hitCount);
-          } finally {
-            mgr.release(searcher);
-          }
+          int expectedAtLeastCount = in.readVInt();
+          verifyAtLeastMarkerCount(expectedAtLeastCount, out);
         }
         continue outer;
 
@@ -706,4 +710,35 @@ class SimplePrimaryNode extends PrimaryNode {
       break;
     }
   }
+
+  private void verifyAtLeastMarkerCount(int expectedAtLeastCount, DataOutput out) throws IOException {
+    IndexSearcher searcher = mgr.acquire();
+    try {
+      long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+      int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
+
+      if (hitCount < expectedAtLeastCount) {
+        message("marker search: expectedAtLeastCount=" + expectedAtLeastCount + " but hitCount=" + hitCount);
+        TopDocs hits = searcher.search(new TermQuery(new Term("marker", "marker")), expectedAtLeastCount);
+        List<Integer> seen = new ArrayList<>();
+        for(ScoreDoc hit : hits.scoreDocs) {
+          Document doc = searcher.doc(hit.doc);
+          seen.add(Integer.parseInt(doc.get("docid").substring(1)));
+        }
+        Collections.sort(seen);
+        message("saw markers:");
+        for(int marker : seen) {
+          message("saw m" + marker);
+        }
+        throw new IllegalStateException("at flush: marker count " + hitCount + " but expected at least " + expectedAtLeastCount + " version=" + version);
+      }
+
+      if (out != null) {
+        out.writeVLong(version);
+        out.writeVInt(hitCount);
+      }
+    } finally {
+      mgr.release(searcher);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
index 27a5547..bc8bb03 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
@@ -24,18 +24,24 @@ import java.io.InputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
@@ -148,6 +154,7 @@ class SimpleReplicaNode extends ReplicaNode {
 
   /** Handles incoming request to the naive TCP server wrapping this node */
   void handleOneConnection(ServerSocket ss, AtomicBoolean stop, InputStream is, Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
+    //message("one connection: " + socket);
     outer:
     while (true) {
       byte cmd;
@@ -173,6 +180,7 @@ class SimpleReplicaNode extends ReplicaNode {
           long version = in.readVLong();
           Thread.currentThread().setName("recv-" + version);
           curPrimaryTCPPort = in.readInt();
+          message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort);
           newNRTPoint(version);
         }
         break;
@@ -235,10 +243,26 @@ class SimpleReplicaNode extends ReplicaNode {
       case SimplePrimaryNode.CMD_MARKER_SEARCH:
         {
           Thread.currentThread().setName("msearch");
+          int expectedAtLeastCount = in.readVInt();
           IndexSearcher searcher = mgr.acquire();
           try {
             long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
-            int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+            int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
+            if (hitCount < expectedAtLeastCount) {
+              message("marker search: expectedAtLeastCount=" + expectedAtLeastCount + " but hitCount=" + hitCount);
+              TopDocs hits = searcher.search(new TermQuery(new Term("marker", "marker")), expectedAtLeastCount);
+              List<Integer> seen = new ArrayList<>();
+              for(ScoreDoc hit : hits.scoreDocs) {
+                Document doc = searcher.doc(hit.doc);
+                seen.add(Integer.parseInt(doc.get("docid").substring(1)));
+              }
+              Collections.sort(seen);
+              message("saw markers:");
+              for(int marker : seen) {
+                message("saw m" + marker);
+              }
+            }
+
             out.writeVLong(version);
             out.writeVInt(hitCount);
             bos.flush();
@@ -305,7 +329,6 @@ class SimpleReplicaNode extends ReplicaNode {
       default:
         throw new IllegalArgumentException("unrecognized cmd=" + cmd);
       }
-      System.out.println("NOW FLUSH");
       bos.flush();
 
       break;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
index 5b04721..3fdc45f 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
@@ -22,6 +22,7 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Method;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -42,9 +43,9 @@ import org.apache.lucene.store.InputStreamDataInput;
 import org.apache.lucene.store.OutputStreamDataOutput;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
 import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 
 /** Child process with silly naive TCP socket server to handle
@@ -99,8 +100,9 @@ public class SimpleServer extends LuceneTestCase {
         success = true;
       } catch (Throwable t) {
         if (t instanceof SocketException == false && t instanceof NodeCommunicationException == false) {
-          node.message("unexpected exception handling client connection:");
+          node.message("unexpected exception handling client connection; now failing test:");
           t.printStackTrace(System.out);
+          IOUtils.closeWhileHandlingException(ss);
           // Test should fail with this:
           throw new RuntimeException(t);
         } else {
@@ -218,7 +220,7 @@ public class SimpleServer extends LuceneTestCase {
   public void test() throws Exception {
 
     int id = Integer.parseInt(System.getProperty("tests.nrtreplication.nodeid"));
-    Thread.currentThread().setName("init child " + id);
+    Thread.currentThread().setName("main child " + id);
     Path indexPath = Paths.get(System.getProperty("tests.nrtreplication.indexpath"));
     boolean isPrimary = System.getProperty("tests.nrtreplication.isPrimary") != null;
     int primaryTCPPort;
@@ -238,7 +240,7 @@ public class SimpleServer extends LuceneTestCase {
     boolean doFlipBitsDuringCopy = "true".equals(System.getProperty("tests.nrtreplication.doFlipBitsDuringCopy"));
 
     // Create server socket that we listen for incoming requests on:
-    try (final ServerSocket ss = new ServerSocket(0)) {
+    try (final ServerSocket ss = new ServerSocket(0, 0, InetAddress.getLoopbackAddress())) {
 
       int tcpPort = ((InetSocketAddress) ss.getLocalSocketAddress()).getPort();
       System.out.println("\nPORT: " + tcpPort);
@@ -247,7 +249,15 @@ public class SimpleServer extends LuceneTestCase {
         node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null, doFlipBitsDuringCopy);
         System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
       } else {
-        node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
+        try {
+          node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
+        } catch (RuntimeException re) {
+          if (re.getMessage().startsWith("replica cannot start")) {
+            // this is "OK": it means MDW's refusal to delete a segments_N commit point means we cannot start:
+            assumeTrue(re.getMessage(), false);
+          }
+          throw re;
+        }
       }
       System.out.println("\nINFOS VERSION: " + node.getCurrentSearchingVersion());
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
index d409ffc..4c11e52 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
@@ -197,7 +197,7 @@ class SimpleTransLog implements Closeable {
     }
     String marker = readNullableString(in);
     if (marker != null) {
-      //System.out.println("xlog: replay marker=" + id);
+      //TestStressNRTReplication.message("xlog: replay marker=" + id);
       doc.add(new StringField("marker", marker, Field.Store.YES));
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 755b234..b2240eb 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@ -231,8 +231,8 @@ public class TestNRTReplication extends LuceneTestCase {
       replicaC.flush();
       version2 = replicaC.in.readVLong();
       hitCount = replicaC.in.readVInt();
-      if (hitCount == 10) {
-        assertTrue(version2 > version1);
+      if (version2 == primaryVersion1) {
+        assertEquals(10, hitCount);
         // good!
         break;
       }
@@ -269,8 +269,9 @@ public class TestNRTReplication extends LuceneTestCase {
       replicaC.flush();
       version4 = replicaC.in.readVLong();
       hitCount = replicaC.in.readVInt();
-      if (hitCount == 0) {
+      if (version4 == primaryVersion2) {
         assertTrue(version4 > version3);
+        assertEquals(0, hitCount);
         // good!
         break;
       }
@@ -293,7 +294,8 @@ public class TestNRTReplication extends LuceneTestCase {
       replicaC.flush();
       long version5 = replicaC.in.readVLong();
       hitCount = replicaC.in.readVInt();
-      if (hitCount == 10) {
+      if (version5 == primaryVersion3) {
+        assertEquals(10, hitCount);
         assertTrue(version5 > version4);
         // good!
         break;
@@ -308,6 +310,82 @@ public class TestNRTReplication extends LuceneTestCase {
     primary.close();
   }
 
+  public void testReplicateForceMerge() throws Exception {
+
+    Node.globalStartNS = System.nanoTime();
+    childTempDir = createTempDir("child");
+
+    message("change thread name from " + Thread.currentThread().getName());
+    Thread.currentThread().setName("main");
+    
+    Path primaryPath = createTempDir("primary");
+    NodeProcess primary = startNode(-1, 0, primaryPath, true, -1);
+
+    Path replicaPath = createTempDir("replica");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, false, -1);
+
+    // Tell primary current replicas:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
+      c.out.writeVInt(1);
+      c.out.writeVInt(replica.id);
+      c.out.writeVInt(replica.tcpPort);
+      c.flush();
+      c.in.readByte();
+    }
+
+    // Index 10 docs into primary:
+    LineFileDocs docs = new LineFileDocs(random());
+    Connection primaryC = new Connection(primary.tcpPort);
+    primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+    for(int i=0;i<10;i++) {
+      Document doc = docs.nextDoc();
+      primary.addOrUpdateDocument(primaryC, doc, false);
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush();
+    assertTrue(primaryVersion1 > 0);
+
+    // Index 10 more docs into primary:
+    for(int i=0;i<10;i++) {
+      Document doc = docs.nextDoc();
+      primary.addOrUpdateDocument(primaryC, doc, false);
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion2 = primary.flush();
+    assertTrue(primaryVersion2 > primaryVersion1);
+
+    primary.forceMerge(primaryC);
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion3 = primary.flush();
+    assertTrue(primaryVersion3 > primaryVersion2);
+
+    Connection replicaC = new Connection(replica.tcpPort);
+
+    // Wait for replica to show the change
+    while (true) {
+      replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+      replicaC.flush();
+      long version = replicaC.in.readVLong();
+      int hitCount = replicaC.in.readVInt();
+      if (version == primaryVersion3) {
+        assertEquals(20, hitCount);
+        // good!
+        break;
+      }
+      Thread.sleep(10);
+    }
+
+    replicaC.close();
+    primaryC.close();
+
+    replica.close();
+    primary.close();
+  }
+
   static void message(String message) {
     long now = System.nanoTime();
     System.out.println(String.format(Locale.ROOT,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
index 271c5d2..63ff12a 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
@@ -58,10 +58,21 @@ import org.apache.lucene.util.ThreadInterruptedException;
 
 import com.carrotsearch.randomizedtesting.SeedUtils;
 
+// nocommit why so many "hit SocketException during commit with R0"?
+
+// nocommit why so much time when so many nodes are down
+
+// nocommit indexing is too fast?  (xlog replay fails to finish before primary crashes itself)
+
+// nocommit why all these NodeCommunicationExcs?
+
+// nocommit the sockets are a pita on jvm crashing ...
+
 /*
   TODO
     - fangs
       - sometimes have one replica be really slow at copying / have random pauses (fake GC) / etc.
+      - graceful primary close
     - why do we do the "rename temp to actual" all at the end...?  what really does that buy us?
     - replica should also track maxSegmentName its seen, and tap into inflateGens if it's later promoted to primary?
     - test should not print scary exceptions and then succeed!
@@ -137,11 +148,12 @@ public class TestStressNRTReplication extends LuceneTestCase {
   /** Randomly gracefully close a replica; it will later be restarted and sync itself. */
   static final boolean DO_CLOSE_REPLICA = true;
 
+  /** Randomly gracefully close the primary; it will later be restarted and sync itself. */
+  static final boolean DO_CLOSE_PRIMARY = true;
+
   /** If false, all child + parent output is interleaved into single stdout/err */
   static final boolean SEPARATE_CHILD_OUTPUT = false;
 
-  // nocommit DO_CLOSE_PRIMARY?
-
   /** Randomly crash whole cluster and then restart it */
   static final boolean DO_FULL_CLUSTER_CRASH = true;
 
@@ -151,8 +163,6 @@ public class TestStressNRTReplication extends LuceneTestCase {
   /** Set to a non-null value to force exactly that many nodes; else, it's random. */
   static final Integer NUM_NODES = null;
 
-  static final boolean DO_RANDOM_XLOG_REPLAY = false;
-
   final AtomicBoolean failed = new AtomicBoolean();
 
   final AtomicBoolean stop = new AtomicBoolean();
@@ -174,6 +184,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
   Path transLogPath;
   SimpleTransLog transLog;
   final AtomicInteger markerUpto = new AtomicInteger();
+  final AtomicInteger markerID = new AtomicInteger();
 
   /** Maps searcher version to how many hits the query body:the matched. */
   final Map<Long,Integer> hitCounts = new ConcurrentHashMap<>();
@@ -201,7 +212,11 @@ public class TestStressNRTReplication extends LuceneTestCase {
 
     // Silly bootstrapping:
     versionToTransLogLocation.put(0L, 0L);
-    versionToTransLogLocation.put(1L, 0L);
+
+    // nocommit why also 1?
+    //versionToTransLogLocation.put(1L, 0L);
+
+    versionToMarker.put(0L, 0);
 
     int numNodes;
 
@@ -268,23 +283,24 @@ public class TestStressNRTReplication extends LuceneTestCase {
       // Wait a bit:
       Thread.sleep(TestUtil.nextInt(random(), Math.min(runTimeSec*4, 200), runTimeSec*4));
       if (primary != null && random().nextBoolean()) {
-        message("top: now flush primary");
         NodeProcess curPrimary = primary;
         if (curPrimary != null) {
 
           // Save these before we start flush:
           long nextTransLogLoc = transLog.getNextLocation();
           int markerUptoSav = markerUpto.get();
+          message("top: now flush primary; at least marker count=" + markerUptoSav);
 
           long result;
           try {
-            result = primary.flush();
+            result = primary.flush(markerUptoSav);
           } catch (Throwable t) {
             message("top: flush failed; skipping: " + t.getMessage());
             result = -1;
           }
           if (result > 0) {
             // There were changes
+            message("top: flush finished with changed; new primary version=" + result);
             lastPrimaryVersion = result;
             addTransLogLoc(lastPrimaryVersion, nextTransLogLoc);
             addVersionMarker(lastPrimaryVersion, markerUptoSav);
@@ -316,7 +332,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
 
       {
         NodeProcess node = nodes[random().nextInt(nodes.length)];
-        if (node != null) {
+        if (node != null && node.nodeIsClosing.get() == false) {
           // TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters.
           // maybe we somehow allow IW to commit a specific sis (the one we just flushed)?
           message("top: now commit node=" + node);
@@ -452,27 +468,33 @@ public class TestStressNRTReplication extends LuceneTestCase {
     addVersionMarker(newPrimary.initInfosVersion, markerCount);
 
     assert newPrimary.initInfosVersion >= lastPrimaryVersion;
-    message("top: now change lastPrimaryVersion from " + lastPrimaryVersion + " to " + newPrimary.initInfosVersion);
+    message("top: now change lastPrimaryVersion from " + lastPrimaryVersion + " to " + newPrimary.initInfosVersion + "; startup marker count " + markerCount);
     lastPrimaryVersion = newPrimary.initInfosVersion;
 
-    // Publish new primary, before replaying xlog.  This means other indexing ops can come in at the same time as we catch up indexing
-    // previous ops.  Effectively, we have "forked" the indexing ops, by rolling back in time a bit, and replaying old indexing ops (from
-    // translog) concurrently with new incoming ops.
-    nodes[id] = newPrimary;
-    primary = newPrimary;
-
-    sendReplicasToPrimary();
-
     long nextTransLogLoc = transLog.getNextLocation();
-    int nextMarkerUpto = markerUpto.get();
-    message("top: replay trans log " + startTransLogLoc + " (version=" + newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)");
+    long t0 = System.nanoTime();
+    message("top: start translog replay " + startTransLogLoc + " (version=" + newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)");
     try {
       transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc);
     } catch (IOException ioe) {
+      // nocommit what if primary node is still running here, and we failed for some other reason?
       message("top: replay xlog failed; abort");
       return;
     }
-    message("top: done replay trans log");
+
+    long t1 = System.nanoTime();
+    message("top: done translog replay; took " + ((t1 - t0)/1000000.0) + " msec; now publish primary");
+
+    // Publish the new primary only after translog replay has succeeded; this is important, for this test anyway, because it keeps a "linear"
+    // history, which is what makes enforcing marker counts correct.  E.g., if we published first and replayed the translog concurrently with
+    // incoming ops, a primary commit that happened while the translog was still replaying would incorrectly record the translog loc into the
+    // commit user data when in fact that commit did NOT reflect all prior ops.  If we then crashed and started up again from that commit
+    // point, we would be missing ops.
+    nodes[id] = newPrimary;
+    primary = newPrimary;
+
+    sendReplicasToPrimary();
+
   }
 
   /** Launches a child "server" (separate JVM), which is either primary or replica node */
@@ -506,6 +528,9 @@ public class TestStressNRTReplication extends LuceneTestCase {
       if (DO_CRASH_PRIMARY) {
         cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
       }
+      if (DO_CLOSE_PRIMARY) {
+        cmd.add("-Dtests.nrtreplication.doRandomClose=true");
+      }
     } else {
       if (DO_CRASH_REPLICA) {
         cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
@@ -544,7 +569,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
       childLog = null;
     }
 
-    message("child process command: " + cmd);
+    //message("child process command: " + cmd);
     ProcessBuilder pb = new ProcessBuilder(cmd);
     pb.redirectErrorStream(true);
 
@@ -577,6 +602,10 @@ public class TestStressNRTReplication extends LuceneTestCase {
           throw new RuntimeException(ie);
         }
         message("exit value=" + p.exitValue());
+        if (p.exitValue() == 0) {
+          message("zero exit status; assuming failed to remove segments_N; skipping");
+          return null;
+        }
 
         // Hackity hack, in case primary crashed/closed and we haven't noticed (reaped the process) yet:
         if (isPrimary == false) {
@@ -586,8 +615,9 @@ public class TestStressNRTReplication extends LuceneTestCase {
             message("failed to remove segments_N; skipping");
             return null;
           }
-          for(int i=0;i<10;i++) {
-            if (primaryGen != myPrimaryGen || primary == null) {
+          for(int i=0;i<100;i++) {
+            NodeProcess primary2 = primary;
+            if (primaryGen != myPrimaryGen || primary2 == null || primary2.nodeIsClosing.get()) {
               // OK: primary crashed while we were trying to start, so it's expected/allowed that we could not start the replica:
               message("primary crashed/closed while replica R" + id + " tried to start; skipping");
               return null;
@@ -634,6 +664,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
     }
 
     final boolean finalWillCrash = willCrash;
+    final AtomicBoolean nodeIsClosing = new AtomicBoolean();
 
     // Baby sits the child process, pulling its stdout and printing to our stdout, calling nodeClosed once it exits:
     Thread pumper = ThreadPumper.start(
@@ -669,11 +700,11 @@ public class TestStressNRTReplication extends LuceneTestCase {
                                            }
                                            nodeClosed(id);
                                          }
-                                       }, r, System.out, childLog);
+                                       }, r, System.out, childLog, nodeIsClosing);
     pumper.setName("pump" + id);
 
     message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion);
-    return new NodeProcess(p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion);
+    return new NodeProcess(p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion, nodeIsClosing);
   }
 
   private void nodeClosed(int id) {
@@ -754,7 +785,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
           message("top: restarter cycle");
 
           // Randomly crash full cluster:
-          if (DO_FULL_CLUSTER_CRASH && random().nextInt(50) == 17) {
+          if (DO_FULL_CLUSTER_CRASH && random().nextInt(500) == 17) {
             message("top: full cluster crash");
             for(int i=0;i<nodes.length;i++) {
               if (starting[i]) {
@@ -954,12 +985,15 @@ public class TestStressNRTReplication extends LuceneTestCase {
             continue;
           }
 
+          // nocommit not anymore?
           // This can be null if we got the new primary after crash and that primary is still catching up (replaying xlog):
           Integer expectedAtLeastHitCount = versionToMarker.get(version);
+          assertNotNull("version=" + version, expectedAtLeastHitCount);
 
           if (expectedAtLeastHitCount != null && expectedAtLeastHitCount > 0 && random().nextInt(10) == 7) {
             try {
               c.out.writeByte(SimplePrimaryNode.CMD_MARKER_SEARCH);
+              c.out.writeVInt(expectedAtLeastHitCount);
               c.flush();
               while (c.sockIn.available() == 0) {
                 if (stop.get()) {
@@ -1064,13 +1098,15 @@ public class TestStressNRTReplication extends LuceneTestCase {
             if (random().nextInt(10) == 7) {
               // We use the marker docs to check for data loss in search thread:
               Document doc = new Document();
-              int id = markerUpto.getAndIncrement();
+              int id = markerID.getAndIncrement();
               String idString = "m"+id;
               doc.add(newStringField("docid", idString, Field.Store.YES));
               doc.add(newStringField("marker", "marker", Field.Store.YES));
               curPrimary.addOrUpdateDocument(c, doc, false);
               transLog.addDocument(idString, doc);
-              message("index marker=" + idString + "; translog is " + Node.bytesToString(Files.size(transLogPath)));
+              // Only increment after primary replies:
+              markerUpto.getAndIncrement();
+              //message("index marker=" + idString + "; translog is " + Node.bytesToString(Files.size(transLogPath)));
             }
 
             if (docCount > 0 && random().nextDouble() < updatePct) {
@@ -1094,14 +1130,6 @@ public class TestStressNRTReplication extends LuceneTestCase {
               ((Field) doc.getField("docid")).setStringValue(idString);
               curPrimary.addOrUpdateDocument(c, doc, false);
               transLog.addDocument(idString, doc);
-
-              if (DO_RANDOM_XLOG_REPLAY && random().nextInt(10) == 7) {
-                long curLoc = transLog.getNextLocation();
-                // randomly replay chunks of translog just to test replay:
-                message("now randomly replay translog from " + lastTransLogLoc + " to " + curLoc);
-                transLog.replay(curPrimary, lastTransLogLoc, curLoc);
-                lastTransLogLoc = curLoc;
-              }
             }
           } catch (IOException se) {
             // Assume primary crashed
@@ -1115,12 +1143,13 @@ public class TestStressNRTReplication extends LuceneTestCase {
           }
 
           if (random().nextInt(sleepChance) == 0) {
-            Thread.sleep(1);
+            Thread.sleep(10);
           }
 
           if (random().nextInt(100) == 17) {
-            System.out.println("Indexer: now pause for a bit...");
-            Thread.sleep(TestUtil.nextInt(random(), 500, 2000));
+            int pauseMS = TestUtil.nextInt(random(), 500, 2000);
+            System.out.println("Indexer: now pause for " + pauseMS + " msec...");
+            Thread.sleep(pauseMS);
             System.out.println("Indexer: done pause for a bit...");
           }
         }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
index 6ddb777..d74e170 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
@@ -21,11 +21,12 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.Writer;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 /** A pipe thread. It'd be nice to reuse guava's implementation for this... */
 class ThreadPumper {
-  public static Thread start(final Runnable onExit, final BufferedReader from, final PrintStream to, final Writer toFile) {
+  public static Thread start(final Runnable onExit, final BufferedReader from, final PrintStream to, final Writer toFile, final AtomicBoolean nodeClosing) {
     Thread t = new Thread() {
         @Override
         public void run() {
@@ -43,6 +44,9 @@ class ThreadPumper {
               } else {
                 TestNRTReplication.message(line);
               }
+              if (line.contains("now force close server socket after")) {
+                nodeClosing.set(true);
+              }
             }
             // Sub-process finished
           } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/test.cmd
----------------------------------------------------------------------
diff --git a/lucene/replicator/test.cmd b/lucene/replicator/test.cmd
index 14e3bd2..c38fc96 100644
--- a/lucene/replicator/test.cmd
+++ b/lucene/replicator/test.cmd
@@ -1 +1 @@
-python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs  TestNRTReplication -jvms 1 -mult 4 -nightly
+python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 1 -mult 4 -nightly