Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:40 UTC

[06/31] lucene-solr git commit: fix more nocommits; add separate test that deleteAll can replicate

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
new file mode 100644
index 0000000..271c5d2
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
@@ -0,0 +1,1160 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
+import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+import com.carrotsearch.randomizedtesting.SeedUtils;
+
+/*
+  TODO
+    - fangs
+      - sometimes have one replica be really slow at copying / have random pauses (fake GC) / etc.
+    - why do we do the "rename temp to actual" all at the end...?  what really does that buy us?
+    - replica should also track maxSegmentName its seen, and tap into inflateGens if it's later promoted to primary?
+    - test should not print scary exceptions and then succeed!
+    - since all nodes are local, we could have a different test only impl that just does local file copies instead of via tcp...
+    - are the pre-copied-completed-merged files not being cleared in primary?
+      - hmm the logic isn't right today?  a replica may skip pulling a given copy state that recorded the finished merged segments?
+    - beast & fix bugs
+    - graceful cluster restart
+    - better translog integration
+    - get "graceful primary shutdown" working
+    - there is still some global state we rely on for "correctness", e.g. lastPrimaryVersion
+    - clean up how version is persisted in commit data
+    - why am I not using hashes here?  how does ES use them?
+    - get all other "single shard" functions working too: this cluster should "act like" a single shard
+      - SLM
+      - controlled nrt reopen thread / returning long gen on write
+      - live field values
+      - add indexes
+    - make cluster level APIs to search, index, that deal w/ primary failover, etc.
+    - must prune xlog
+      - refuse to start primary unless we have quorum
+    - later
+      - if we named index files using segment's ID we wouldn't have file name conflicts after primary crash / rollback?
+      - back pressure on indexing if replicas can't keep up?
+      - get xlog working on top?  needs to be checkpointed, so we can correlate IW ops to NRT reader version and prune xlog based on commit
+        quorum
+        - maybe fix IW to return "gen" or "seq id" or "segment name" or something?
+      - replica can copy files from other replicas too / use multicast / rsync / something
+      - each replica could also pre-open a SegmentReader after pre-copy when warming a merge
+      - we can pre-copy newly flushed files too, for cases where reopen rate is low vs IW's flushing because RAM buffer is full
+      - opto: pre-copy files as they are written; if they will become CFS, we can build CFS on the replica?
+      - what about multiple commit points?
+      - fix primary to init directly from an open replica, instead of having to commit/close the replica first
+*/
+
+// Tricky cases:
+//   - we are pre-copying a merge, then replica starts up part way through, so it misses that pre-copy and must do it on next nrt point
+//   - a down replica starts up, but it's "from the future" vs the current primary, and must copy over file names with different contents
+//     but referenced by its latest commit point, so it must fully remove that commit ... which is a hazardous window
+//   - replica comes up just as the primary is crashing / moving
+//   - electing a new primary when a replica is just finishing its nrt sync: we need to wait for it so we are sure to get the "most up to
+//     date" replica
+//   - replica comes up after merged segment finished so it doesn't copy over the merged segment "promptly" (i.e. only sees it on NRT refresh)
+
+/**
+ * Test case showing how to implement NRT replication.  This test spawns a sub-process per node, running SimpleServer.
+ *
+ * One node is primary, and segments are periodically flushed there, then concurrently the N replica nodes copy the new files over and open new readers, while
+ * primary also opens a new reader.
+ *
+ * Nodes randomly crash and are restarted.  If the primary crashes, a replica is promoted.
+ *
+ * Merges are currently first finished on the primary and then pre-copied out to replicas with a merged segment warmer so they don't block
+ * ongoing NRT reopens.  Probably replicas could do their own merging instead, but this is more complex and may not be better overall
+ * (merging takes a lot of IO resources).
+ *
+ * Slow network is simulated with a RateLimiter.
+ */
+
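+// A rough sketch (not this commit's code) of the merge pre-copy hook described in the javadoc
+// above: IndexWriterConfig's merged segment warmer runs after a merge finishes but before it is
+// exposed to NRT readers, so the primary can push the merged files out first; the
+// preCopyMergedSegmentFiles helper here is hypothetical:
+//
+//   iwc.setMergedSegmentWarmer(new IndexWriter.IndexReaderWarmer() {
+//     @Override
+//     public void warm(LeafReader reader) throws IOException {
+//       // Push the new segment's files to all replicas before the merge is committed,
+//       // so the next NRT point does not stall on a large copy:
+//       preCopyMergedSegmentFiles(((SegmentReader) reader).getSegmentName());
+//     }
+//   });
+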
+// MockRandom's .sd file has no index header/footer:
+@SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"})
+@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
+public class TestStressNRTReplication extends LuceneTestCase {
+
+  // Test evilness controls:
+
+  /** Randomly crash the current primary (losing data!) and promote the "next best" replica. */
+  static final boolean DO_CRASH_PRIMARY = true;
+
+  /** Randomly crash (JVM core dumps) a replica; it will later randomly be restarted and sync itself. */
+  static final boolean DO_CRASH_REPLICA = true;
+
+  /** Randomly gracefully close a replica; it will later be restarted and sync itself. */
+  static final boolean DO_CLOSE_REPLICA = true;
+
+  /** If false, all child + parent output is interleaved into a single stdout/err. */
+  static final boolean SEPARATE_CHILD_OUTPUT = false;
+
+  // nocommit DO_CLOSE_PRIMARY?
+
+  /** Randomly crash whole cluster and then restart it */
+  static final boolean DO_FULL_CLUSTER_CRASH = true;
+
+  /** True if we randomly flip a bit while copying files out */
+  static final boolean DO_BIT_FLIPS_DURING_COPY = true;
+
+  /** Set to a non-null value to force exactly that many nodes; else, it's random. */
+  static final Integer NUM_NODES = null;
+
+  static final boolean DO_RANDOM_XLOG_REPLAY = false;
+
+  final AtomicBoolean failed = new AtomicBoolean();
+
+  final AtomicBoolean stop = new AtomicBoolean();
+
+  /** cwd where we start each child (server) node */
+  private Path childTempDir;
+
+  long primaryGen;
+
+  volatile long lastPrimaryVersion;
+
+  volatile NodeProcess primary;
+  volatile NodeProcess[] nodes;
+  volatile long[] nodeTimeStamps;
+  volatile boolean[] starting;
+  
+  Path[] indexPaths;
+
+  Path transLogPath;
+  SimpleTransLog transLog;
+  final AtomicInteger markerUpto = new AtomicInteger();
+
+  /** Maps searcher version to how many hits the query "body:the" matched. */
+  final Map<Long,Integer> hitCounts = new ConcurrentHashMap<>();
+
+  /** Maps searcher version to how many marker documents matched.  This should only ever grow (we never delete marker documents). */
+  final Map<Long,Integer> versionToMarker = new ConcurrentHashMap<>();
+
+  /** Maps searcher version to xlog location when refresh of this version started. */
+  final Map<Long,Long> versionToTransLogLocation = new ConcurrentHashMap<>();
+
+  final AtomicLong nodeStartCounter = new AtomicLong();
+
+  final Set<Integer> crashingNodes = Collections.synchronizedSet(new HashSet<>());
+
+  public void test() throws Exception {
+
+    Node.globalStartNS = System.nanoTime();
+
+    message("change thread name from " + Thread.currentThread().getName());
+    Thread.currentThread().setName("main");
+
+    childTempDir = createTempDir("child");
+
+    // We are parent process:
+
+    // Silly bootstrapping:
+    versionToTransLogLocation.put(0L, 0L);
+    versionToTransLogLocation.put(1L, 0L);
+
+    int numNodes;
+
+    if (NUM_NODES == null) {
+      numNodes = TestUtil.nextInt(random(), 2, 10);
+    } else {
+      numNodes = NUM_NODES.intValue();
+    }
+
+    System.out.println("TEST: using " + numNodes + " nodes");
+
+    transLogPath = createTempDir("NRTReplication").resolve("translog");
+    transLog = new SimpleTransLog(transLogPath);
+
+    //state.rateLimiters = new RateLimiter[numNodes];
+    indexPaths = new Path[numNodes];
+    nodes = new NodeProcess[numNodes];
+    nodeTimeStamps = new long[numNodes];
+    Arrays.fill(nodeTimeStamps, Node.globalStartNS);
+    starting = new boolean[numNodes];
+    
+    for(int i=0;i<numNodes;i++) {
+      indexPaths[i] = createTempDir("index" + i);
+    }
+
+    Thread[] indexers = new Thread[TestUtil.nextInt(random(), 1, 3)];
+    System.out.println("TEST: launch " + indexers.length + " indexer threads");
+    for(int i=0;i<indexers.length;i++) {
+      indexers[i] = new IndexThread();
+      indexers[i].setName("indexer" + i);
+      indexers[i].setDaemon(true);
+      indexers[i].start();
+    }
+
+    Thread[] searchers = new Thread[TestUtil.nextInt(random(), 1, 3)];
+    System.out.println("TEST: launch " + searchers.length + " searcher threads");
+    for(int i=0;i<searchers.length;i++) {
+      searchers[i] = new SearchThread();
+      searchers[i].setName("searcher" + i);
+      searchers[i].setDaemon(true);
+      searchers[i].start();
+    }
+
+    Thread restarter = new RestartThread();
+    restarter.setName("restarter");
+    restarter.setDaemon(true);
+    restarter.start();
+
+    int runTimeSec;
+    if (TEST_NIGHTLY) {
+      runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 120, 240);
+    } else {
+      runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 45, 120);
+    }
+
+    System.out.println("TEST: will run for " + runTimeSec + " sec");
+
+    long endTime = System.nanoTime() + runTimeSec*1000000000L;
+
+    sendReplicasToPrimary();
+
+    while (failed.get() == false && System.nanoTime() < endTime) {
+
+      // Wait a bit:
+      Thread.sleep(TestUtil.nextInt(random(), Math.min(runTimeSec*4, 200), runTimeSec*4));
+      if (primary != null && random().nextBoolean()) {
+        message("top: now flush primary");
+        NodeProcess curPrimary = primary;
+        if (curPrimary != null) {
+
+          // Save these before we start flush:
+          long nextTransLogLoc = transLog.getNextLocation();
+          int markerUptoSav = markerUpto.get();
+
+          long result;
+          try {
+            result = primary.flush();
+          } catch (Throwable t) {
+            message("top: flush failed; skipping: " + t.getMessage());
+            result = -1;
+          }
+          if (result > 0) {
+            // There were changes
+            lastPrimaryVersion = result;
+            addTransLogLoc(lastPrimaryVersion, nextTransLogLoc);
+            addVersionMarker(lastPrimaryVersion, markerUptoSav);
+          }
+        }
+      }
+
+      StringBuilder sb = new StringBuilder();
+      int liveCount = 0;
+      for(int i=0;i<nodes.length;i++) {
+        NodeProcess node = nodes[i];
+        if (node != null) {
+          if (sb.length() != 0) {
+            sb.append(" ");
+          }
+          liveCount++;
+          if (node.isPrimary) {
+            sb.append('P');
+          } else {
+            sb.append('R');
+          }
+          sb.append(i);
+        }
+      }
+
+      message("PG=" + (primary == null ? "X" : primaryGen) + " " + liveCount + " (of " + nodes.length + ") nodes running: " + sb);
+
+      // Commit a random node, primary or replica
+
+      {
+        NodeProcess node = nodes[random().nextInt(nodes.length)];
+        if (node != null) {
+          // TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters.
+          // maybe we somehow allow IW to commit a specific sis (the one we just flushed)?
+          message("top: now commit node=" + node);
+          node.commitAsync();
+        }
+      }
+    }
+
+    message("TEST: top: test done, now close");
+    stop.set(true);
+    for(Thread thread : indexers) {
+      thread.join();
+    }
+    for(Thread thread : searchers) {
+      thread.join();
+    }
+    restarter.join();
+
+    // Close replicas before primary so we cancel any in-progress replications:
+    System.out.println("TEST: top: now close replicas");
+    List<Closeable> toClose = new ArrayList<>();
+    for(NodeProcess node : nodes) {
+      if (node != primary && node != null) {
+        toClose.add(node);
+      }
+    }
+    IOUtils.close(toClose);
+    IOUtils.close(primary);
+    IOUtils.close(transLog);
+
+    if (failed.get() == false) {
+      message("TEST: top: now checkIndex");    
+      for(Path path : indexPaths) {
+        message("TEST: check " + path);
+        MockDirectoryWrapper dir = newMockFSDirectory(path);
+        // Just too slow otherwise
+        dir.setCrossCheckTermVectorsOnClose(false);
+        dir.close();
+      }
+    } else {
+      message("TEST: failed; skip checkIndex");
+    }
+  }
+
+  private boolean anyNodesStarting() {
+    for(int id=0;id<nodes.length;id++) {
+      if (starting[id]) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /** Picks a replica and promotes it as new primary. */
+  private void promoteReplica() throws IOException {
+    message("top: primary crashed; now pick replica to promote");
+    long maxSearchingVersion = -1;
+    NodeProcess replicaToPromote = null;
+
+    // We must promote the most current replica, because otherwise file name reuse can cause a replication to fail when it needs to copy
+    // over a file currently held open for searching.  This also minimizes recovery work since the most current replica means less xlog
+    // replay to catch up:
+    for (NodeProcess node : nodes) {
+      if (node != null) {
+        message("ask " + node + " for its current searching version");
+        long searchingVersion = node.getSearchingVersion();
+        message(node + " has searchingVersion=" + searchingVersion);
+        if (searchingVersion > maxSearchingVersion) {
+          maxSearchingVersion = searchingVersion;
+          replicaToPromote = node;
+        }
+      }
+    }
+
+    if (replicaToPromote == null) {
+      message("top: no replicas running; skipping primary promotion");
+      return;
+    }
+
+    message("top: promote " + replicaToPromote + " version=" + maxSearchingVersion + "; now commit");
+    if (replicaToPromote.commit() == false) {
+      message("top: commit failed; skipping primary promotion");
+      return;
+    }
+
+    message("top: now shutdown " + replicaToPromote);
+    if (replicaToPromote.shutdown() == false) {
+      message("top: shutdown failed for R" + replicaToPromote.id + "; skipping primary promotion");
+      return;
+    }
+
+    message("top: now startPrimary " + replicaToPromote);
+    startPrimary(replicaToPromote.id);
+  }
+
+  void startPrimary(int id) throws IOException {
+    message(id + ": top: startPrimary lastPrimaryVersion=" + lastPrimaryVersion);
+    assert nodes[id] == null;
+
+    // Force version of new primary to advance beyond where old primary was, so we never re-use versions.  It may have
+    // already advanced beyond newVersion, e.g. if it flushed new segments during xlog replay:
+
+    // First start node as primary (it opens an IndexWriter) but do not publish it for searching until we replay xlog:
+    NodeProcess newPrimary = startNode(id, indexPaths[id], true, lastPrimaryVersion+1);
+    if (newPrimary == null) {
+      message("top: newPrimary failed to start; abort");
+      return;
+    }
+
+    // Get xlog location that this node was guaranteed to already have indexed through; this may replay some ops already indexed but it's OK
+    // because the ops are idempotent: we updateDocument (by docid) on replay even for original addDocument:
+    Long startTransLogLoc;
+    Integer markerCount;
+    if (newPrimary.initCommitVersion == 0) {
+      startTransLogLoc = 0L;
+      markerCount = 0;
+    } else {
+      startTransLogLoc = versionToTransLogLocation.get(newPrimary.initCommitVersion);
+      markerCount = versionToMarker.get(newPrimary.initCommitVersion);
+    }
+    assert startTransLogLoc != null: "newPrimary.initCommitVersion=" + newPrimary.initCommitVersion + " is missing from versionToTransLogLocation: keys=" + versionToTransLogLocation.keySet();
+    assert markerCount != null: "newPrimary.initCommitVersion=" + newPrimary.initCommitVersion + " is missing from versionToMarker: keys=" + versionToMarker.keySet();
+
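+    // Why this replay is safe to repeat, per the comment above: a minimal sketch of what an
+    // idempotent replay loop could look like (SimpleTransLog internals; decode and nextLocation
+    // are hypothetical).  Every op, including an original addDocument, is re-applied as an
+    // update keyed by docid, so applying the same op twice leaves the index unchanged:
+    //
+    //   for (long loc = start; loc < end; loc = nextLocation(loc)) {
+    //     Document doc = decode(loc);                            // hypothetical xlog entry decode
+    //     newPrimary.addOrUpdateDocument(c, doc, true /*update*/); // update by docid, even for adds
+    //   }
+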
+    // When the primary starts, the userData in its latest commit point tells us which version it had indexed up to, so we know where to
+    // replay from in the xlog.  However, we forcefully advance the version, and then IW on init (or maybe getReader) also adds 1 to it.
+    // Since we publish the primary in this state (before xlog replay is done), a replica can start up at this point and pull this version,
+    // and possibly later be chosen as a primary, causing problems if the version is not recorded in the translog map.  So we record it
+    // here:
+
+    addTransLogLoc(newPrimary.initInfosVersion, startTransLogLoc);
+    addVersionMarker(newPrimary.initInfosVersion, markerCount);
+
+    assert newPrimary.initInfosVersion >= lastPrimaryVersion;
+    message("top: now change lastPrimaryVersion from " + lastPrimaryVersion + " to " + newPrimary.initInfosVersion);
+    lastPrimaryVersion = newPrimary.initInfosVersion;
+
+    // Publish new primary, before replaying xlog.  This means other indexing ops can come in at the same time as we catch up indexing
+    // previous ops.  Effectively, we have "forked" the indexing ops, by rolling back in time a bit, and replaying old indexing ops (from
+    // translog) concurrently with new incoming ops.
+    nodes[id] = newPrimary;
+    primary = newPrimary;
+
+    sendReplicasToPrimary();
+
+    long nextTransLogLoc = transLog.getNextLocation();
+    int nextMarkerUpto = markerUpto.get();
+    message("top: replay trans log " + startTransLogLoc + " (version=" + newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)");
+    try {
+      transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc);
+    } catch (IOException ioe) {
+      message("top: replay xlog failed; abort");
+      return;
+    }
+    message("top: done replay trans log");
+  }
+
+  /** Launches a child "server" (separate JVM), which is either primary or replica node */
+  NodeProcess startNode(final int id, Path indexPath, boolean isPrimary, long forcePrimaryVersion) throws IOException {
+    nodeTimeStamps[id] = System.nanoTime();
+    List<String> cmd = new ArrayList<>();
+
+    NodeProcess curPrimary = primary;
+
+    cmd.add(System.getProperty("java.home") 
+        + System.getProperty("file.separator")
+        + "bin"
+        + System.getProperty("file.separator")
+        + "java");
+    cmd.add("-Xmx512m");
+
+    if (curPrimary != null) {
+      cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + curPrimary.tcpPort);
+    } else if (isPrimary == false) {
+      // We cannot start a replica when there is no primary:
+      return null;
+    }
+
+    cmd.add("-Dtests.nrtreplication.node=true");
+    cmd.add("-Dtests.nrtreplication.nodeid=" + id);
+    cmd.add("-Dtests.nrtreplication.startNS=" + Node.globalStartNS);
+    cmd.add("-Dtests.nrtreplication.indexpath=" + indexPath);
+    if (isPrimary) {
+      cmd.add("-Dtests.nrtreplication.isPrimary=true");
+      cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + forcePrimaryVersion);
+      if (DO_CRASH_PRIMARY) {
+        cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
+      }
+    } else {
+      if (DO_CRASH_REPLICA) {
+        cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
+      }
+      if (DO_CLOSE_REPLICA) {
+        cmd.add("-Dtests.nrtreplication.doRandomClose=true");
+      }
+    }
+
+    if (DO_BIT_FLIPS_DURING_COPY) {
+      cmd.add("-Dtests.nrtreplication.doFlipBitsDuringCopy=true");
+    }
+
+    long myPrimaryGen = primaryGen;
+    cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen);
+
+    // Mix in our own counter because this is called from a fresh thread, which means the seed otherwise isn't changing each time we spawn a
+    // new node:
+    long seed = random().nextLong() * nodeStartCounter.incrementAndGet();
+
+    cmd.add("-Dtests.seed=" + SeedUtils.formatSeed(seed));
+    cmd.add("-ea");
+    cmd.add("-cp");
+    cmd.add(System.getProperty("java.class.path"));
+    cmd.add("org.junit.runner.JUnitCore");
+    cmd.add(getClass().getName().replace(getClass().getSimpleName(), "SimpleServer"));
+
+    Writer childLog;
+
+    if (SEPARATE_CHILD_OUTPUT) {
+      Path childOut = childTempDir.resolve(id + ".log");
+      message("logging to " + childOut);
+      childLog = Files.newBufferedWriter(childOut, StandardCharsets.UTF_8, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+      childLog.write("\n\nSTART NEW CHILD:\n");
+    } else {
+      childLog = null;
+    }
+
+    message("child process command: " + cmd);
+    ProcessBuilder pb = new ProcessBuilder(cmd);
+    pb.redirectErrorStream(true);
+
+    // Important, so that the scary-looking hs_err_<pid>.log files appear under our test temp dir:
+    pb.directory(childTempDir.toFile());
+
+    Process p = pb.start();
+
+    BufferedReader r;
+    try {
+      r = new BufferedReader(new InputStreamReader(p.getInputStream(), IOUtils.UTF_8));
+    } catch (UnsupportedEncodingException uee) {
+      throw new RuntimeException(uee);
+    }
+
+    int tcpPort = -1;
+    long initCommitVersion = -1;
+    long initInfosVersion = -1;
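+    // Child log lines that already carry an elapsed-seconds prefix, e.g. "3.14s ...", are passed through unchanged below: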
+    Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*");
+    boolean willCrash = false;
+    boolean sawExistingSegmentsFile = false;
+
+    while (true) {
+      String l = r.readLine();
+      if (l == null) {
+        message("top: node=" + id + " failed to start");
+        try {
+          p.waitFor();
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        }
+        message("exit value=" + p.exitValue());
+
+        // Hackity hack, in case primary crashed/closed and we haven't noticed (reaped the process) yet:
+        if (isPrimary == false) {
+          if (sawExistingSegmentsFile) {
+            // This means MDW's virus checker blocked us from deleting segments_N that we must delete in order to start ... just return null
+            // and retry again later:
+            message("failed to remove segments_N; skipping");
+            return null;
+          }
+          for(int i=0;i<10;i++) {
+            if (primaryGen != myPrimaryGen || primary == null) {
+              // OK: primary crashed while we were trying to start, so it's expected/allowed that we could not start the replica:
+              message("primary crashed/closed while replica R" + id + " tried to start; skipping");
+              return null;
+            } else {
+              try {
+                Thread.sleep(10);
+              } catch (InterruptedException ie) {
+                throw new ThreadInterruptedException(ie);
+              }
+            }
+          }
+        }
+
+        // Should fail the test:
+        message("top: now fail test replica R" + id + " failed to start");
+        failed.set(true);
+        throw new RuntimeException("replica R" + id + " failed to start");
+      }
+
+      if (childLog != null) {
+        childLog.write(l);
+        childLog.write("\n");
+        childLog.flush();
+      } else if (logTimeStart.matcher(l).matches()) {
+        // Already a well-formed log output:
+        System.out.println(l);
+      } else {
+        message(l);
+      }
+
+      if (l.startsWith("PORT: ")) {
+        tcpPort = Integer.parseInt(l.substring(6).trim());
+      } else if (l.startsWith("COMMIT VERSION: ")) {
+        initCommitVersion = Integer.parseInt(l.substring(16).trim());
+      } else if (l.startsWith("INFOS VERSION: ")) {
+        initInfosVersion = Integer.parseInt(l.substring(15).trim());
+      } else if (l.contains("will crash after")) {
+        willCrash = true;
+      } else if (l.startsWith("NODE STARTED")) {
+        break;
+      } else if (l.contains("replica cannot start: existing segments file=")) {
+        sawExistingSegmentsFile = true;
+      }
+    }
+
+    final boolean finalWillCrash = willCrash;
+
+    // Babysits the child process, pulling its stdout and printing to our stdout, and calling nodeClosed once it exits:
+    Thread pumper = ThreadPumper.start(
+                                       new Runnable() {
+                                         @Override
+                                         public void run() {
+                                           message("now wait for process " + p);
+                                           try {
+                                             p.waitFor();
+                                           } catch (Throwable t) {
+                                             throw new RuntimeException(t);
+                                           }
+
+                                           message("done wait for process " + p);
+                                           int exitValue = p.exitValue();
+                                           message("exit value=" + exitValue + " willCrash=" + finalWillCrash);
+                                           if (childLog != null) {
+                                             try {
+                                               childLog.write("process done; exitValue=" + exitValue + "\n");
+                                               childLog.close();
+                                             } catch (IOException ioe) {
+                                               throw new RuntimeException(ioe);
+                                             }
+                                           }
+                                           if (exitValue != 0 && finalWillCrash == false && crashingNodes.remove(id) == false) {
+                                             // should fail test
+                                             failed.set(true);
+                                             if (childLog != null) {
+                                               throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue + "; see " + childLog + " for details");
+                                             } else {
+                                               throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue);
+                                             }
+                                           }
+                                           nodeClosed(id);
+                                         }
+                                       }, r, System.out, childLog);
+    pumper.setName("pump" + id);
+
+    message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion);
+    return new NodeProcess(p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion);
+  }
+
+  private void nodeClosed(int id) {
+    NodeProcess oldNode = nodes[id];
+    if (primary != null && oldNode == primary) {
+      message("top: " + primary + ": primary process finished");
+      primary = null;
+      primaryGen++;
+    } else {
+      message("top: " + oldNode + ": replica process finished");
+    }
+    if (oldNode != null) {
+      oldNode.isOpen = false;
+    }
+    nodes[id] = null;
+    nodeTimeStamps[id] = System.nanoTime();
+
+    sendReplicasToPrimary();
+  }
+
+  /** Sends currently alive replicas to primary, which uses this to know who to notify when it does a refresh */
+  private void sendReplicasToPrimary() {
+    NodeProcess curPrimary = primary;
+    if (curPrimary != null) {
+      List<NodeProcess> replicas = new ArrayList<>();
+      for (NodeProcess node : nodes) {
+        if (node != null && node.isPrimary == false) {
+          replicas.add(node);
+        }
+      }
+
+      message("top: send " + replicas.size() + " replicas to primary");
+
+      try (Connection c = new Connection(curPrimary.tcpPort)) {
+        c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
+        c.out.writeVInt(replicas.size());        
+        for(NodeProcess replica : replicas) {
+          c.out.writeVInt(replica.id);
+          c.out.writeVInt(replica.tcpPort);
+        }
+        c.flush();
+        c.in.readByte();
+      } catch (Throwable t) {
+        message("top: ignore exc sending replicas to primary: " + t);
+      }
+    }
+  }
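+
+  // For reference, the frame this method writes (the reply side is handled by SimplePrimaryNode,
+  // added elsewhere in this commit):
+  //
+  //   byte  CMD_SET_REPLICAS
+  //   vInt  replicaCount
+  //   then, replicaCount times: vInt replicaId, vInt replicaTcpPort
+  //   ... followed by reading back a single ack byte.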
+
+  void addVersionMarker(long version, int count) {
+    //System.out.println("ADD VERSION MARKER version=" + version + " count=" + count);
+    if (versionToMarker.containsKey(version)) {
+      int curCount = versionToMarker.get(version);
+      if (curCount != count) {
+        message("top: wrong marker count version=" + version + " count=" + count + " curCount=" + curCount);
+        throw new IllegalStateException("version=" + version + " count=" + count + " curCount=" + curCount);
+      }
+    } else {
+      message("top: record marker count: version=" + version + " count=" + count);
+      versionToMarker.put(version, count);
+    }
+  }
+
+  void addTransLogLoc(long version, long loc) {
+    message("top: record transLogLoc: version=" + version + " loc=" + loc);
+    versionToTransLogLocation.put(version, loc);
+  }
+
+  // Periodically wakes up and starts up any down nodes:
+  private class RestartThread extends Thread {
+    @Override
+    public void run() {
+
+      List<Thread> startupThreads = Collections.synchronizedList(new ArrayList<>());
+
+      try {
+        while (stop.get() == false) {
+          Thread.sleep(TestUtil.nextInt(random(), 50, 500));
+          message("top: restarter cycle");
+
+          // Randomly crash full cluster:
+          if (DO_FULL_CLUSTER_CRASH && random().nextInt(50) == 17) {
+            message("top: full cluster crash");
+            for(int i=0;i<nodes.length;i++) {
+              if (starting[i]) {
+                message("N" + i + ": top: wait for startup so we can crash...");
+                while (starting[i]) {
+                  Thread.sleep(10);
+                }
+                message("N" + i + ": top: done wait for startup");
+              }
+              NodeProcess node = nodes[i];
+              if (node != null) {
+                crashingNodes.add(i);
+                message("top: N" + node.id + ": top: now crash node");
+                node.crash();
+                message("top: N" + node.id + ": top: done crash node");
+              }
+            }
+          }
+
+          List<Integer> downNodes = new ArrayList<>();
+          StringBuilder b = new StringBuilder();
+          long nowNS = System.nanoTime();
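+          // One status token per node: "s" = starting, "x" = down, "p" marks the primary, live
+          // replicas get no letter; e.g. "p0(3.2s) 1(0.4s) x2(12.4s)":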
+          for(int i=0;i<nodes.length;i++) {
+            b.append(' ');
+            double sec = (nowNS - nodeTimeStamps[i])/1000000000.0;
+            String prefix;
+            if (nodes[i] == null) {
+              downNodes.add(i);
+              if (starting[i]) {
+                prefix = "s";
+              } else {
+                prefix = "x";
+              }
+            } else {
+              prefix = "";
+            }
+            if (primary != null && nodes[i] == primary) {
+              prefix += "p";
+            }
+            b.append(String.format(Locale.ROOT, "%s%d(%.1fs)", prefix, i, sec));
+          }
+          message("node status" + b.toString());
+          message("downNodes=" + downNodes);
+
+          // If primary is down, promote a replica:
+          if (primary == null) {
+            if (anyNodesStarting()) {
+              message("top: skip promote replica: nodes are still starting");
+              continue;
+            }
+            promoteReplica();
+          }
+
+          // Randomly start up a down node:
+          if (downNodes.isEmpty() == false) {
+            int idx = downNodes.get(random().nextInt(downNodes.size()));
+            if (starting[idx] == false) {
+              if (primary == null) {
+                if (downNodes.size() == nodes.length) {
+                  // Cold start: entire cluster is down, start this node up as the new primary
+                  message("N" + idx + ": top: cold start as primary");
+                  startPrimary(idx);
+                }
+              } else if (random().nextDouble() < ((double) downNodes.size())/nodes.length) {
+                // Start up replica:
+                starting[idx] = true;
+                message("N" + idx + ": top: start up: launch thread");
+                Thread t = new Thread() {
+                    @Override
+                    public void run() {
+                      try {
+                        message("N" + idx + ": top: start up thread");
+                        nodes[idx] = startNode(idx, indexPaths[idx], false, -1);
+                        sendReplicasToPrimary();
+                      } catch (Throwable t) {
+                        failed.set(true);
+                        stop.set(true);
+                        throw new RuntimeException(t);
+                      } finally {
+                        starting[idx] = false;
+                        startupThreads.remove(Thread.currentThread());
+                      }
+                    }
+                  };
+                t.setName("start R" + idx);
+                t.start();
+                startupThreads.add(t);
+              }
+            } else {
+              message("node " + idx + " still starting");
+            }
+          }
+        }
+
+        System.out.println("Restarter: now stop: join " + startupThreads.size() + " startup threads");
+
+        while (startupThreads.size() > 0) {
+          Thread.sleep(10);
+        }
+
+      } catch (Throwable t) {
+        failed.set(true);
+        stop.set(true);
+        throw new RuntimeException(t);
+      }
+    }
+  }
+
+  /** Randomly picks a node and runs a search against it */
+  private class SearchThread extends Thread {
+
+    @Override
+    public void run() {
+      // Silly 'the' TermQuery; its hit count is tracked per searcher version in hitCounts:
+      Query theQuery = new TermQuery(new Term("body", "the"));
+
+      // Persistent connections, one per node id:
+      Map<Integer,Connection> connections = new HashMap<>();
+
+      while (stop.get() == false) {
+        NodeProcess node = nodes[random().nextInt(nodes.length)];
+        if (node == null || node.isOpen == false) {
+          continue;
+        }
+
+        if (node.lock.tryLock() == false) {
+          // Node is in the process of closing or crashing or something
+          continue;
+        }
+
+        try {
+
+          Thread.currentThread().setName("Searcher node=" + node);
+
+          //System.out.println("S: cycle; conns=" + connections);
+
+          Connection c = connections.get(node.id);
+
+          long version;
+          try {
+            if (c == null) {
+              //System.out.println("S: new connection " + node.id + " " + Thread.currentThread().getName());
+              c = new Connection(node.tcpPort);
+              connections.put(node.id, c);
+            } else {
+              //System.out.println("S: reuse connection " + node.id + " " + Thread.currentThread().getName());
+            }
+
+            c.out.writeByte(SimplePrimaryNode.CMD_SEARCH);
+            c.flush();
+
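+            // Poll rather than block on the socket so we can bail out promptly if the test is
+            // stopping or the node just crashed/closed: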
+            while (c.sockIn.available() == 0) {
+              if (stop.get()) {
+                break;
+              }
+              if (node.isOpen == false) {
+                throw new IOException("node closed");
+              }
+              Thread.sleep(1);
+            }
+            version = c.in.readVLong();
+
+            while (c.sockIn.available() == 0) {
+              if (stop.get()) {
+                break;
+              }
+              if (node.isOpen == false) {
+                throw new IOException("node closed");
+              }
+              Thread.sleep(1);
+            }
+            int hitCount = c.in.readVInt();
+
+            Integer oldHitCount = hitCounts.get(version);
+
+            // TODO: we never prune this map...
+            if (oldHitCount == null) {
+              hitCounts.put(version, hitCount);
+              message("top: searcher: record search hitCount version=" + version + " hitCount=" + hitCount + " node=" + node);
+            } else {
+              // Just ensure that all nodes show the same hit count for
+              // the same version, i.e. they really are replicas of one another:
+              if (oldHitCount.intValue() != hitCount) {
+                failed.set(true);
+                stop.set(true);
+                message("top: searcher: wrong version hitCount: version=" + version + " oldHitCount=" + oldHitCount.intValue() + " hitCount=" + hitCount);
+                fail("version=" + version + " oldHitCount=" + oldHitCount.intValue() + " hitCount=" + hitCount);
+              }
+            }
+          } catch (IOException ioe) {
+            //message("top: searcher: ignore exc talking to node " + node + ": " + ioe);
+            //ioe.printStackTrace(System.out);
+            IOUtils.closeWhileHandlingException(c);
+            connections.remove(node.id);
+            continue;
+          }
+
+          // This can be null if we got the new primary after crash and that primary is still catching up (replaying xlog):
+          Integer expectedAtLeastHitCount = versionToMarker.get(version);
+
+          if (expectedAtLeastHitCount != null && expectedAtLeastHitCount > 0 && random().nextInt(10) == 7) {
+            try {
+              c.out.writeByte(SimplePrimaryNode.CMD_MARKER_SEARCH);
+              c.flush();
+              while (c.sockIn.available() == 0) {
+                if (stop.get()) {
+                  break;
+                }
+                if (node.isOpen == false) {
+                  throw new IOException("node died");
+                }
+                Thread.sleep(1);
+              }
+
+              version = c.in.readVLong();
+
+              while (c.sockIn.available() == 0) {
+                if (stop.get()) {
+                  break;
+                }
+                if (node.isOpen == false) {
+                  throw new IOException("node died");
+                }
+                Thread.sleep(1);
+              }
+
+              int hitCount = c.in.readVInt();
+
+              // Look for data loss: make sure all marker docs are visible:
+            
+              if (hitCount < expectedAtLeastHitCount) {
+
+                String failMessage = "node=" + node + ": documents were lost version=" + version + " hitCount=" + hitCount + " vs expectedAtLeastHitCount=" + expectedAtLeastHitCount;
+                message(failMessage);
+                failed.set(true);
+                stop.set(true);
+                fail(failMessage);
+              }
+            } catch (IOException ioe) {
+              //message("top: searcher: ignore exc talking to node " + node + ": " + ioe);
+              //throw new RuntimeException(ioe);
+              //ioe.printStackTrace(System.out);
+              IOUtils.closeWhileHandlingException(c);
+              connections.remove(node.id);
+              continue;
+            }
+          }
+
+          Thread.sleep(10);
+
+        } catch (Throwable t) {
+          failed.set(true);
+          stop.set(true);
+          throw new RuntimeException(t);
+        } finally {
+          node.lock.unlock();
+        }
+      }
+      System.out.println("Searcher: now stop");
+      IOUtils.closeWhileHandlingException(connections.values());
+    }
+  }
+
+  private class IndexThread extends Thread {
+
+    @Override
+    public void run() {
+
+      try {
+        LineFileDocs docs = new LineFileDocs(random());
+        int docCount = 0;
+
+        // How often we do an update/delete vs add:
+        double updatePct = random().nextDouble();
+
+        // Varies how many docs/sec we index:
+        int sleepChance = TestUtil.nextInt(random(), 4, 100);
+
+        message("top: indexer: updatePct=" + updatePct + " sleepChance=" + sleepChance);
+
+        long lastTransLogLoc = transLog.getNextLocation();
+        
+        NodeProcess curPrimary = null;
+        Connection c = null;
+
+        while (stop.get() == false) {
+
+          try {
+            while (stop.get() == false && curPrimary == null) {
+              Thread.sleep(10);
+              curPrimary = primary;
+              if (curPrimary != null) {
+                c = new Connection(curPrimary.tcpPort);
+                c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+                break;
+              }
+            }
+
+            if (stop.get()) {
+              break;
+            }
+
+            Thread.currentThread().setName("indexer p" + curPrimary.id);
+
+            if (random().nextInt(10) == 7) {
+              // We use the marker docs to check for data loss in search thread:
+              Document doc = new Document();
+              int id = markerUpto.getAndIncrement();
+              String idString = "m"+id;
+              doc.add(newStringField("docid", idString, Field.Store.YES));
+              doc.add(newStringField("marker", "marker", Field.Store.YES));
+              curPrimary.addOrUpdateDocument(c, doc, false);
+              transLog.addDocument(idString, doc);
+              message("index marker=" + idString + "; translog is " + Node.bytesToString(Files.size(transLogPath)));
+            }
+
+            if (docCount > 0 && random().nextDouble() < updatePct) {
+              int randomID = random().nextInt(docCount);
+              String randomIDString = Integer.toString(randomID);
+              if (random().nextBoolean()) {
+                // Replace previous doc
+                Document doc = docs.nextDoc();
+                ((Field) doc.getField("docid")).setStringValue(randomIDString);
+                curPrimary.addOrUpdateDocument(c, doc, true);
+                transLog.updateDocument(randomIDString, doc);
+              } else {
+                // Delete previous doc
+                curPrimary.deleteDocument(c, randomIDString);
+                transLog.deleteDocuments(randomIDString);
+              }
+            } else {
+              // Add new doc:
+              Document doc = docs.nextDoc();
+              String idString = Integer.toString(docCount++);
+              ((Field) doc.getField("docid")).setStringValue(idString);
+              curPrimary.addOrUpdateDocument(c, doc, false);
+              transLog.addDocument(idString, doc);
+
+              if (DO_RANDOM_XLOG_REPLAY && random().nextInt(10) == 7) {
+                long curLoc = transLog.getNextLocation();
+                // randomly replay chunks of translog just to test replay:
+                message("now randomly replay translog from " + lastTransLogLoc + " to " + curLoc);
+                transLog.replay(curPrimary, lastTransLogLoc, curLoc);
+                lastTransLogLoc = curLoc;
+              }
+            }
+          } catch (IOException se) {
+            // Assume primary crashed
+            message("top: indexer lost connection to primary");
+            try {
+              c.close();
+            } catch (Throwable t) {
+            }
+            curPrimary = null;
+            c = null;
+          }
+
+          if (random().nextInt(sleepChance) == 0) {
+            Thread.sleep(1);
+          }
+
+          if (random().nextInt(100) == 17) {
+            System.out.println("Indexer: now pause for a bit...");
+            Thread.sleep(TestUtil.nextInt(random(), 500, 2000));
+            System.out.println("Indexer: done pause for a bit...");
+          }
+        }
+        if (curPrimary != null) {
+          try {
+            c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE);
+            c.flush();
+            c.in.readByte();
+          } catch (IOException se) {
+            // Assume primary crashed
+            message("top: indexer lost connection to primary");
+            try {
+              c.close();
+            } catch (Throwable t) {
+            }
+            curPrimary = null;
+            c = null;
+          }
+        }
+        System.out.println("Indexer: now stop");
+      } catch (Throwable t) {
+        failed.set(true);
+        stop.set(true);
+        throw new RuntimeException(t);
+      }
+    }
+  }
+
+  static void message(String message) {
+    long now = System.nanoTime();
+    System.out.println(String.format(Locale.ROOT,
+                                     "%5.3fs       :     parent [%11s] %s",
+                                     (now-Node.globalStartNS)/1000000000.,
+                                     Thread.currentThread().getName(),
+                                     message));
+  }
+}