Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:36 UTC

[02/31] lucene-solr git commit: current patch

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
new file mode 100644
index 0000000..4e29508
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -0,0 +1,238 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.OutputStreamDataOutput;
+
+/** The parent JVM holds this "wrapper" to refer to each child JVM.  This is roughly equivalent to e.g. a client-side "sugar" API. */
+class NodeProcess implements Closeable {
+  final Process p;
+
+  // Port sub-process is listening on
+  final int tcpPort;
+
+  final int id;
+
+  final Thread pumper;
+
+  // Acquired when searching or indexing wants to use this node:
+  final ReentrantLock lock;
+
+  final boolean isPrimary;
+
+  // Version in the commit point we opened on init:
+  final long initCommitVersion;
+
+  // SegmentInfos.version, which can be higher than the initCommitVersion
+  final long initInfosVersion;
+
+  volatile boolean isOpen = true;
+
+  public NodeProcess(Process p, int id, int tcpPort, Thread pumper, boolean isPrimary, long initCommitVersion, long initInfosVersion) {
+    this.p = p;
+    this.id = id;
+    this.tcpPort = tcpPort;
+    this.pumper = pumper;
+    this.isPrimary = isPrimary;
+    this.initCommitVersion = initCommitVersion;
+    this.initInfosVersion = initInfosVersion;
+    assert initInfosVersion >= initCommitVersion: "initInfosVersion=" + initInfosVersion + " initCommitVersion=" + initCommitVersion;
+    lock = new ReentrantLock();
+  }
+
+  @Override
+  public String toString() {
+    if (isPrimary) {
+      return "P" + id + " tcpPort=" + tcpPort;
+    } else {
+      return "R" + id + " tcpPort=" + tcpPort;
+    }
+  }
+
+  public synchronized void crash() {
+    if (isOpen) {
+      isOpen = false;
+      p.destroy();
+      try {
+        pumper.join();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+    }
+  }
+
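+  /** Asks the node to commit; returns true on success, false if the node appears to be down. */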
+  public boolean commit() {
+    try (Connection c = new Connection(tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
+      c.flush();
+      c.s.shutdownOutput();
+      if (c.in.readByte() != 1) {
+        throw new RuntimeException("commit failed");
+      }
+      return true;
+    } catch (Throwable t) {
+      // Something wrong with this replica; skip it:
+      System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
+      return false;
+    }
+  }
+
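+  /** Fire-and-forget commit: sends CMD_COMMIT but does not wait for the acknowledgement. */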
+  public void commitAsync() {
+    try (Connection c = new Connection(tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
+      c.flush();
+    } catch (Throwable t) {
+      // Something wrong with this replica; skip it:
+      System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
+    }
+  }
+
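+  /** Returns the version this node is currently searching, or -1 if the node appears to be down. */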
+  public long getSearchingVersion() {
+    try (Connection c = new Connection(tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_GET_SEARCHING_VERSION);
+      c.flush();
+      c.s.shutdownOutput();
+      return c.in.readVLong();
+    } catch (Throwable t) {
+      // Something wrong with this replica; skip it:
+      System.out.println("PARENT: top: hit SocketException during getSearchingVersion with R" + id + "; skipping");
+      return -1L;
+    }
+  }
+
+  /** Ask the primary node process to flush.  The primary then notifies all currently alive replicas about the new NRT point.  Returns the newly
+   *  flushed version, or the negated current version if there were no changes. */
+  public synchronized long flush() throws IOException {
+    assert isPrimary;
+    try (Connection c = new Connection(tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_FLUSH);
+      c.flush();
+      c.s.shutdownOutput();
+      return c.in.readLong();
+    }
+  }
+
+  @Override
+  public void close() {
+    shutdown();
+  }
+
+  public synchronized boolean shutdown() {
+    lock.lock();
+    try {
+      System.out.println("PARENT: now shutdown node=" + id + " isOpen=" + isOpen);
+      if (isOpen) {
+        // Ask the child process to shutdown gracefully:
+        isOpen = false;
+        System.out.println("PARENT: send CMD_CLOSE to node=" + id);
+        try (Connection c = new Connection(tcpPort)) {
+          c.out.writeByte(SimplePrimaryNode.CMD_CLOSE);
+          c.flush();
+          if (c.in.readByte() != 1) {
+            throw new RuntimeException("shutdown failed");
+          }
+        } catch (Throwable t) {
+          System.out.println("top: shutdown failed; skipping");
+          t.printStackTrace(System.out);
+          return false;
+        }
+        try {
+          p.waitFor();
+          pumper.join();
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(ie);
+        }
+      }
+
+      return true;
+    } finally {
+      lock.unlock();
+    }
+  }
+
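+  /** Sends a single add/update op over an already-open indexing connection: the command byte, a vInt field count, then (name, value) string pairs. */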
+  public void addOrUpdateDocument(Connection c, Document doc, boolean isUpdate) throws IOException {
+    if (isPrimary == false) {
+      throw new IllegalStateException("only primary can index");
+    }
+    int fieldCount = 0;
+
+    String title = doc.get("title");
+    if (title != null) {
+      fieldCount++;
+    }
+
+    String docid = doc.get("docid");
+    assert docid != null;
+    fieldCount++;
+
+    String body = doc.get("body");
+    if (body != null) {
+      fieldCount++;
+    }
+
+    String marker = doc.get("marker");
+    if (marker != null) {
+      fieldCount++;
+    }
+
+    c.out.writeByte(isUpdate ? SimplePrimaryNode.CMD_UPDATE_DOC : SimplePrimaryNode.CMD_ADD_DOC);
+    c.out.writeVInt(fieldCount);
+    c.out.writeString("docid");
+    c.out.writeString(docid);
+    if (title != null) {
+      c.out.writeString("title");
+      c.out.writeString(title);
+    }
+    if (body != null) {
+      c.out.writeString("body");
+      c.out.writeString(body);
+    }
+    if (marker != null) {
+      c.out.writeString("marker");
+      c.out.writeString(marker);
+    }
+    c.flush();
+    c.in.readByte();
+  }
+
+  public void deleteDocument(Connection c, String docid) throws IOException {
+    if (isPrimary == false) {
+      throw new IllegalStateException("only primary can index");
+    }
+    c.out.writeByte(SimplePrimaryNode.CMD_DELETE_DOC);
+    c.out.writeString(docid);
+    c.flush();
+    c.in.readByte();
+  }
+}
+
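
Note on the wire protocol above: every synchronous command NodeProcess issues
has the same shape -- one command byte, optional arguments, a flush, then a
one-byte acknowledgement from the child process.  A minimal sketch of issuing
CMD_COMMIT by hand, assuming the Connection helper defined elsewhere in this
patch (fields out/in/s plus flush()):

    try (Connection c = new Connection(tcpPort)) {
      c.out.writeByte(SimplePrimaryNode.CMD_COMMIT); // command byte
      c.flush();                                     // push the request bytes
      c.s.shutdownOutput();                          // we will send nothing more
      if (c.in.readByte() != 1) {                    // 1 == commit succeeded
        throw new RuntimeException("commit failed");
      }
    }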

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java
new file mode 100644
index 0000000..1180967
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleCopyJob.java
@@ -0,0 +1,294 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+/** Handles one set of files that need copying, either because we have a
+ *  new NRT point, or we are pre-copying merged files for merge warming. */
+class SimpleCopyJob extends CopyJob {
+  final Connection c;
+
+  final byte[] copyBuffer = new byte[65536];
+  final CopyState copyState;
+
+  private Iterator<Map.Entry<String,FileMetaData>> iter;
+
+  public SimpleCopyJob(String reason, Connection c, CopyState copyState, SimpleReplicaNode dest, Map<String,FileMetaData> files, boolean highPriority, OnceDone onceDone)
+    throws IOException {
+    super(reason, files, dest, highPriority, onceDone);
+    dest.message("create SimpleCopyJob o" + ord);
+    this.c = c;
+    this.copyState = copyState;
+  }
+
+  @Override
+  public synchronized void start() throws IOException {
+    if (iter == null) {
+      iter = toCopy.iterator();
+
+      // Send all file names / offsets up front to avoid ping-pong latency:
+      try {
+
+        // This means we resumed an already in-progress copy; we do this one first:
+        if (current != null) {
+          c.out.writeByte((byte) 0);
+          c.out.writeString(current.name);
+          c.out.writeVLong(current.getBytesCopied());
+          totBytes += current.metaData.length;
+        }
+
+        for (Map.Entry<String,FileMetaData> ent : toCopy) {
+          String fileName = ent.getKey();
+          FileMetaData metaData = ent.getValue();
+          totBytes += metaData.length;
+          c.out.writeByte((byte) 0);
+          c.out.writeString(fileName);
+          c.out.writeVLong(0);
+        }
+        c.out.writeByte((byte) 1);
+        c.flush();
+        c.s.shutdownOutput();
+
+        if (current != null) {
+          // Do this only at the end, after sending all requested files, so we don't deadlock due to socket buffering waiting for primary to
+          // send us this length:
+          long len = c.in.readVLong();
+          if (len != current.metaData.length) {
+            throw new IllegalStateException("file " + current.name + ": meta data says length=" + current.metaData.length + " but c.in says " + len);
+          }
+        }
+
+        dest.message("SimpleCopyJob.init: done start files count=" + toCopy.size() + " totBytes=" + totBytes);
+
+      } catch (Throwable t) {
+        cancel("exc during start", t);
+        throw new NodeCommunicationException("exc during start", t);
+      }
+    } else {
+      throw new IllegalStateException("already started");
+    }
+  }
+
+  @Override
+  public long getTotalBytesCopied() {
+    return totBytesCopied;
+  }
+
+  @Override
+  public Set<String> getFileNamesToCopy() {
+    Set<String> fileNames = new HashSet<>();
+    for(Map.Entry<String,FileMetaData> ent : toCopy) {
+      fileNames.add(ent.getKey());
+    }
+    return fileNames;
+  }
+
+  @Override
+  public Set<String> getFileNames() {
+    return files.keySet();
+  }
+
+  /** Higher priority first, then "first come, first served" order. */
+  @Override
+  public int compareTo(CopyJob _other) {
+    SimpleCopyJob other = (SimpleCopyJob) _other;
+    if (highPriority != other.highPriority) {
+      return highPriority ? -1 : 1;
+    } else if (ord < other.ord) {
+      return -1;
+    } else if (ord > other.ord) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void finish() throws IOException {
+    dest.message(String.format(Locale.ROOT,
+                               "top: file copy done; took %.1f msec to copy %d bytes; now rename %d tmp files",
+                               (System.nanoTime() - startNS)/1000000.0,
+                               totBytesCopied,
+                               copiedFiles.size()));
+
+    // NOTE: if any of the files we copied overwrote a file in the current commit point, we (ReplicaNode) removed the commit point up
+    // front so that the commit is not corrupt.  This way, if we hit an exception here, or if we crash here, we won't leave a corrupt commit in
+    // the index:
+    for(Map.Entry<String,String> ent : copiedFiles.entrySet()) {
+      String tmpFileName = ent.getValue();
+      String fileName = ent.getKey();
+
+      // Tricky: if the primary crashes while warming (pre-copying) a merged segment _X, the new primary can easily flush or merge to _X (since we don't
+      // have a distributed inflateGens for the new primary) and _X file names will be reused.  In this case, our local deleter will be
+      // thinking it must remove _X's files (from the warmed merge that never went live), but this is dangerous when a virus checker is active,
+      // since the deleter may finally succeed in deleting the file after we have copied the new _X flushed files.  So at this point we ask the
+      // deleter to NOT delete the file anymore:
+      dest.deleter.clearPending(Collections.singleton(fileName));
+
+      if (Node.VERBOSE_FILES) {
+        dest.message("rename file " + tmpFileName + " to " + fileName);
+      }
+
+      // NOTE: if this throws an exception, then some files have been moved to their true names, and others are leftover .tmp files.  I don't
+      // think heroic exception handling is necessary (no harm will come, except some leftover files), nor warranted here (it would make the
+      // code more complex, for the exceptional cases when something is wrong with your IO system):
+      dest.dir.renameFile(tmpFileName, fileName);
+    }
+
+    copiedFiles.clear();
+  }
+
+  /** Does an iota of work; returns true when all copying is done */
+  synchronized boolean visit() throws IOException {
+    if (exc != null) {
+      // We were externally cancelled:
+      return true;
+    }
+
+    if (current == null) {
+      if (iter.hasNext() == false) {
+        c.close();
+        return true;
+      }
+
+      Map.Entry<String,FileMetaData> next = iter.next();
+      FileMetaData metaData = next.getValue();
+      String fileName = next.getKey();
+      long len = c.in.readVLong();
+      if (len != metaData.length) {
+        throw new IllegalStateException("file " + fileName + ": meta data says length=" + metaData.length + " but c.in says " + len);
+      }
+      current = new CopyOneFile(c.in, dest, fileName, metaData, copyBuffer);
+    }
+
+    if (current.visit()) {
+      // This file is done copying
+      copiedFiles.put(current.name, current.tmpName);
+      totBytesCopied += current.getBytesCopied();
+      assert totBytesCopied <= totBytes: "totBytesCopied=" + totBytesCopied + " totBytes=" + totBytes;
+      current = null;
+      return false;
+    }
+
+    return false;
+  }
+
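+  /** Wraps a CopyOneFile from a previous (cancelled) job so the partial copy resumes over this job's connection. */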
+  protected CopyOneFile newCopyOneFile(CopyOneFile prev) {
+    return new CopyOneFile(prev, c.in);
+  }
+
+  @Override
+  public synchronized void transferAndCancel(CopyJob prevJob) throws IOException {
+    try {
+      super.transferAndCancel(prevJob);
+    } finally {
+      IOUtils.closeWhileHandlingException(((SimpleCopyJob) prevJob).c);
+    }
+  }
+
+  public synchronized void cancel(String reason, Throwable exc) {
+    try {
+      super.cancel(reason, exc);
+    } finally {
+      IOUtils.closeWhileHandlingException(c);
+    }
+  }
+
+  @Override
+  public boolean getFailed() {
+    return exc != null;
+  }
+  
+  @Override
+  public String toString() {
+    return "SimpleCopyJob(ord=" + ord + " " + reason + " highPriority=" + highPriority + " files count=" + files.size() + " bytesCopied=" + totBytesCopied + " (of " + totBytes + ") filesCopied=" + copiedFiles.size() + ")";
+  }
+
+  @Override
+  public void runBlocking() throws IOException {
+    while (visit() == false);
+
+    if (getFailed()) {
+      throw new RuntimeException("copy failed: " + cancelReason, exc);
+    }
+  }
+
+  @Override
+  public CopyState getCopyState() {
+    return copyState;
+  }
+
+  @Override
+  public synchronized boolean conflicts(CopyJob _other) {
+    Set<String> filesToCopy = new HashSet<>();
+    for(Map.Entry<String,FileMetaData> ent : toCopy) {
+      filesToCopy.add(ent.getKey());
+    }
+
+    SimpleCopyJob other = (SimpleCopyJob) _other;
+    synchronized (other) {
+      for(Map.Entry<String,FileMetaData> ent : other.toCopy) {
+        if (filesToCopy.contains(ent.getKey())) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+}
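
Note on the framing SimpleCopyJob.start() sets up: for each requested file the
replica writes a 0 byte, the file name, and the starting offset; a single 1
byte terminates the request list.  The primary then answers, per file, with a
vLong length followed by the raw bytes (see visit() above, which consumes them
incrementally).  A minimal sketch fetching one whole file over an
already-opened CMD_FETCH_FILES connection -- the file name is hypothetical,
and Connection is the helper used throughout this patch:

    c.out.writeByte((byte) 0);       // 0 == another file request follows
    c.out.writeString("_0.cfs");     // hypothetical file name
    c.out.writeVLong(0);             // offset to start copying from
    c.out.writeByte((byte) 1);       // 1 == end of request list
    c.flush();
    c.s.shutdownOutput();

    long len = c.in.readVLong();     // primary sends the file length first...
    byte[] buffer = new byte[65536];
    long upto = 0;
    while (upto < len) {             // ...then streams the raw bytes
      int chunk = (int) Math.min(buffer.length, len - upto);
      c.in.readBytes(buffer, 0, chunk);
      upto += chunk;
    }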

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
new file mode 100644
index 0000000..18e77ef
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -0,0 +1,674 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LogMergePolicy;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** A primary node that uses simple TCP connections to send commands and copy files. */
+class SimplePrimaryNode extends PrimaryNode {
+
+  final int tcpPort;
+
+  // These are updated by parent test process whenever replicas change:
+  int[] replicaTCPPorts;
+  int[] replicaIDs;
+
+  // So we only flip a bit once per file name:
+  final Set<String> bitFlipped = Collections.synchronizedSet(new HashSet<>());
+
+  static class MergePreCopy {
+    final List<Connection> connections = Collections.synchronizedList(new ArrayList<>());
+    final Map<String,FileMetaData> files;
+    private boolean finished;
+
+    public MergePreCopy(Map<String,FileMetaData> files) {
+      this.files = files;
+    }
+
+    public synchronized boolean tryAddConnection(Connection c) {
+      if (finished == false) {
+        connections.add(c);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    public synchronized boolean finished() {
+      if (connections.isEmpty()) {
+        finished = true;
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
+
+  public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
+    super(initWriter(id, random, indexPath), id, primaryGen, forcePrimaryVersion, searcherFactory);
+    this.tcpPort = tcpPort;
+  }
+
+  /** Records currently alive replicas. */
+  public synchronized void setReplicas(int[] replicaIDs, int[] replicaTCPPorts) {
+    message("top: set replicasIDs=" + Arrays.toString(replicaIDs) + " tcpPorts=" + Arrays.toString(replicaTCPPorts));
+    this.replicaIDs = replicaIDs;
+    this.replicaTCPPorts = replicaTCPPorts;
+  }
+
+  private static IndexWriter initWriter(int id, Random random, Path indexPath) throws IOException {
+    Directory dir = SimpleReplicaNode.getDirectory(random, id, indexPath);
+
+    MockAnalyzer analyzer = new MockAnalyzer(random);
+    analyzer.setMaxTokenLength(TestUtil.nextInt(random, 1, IndexWriter.MAX_TERM_LENGTH));
+    IndexWriterConfig iwc = LuceneTestCase.newIndexWriterConfig(random, analyzer);
+
+    MergePolicy mp = iwc.getMergePolicy();
+    //iwc.setInfoStream(new PrintStreamInfoStream(System.out));
+
+    // Force more frequent merging so we stress merge warming:
+    if (mp instanceof TieredMergePolicy) {
+      TieredMergePolicy tmp = (TieredMergePolicy) mp;
+      tmp.setSegmentsPerTier(3);
+      tmp.setMaxMergeAtOnce(3);
+    } else if (mp instanceof LogMergePolicy) {
+      LogMergePolicy lmp = (LogMergePolicy) mp;
+      lmp.setMergeFactor(3);
+    }
+
+    IndexWriter writer = new IndexWriter(dir, iwc);
+
+    TestUtil.reduceOpenFiles(writer);
+    return writer;
+  }
+
+  @Override
+  protected void preCopyMergedSegmentFiles(SegmentCommitInfo info, Map<String,FileMetaData> files) throws IOException {
+    int[] replicaTCPPorts = this.replicaTCPPorts;
+    if (replicaTCPPorts == null) {
+      message("no replicas; skip warming " + info);
+      return;
+    }
+
+    message("top: warm merge " + info + " to " + replicaTCPPorts.length + " replicas: files=" + files.keySet());
+
+    MergePreCopy preCopy = new MergePreCopy(files);
+    warmingSegments.add(preCopy);
+
+    try {
+
+      Set<String> fileNames = files.keySet();
+
+      // Ask all currently known replicas to pre-copy this newly merged segment's files:
+      for (int replicaTCPPort : replicaTCPPorts) {
+        try {
+          Connection c = new Connection(replicaTCPPort);
+          c.out.writeByte(SimpleReplicaNode.CMD_PRE_COPY_MERGE);
+          c.out.writeVLong(primaryGen);
+          c.out.writeVInt(tcpPort);
+          SimpleServer.writeFilesMetaData(c.out, files);
+          c.flush();
+          c.s.shutdownOutput();
+          message("warm connection " + c.s);
+          preCopy.connections.add(c);
+        } catch (Throwable t) {
+          message("top: ignore exception trying to warm to replica port " + replicaTCPPort + ": " + t);
+          //t.printStackTrace(System.out);
+        }
+      }
+
+      long startNS = System.nanoTime();
+      long lastWarnNS = startNS;
+
+      // TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replicas to finish copying?
+      while (preCopy.finished() == false) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ie) {
+          throw new ThreadInterruptedException(ie);
+        }
+
+        if (isClosed()) {
+          message("top: primary is closing: now cancel segment warming");
+          synchronized(preCopy.connections) {
+            IOUtils.closeWhileHandlingException(preCopy.connections);
+          }
+          return;
+        }
+
+        long ns = System.nanoTime();
+        if (ns - lastWarnNS > 1000000000L) {
+          message(String.format(Locale.ROOT, "top: warning: still warming merge " + info + " to " + preCopy.connections.size() + " replicas for %.1f sec...", (ns - startNS)/1000000000.0));
+          lastWarnNS = ns;
+        }
+
+        // Process keep-alives:
+        synchronized(preCopy.connections) {
+          Iterator<Connection> it = preCopy.connections.iterator();
+          while (it.hasNext()) {
+            Connection c = it.next();
+            try {
+              long nowNS = System.nanoTime();
+              boolean done = false;
+              while (c.sockIn.available() > 0) {
+                byte b = c.in.readByte();
+                if (b == 0) {
+                  // keep-alive
+                  c.lastKeepAliveNS = nowNS;
+                  message("keep-alive for socket=" + c.s + " merge files=" + files.keySet());
+                } else {
+                  // merge is done pre-copying to this node
+                  if (b != 1) {
+                    throw new IllegalArgumentException();
+                  }
+                  message("connection socket=" + c.s + " is done warming its merge " + info + " files=" + files.keySet());
+                  IOUtils.closeWhileHandlingException(c);
+                  it.remove();
+                  done = true;
+                  break;
+                }
+              }
+
+              // If > 2 sec since we saw a keep-alive, assume this replica is dead:
+              if (done == false && nowNS - c.lastKeepAliveNS > 2000000000L) {
+                message("top: warning: replica socket=" + c.s + " for segment=" + info + " seems to be dead; closing files=" + files.keySet());
+                IOUtils.closeWhileHandlingException(c);
+                it.remove();
+              }
+            } catch (Throwable t) {
+              message("top: ignore exception trying to read byte during warm for segment=" + info + " to replica socket=" + c.s + ": " + t + " files=" + files.keySet());
+              IOUtils.closeWhileHandlingException(c);
+              it.remove();
+            }
+          }
+        }
+      }
+    } finally {
+      warmingSegments.remove(preCopy);
+    }
+  }
+
+  /** Flushes all indexing ops to disk and notifies all replicas that they should now copy */
+  private void handleFlush(DataInput topIn, DataOutput topOut, BufferedOutputStream bos) throws IOException {
+    Thread.currentThread().setName("flush");
+
+    int[] replicaTCPPorts;
+    int[] replicaIDs;
+    synchronized (this) {
+      replicaTCPPorts = this.replicaTCPPorts;
+      replicaIDs = this.replicaIDs;
+    }
+
+    message("now flush; " + replicaIDs.length + " replicas");
+
+    if (flushAndRefresh()) {
+      // Something did get flushed (there were indexing ops since the last flush):
+
+      // Tell the caller the version before pushing to replicas, so that even if we crash after this, the caller will know what version we
+      // (possibly) pushed to some replicas.  Alternatively, we could make this two separate ops?
+      long version = getCopyStateVersion();
+      message("send flushed version=" + version);
+      topOut.writeLong(version);
+      bos.flush();
+
+      // Notify current replicas:
+      for(int i=0;i<replicaIDs.length;i++) {
+        int replicaID = replicaIDs[i];
+        try (Connection c = new Connection(replicaTCPPorts[i])) {
+          c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
+          c.out.writeVLong(version);
+          c.out.writeInt(tcpPort);
+          c.flush();
+          // TODO: we should use multicast to broadcast files out to replicas
+          // TODO: ... replicas could copy from one another instead of just primary
+          // TODO: we could also prioritize one replica at a time?
+        } catch (Throwable t) {
+          message("top: failed to connect R" + replicaID + " for newNRTPoint; skipping: " + t.getMessage());
+        }
+      }
+    } else {
+      // No changes flushed:
+      topOut.writeLong(-getCopyStateVersion());
+    }
+  }
+
+  /** Pushes CopyState on the wire */
+  private static void writeCopyState(CopyState state, DataOutput out) throws IOException {
+    // TODO (opto): we could encode to byte[] once when we created the copyState, and then just send the same bytes to all replicas...
+    out.writeVInt(state.infosBytes.length);
+    out.writeBytes(state.infosBytes, 0, state.infosBytes.length);
+    out.writeVLong(state.gen);
+    out.writeVLong(state.version);
+    SimpleServer.writeFilesMetaData(out, state.files);
+
+    out.writeVInt(state.completedMergeFiles.size());
+    for(String fileName : state.completedMergeFiles) {
+      out.writeString(fileName);
+    }
+    out.writeVLong(state.primaryGen);
+  }
+
+  /** Called when another node (replica) wants to copy files from us */
+  private boolean handleFetchFiles(Random random, Socket socket, DataInput destIn, DataOutput destOut, BufferedOutputStream bos) throws IOException {
+    Thread.currentThread().setName("send");
+
+    int replicaID = destIn.readVInt();
+    message("top: start fetch for R" + replicaID + " socket=" + socket);
+    byte b = destIn.readByte();
+    CopyState copyState;
+    if (b == 0) {
+      // Caller already has CopyState
+      copyState = null;
+    } else if (b == 1) {
+      // Caller does not have CopyState; we pull the latest one:
+      copyState = getCopyState();
+      Thread.currentThread().setName("send-R" + replicaID + "-" + copyState.version);
+    } else {
+      // Protocol error:
+      throw new IllegalArgumentException("invalid CopyState byte=" + b);
+    }
+
+    try {
+      if (copyState != null) {
+        // Serialize CopyState on the wire to the client:
+        writeCopyState(copyState, destOut);
+        bos.flush();
+      }
+
+      byte[] buffer = new byte[16384];
+      int fileCount = 0;
+      long totBytesSent = 0;
+      while (true) {
+        byte done = destIn.readByte();
+        if (done == 1) {
+          break;
+        } else if (done != 0) {
+          throw new IllegalArgumentException("expected 0 or 1 byte but got " + done);
+        }
+
+        // Name of the file the replica wants us to send:
+        String fileName = destIn.readString();
+
+        // Starting offset in the file we should start sending bytes from:
+        long fpStart = destIn.readVLong();
+
+        try (IndexInput in = dir.openInput(fileName, IOContext.DEFAULT)) {
+          long len = in.length();
+          //message("fetch " + fileName + ": send len=" + len);
+          destOut.writeVLong(len);
+          in.seek(fpStart);
+          long upto = fpStart;
+          while (upto < len) {
+            int chunk = (int) Math.min(buffer.length, (len-upto));
+            in.readBytes(buffer, 0, chunk);
+            if (TestNRTReplication.DO_BIT_FLIPS_DURING_COPY) {
+              if (random.nextInt(3000) == 17 && bitFlipped.contains(fileName) == false) {
+                bitFlipped.add(fileName);
+                message("file " + fileName + " to R" + replicaID + ": now randomly flipping a bit at byte=" + upto);
+                int x = random.nextInt(chunk);
+                int bit = random.nextInt(8);
+                buffer[x] ^= 1 << bit;
+              }
+            }
+            destOut.writeBytes(buffer, 0, chunk);
+            upto += chunk;
+            totBytesSent += chunk;
+          }
+        }
+
+        fileCount++;
+      }
+
+      message("top: done fetch files for R" + replicaID + ": sent " + fileCount + " files; sent " + totBytesSent + " bytes");
+    } catch (Throwable t) {
+      message("top: exception during fetch: " + t.getMessage() + "; now close socket");
+      socket.close();
+      return false;
+    } finally {
+      if (copyState != null) {
+        message("top: fetch: now release CopyState");
+        releaseCopyState(copyState);
+      }
+    }
+
+    return true;
+  }
+
+  static final FieldType tokenizedWithTermVectors;
+
+  static {
+    tokenizedWithTermVectors = new FieldType(TextField.TYPE_STORED);
+    tokenizedWithTermVectors.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
+    tokenizedWithTermVectors.setStoreTermVectors(true);
+    tokenizedWithTermVectors.setStoreTermVectorOffsets(true);
+    tokenizedWithTermVectors.setStoreTermVectorPositions(true);
+  }
+
+  private void handleIndexing(Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException {
+    Thread.currentThread().setName("indexing");
+    message("start handling indexing socket=" + socket);
+    while (true) {
+      byte cmd;
+      try {
+        cmd = in.readByte();
+      } catch (EOFException eofe) {
+        // done
+        return;
+      }
+      //message("INDEXING OP " + cmd);
+      if (cmd == CMD_ADD_DOC) {
+        handleAddDocument(in, out);
+        out.writeByte((byte) 1);
+        bos.flush();
+      } else if (cmd == CMD_UPDATE_DOC) {
+        handleUpdateDocument(in, out);
+        out.writeByte((byte) 1);
+        bos.flush();
+      } else if (cmd == CMD_DELETE_DOC) {
+        handleDeleteDocument(in, out);
+        out.writeByte((byte) 1);
+        bos.flush();
+      } else if (cmd == CMD_INDEXING_DONE) {
+        out.writeByte((byte) 1);
+        bos.flush();
+        break;
+      } else {
+        throw new IllegalArgumentException("cmd must be add, update or delete; got " + cmd);
+      }
+    }
+  }
+
+  private void handleAddDocument(DataInput in, DataOutput out) throws IOException {
+    int fieldCount = in.readVInt();
+    Document doc = new Document();
+    for(int i=0;i<fieldCount;i++) {
+      String name = in.readString();
+      String value = in.readString();
+      // NOTE: clearly NOT general!
+      if (name.equals("docid") || name.equals("marker")) {
+        doc.add(new StringField(name, value, Field.Store.YES));
+      } else if (name.equals("title")) {
+        doc.add(new StringField("title", value, Field.Store.YES));
+        doc.add(new Field("titleTokenized", value, tokenizedWithTermVectors));
+      } else if (name.equals("body")) {
+        doc.add(new Field("body", value, tokenizedWithTermVectors));
+      } else {
+        throw new IllegalArgumentException("unhandled field name " + name);
+      }
+    }
+
+    writer.addDocument(doc);
+  }
+
+  private void handleUpdateDocument(DataInput in, DataOutput out) throws IOException {
+    int fieldCount = in.readVInt();
+    Document doc = new Document();
+    String docid = null;
+    for(int i=0;i<fieldCount;i++) {
+      String name = in.readString();
+      String value = in.readString();
+      // NOTE: clearly NOT general!
+      if (name.equals("docid")) {
+        docid = value;
+        doc.add(new StringField("docid", value, Field.Store.YES));
+      } else if (name.equals("marker")) {
+        doc.add(new StringField("marker", value, Field.Store.YES));
+      } else if (name.equals("title")) {
+        doc.add(new StringField("title", value, Field.Store.YES));
+        doc.add(new Field("titleTokenized", value, tokenizedWithTermVectors));
+      } else if (name.equals("body")) {
+        doc.add(new Field("body", value, tokenizedWithTermVectors));
+      } else {
+        throw new IllegalArgumentException("unhandled field name " + name);
+      }
+    }
+
+    writer.updateDocument(new Term("docid", docid), doc);
+  }
+
+  private void handleDeleteDocument(DataInput in, DataOutput out) throws IOException {
+    String docid = in.readString();
+    writer.deleteDocuments(new Term("docid", docid));
+  }
+
+  // Sent to primary to cut over to a new SIS (SegmentInfos):
+  static final byte CMD_FLUSH = 10;
+
+  // Sent by replica to primary asking to copy a set of files over:
+  static final byte CMD_FETCH_FILES = 1;
+  static final byte CMD_GET_SEARCHING_VERSION = 12;
+  static final byte CMD_SEARCH = 2;
+  static final byte CMD_MARKER_SEARCH = 3;
+  static final byte CMD_COMMIT = 4;
+  static final byte CMD_CLOSE = 5;
+
+  // Send (to primary) the list of currently running replicas:
+  static final byte CMD_SET_REPLICAS = 16;
+
+  // Multiple indexing ops
+  static final byte CMD_INDEXING = 18;
+  static final byte CMD_ADD_DOC = 6;
+  static final byte CMD_UPDATE_DOC = 7;
+  static final byte CMD_DELETE_DOC = 8;
+  static final byte CMD_INDEXING_DONE = 19;
+
+  // Sent by replica to primary when replica first starts up, so primary can add it to any warming merges:
+  static final byte CMD_NEW_REPLICA = 20;
+
+  /** Handles an incoming request to the naive TCP server wrapping this node */
+  void handleOneConnection(Random random, ServerSocket ss, AtomicBoolean stop, InputStream is, Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
+
+    outer:
+    while (true) {
+      byte cmd;
+      while (true) {
+        if (is.available() > 0) {
+          break;
+        }
+        if (stop.get()) {
+          return;
+        }
+        Thread.sleep(10);
+      }
+
+      try {
+        cmd = in.readByte();
+      } catch (EOFException eofe) {
+        break;
+      }
+
+      switch (cmd) {
+
+      case CMD_FLUSH:
+        handleFlush(in, out, bos);
+        break;
+
+      case CMD_FETCH_FILES:
+        // Replica (other node) is asking us (primary node) for files to copy
+        handleFetchFiles(random, socket, in, out, bos);
+        break;
+
+      case CMD_INDEXING:
+        handleIndexing(socket, in, out, bos);
+        break;
+
+      case CMD_GET_SEARCHING_VERSION:
+        out.writeVLong(getCurrentSearchingVersion());
+        break;
+
+      case CMD_SEARCH:
+        {
+          Thread.currentThread().setName("search");
+          IndexSearcher searcher = mgr.acquire();
+          try {
+            long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+            int hitCount = searcher.search(new TermQuery(new Term("body", "the")), 1).totalHits;
+            //message("version=" + version + " searcher=" + searcher);
+            out.writeVLong(version);
+            out.writeVInt(hitCount);
+          } finally {
+            mgr.release(searcher);
+          }
+        }
+        continue outer;
+
+      case CMD_MARKER_SEARCH:
+        {
+          Thread.currentThread().setName("msearch");
+          IndexSearcher searcher = mgr.acquire();
+          try {
+            long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+            int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+            out.writeVLong(version);
+            out.writeVInt(hitCount);
+          } finally {
+            mgr.release(searcher);
+          }
+        }
+        continue outer;
+
+      case CMD_COMMIT:
+        Thread.currentThread().setName("commit");
+        commit();
+        out.writeByte((byte) 1);
+        break;
+
+      case CMD_CLOSE:
+        Thread.currentThread().setName("close");
+        message("top close: now close server socket");
+        ss.close();
+        out.writeByte((byte) 1);
+        message("top close: done close server socket");
+        break;
+
+      case CMD_SET_REPLICAS:
+        Thread.currentThread().setName("set repls");
+        int count = in.readVInt();
+        int[] replicaIDs = new int[count];
+        int[] replicaTCPPorts = new int[count];
+        for(int i=0;i<count;i++) {
+          replicaIDs[i] = in.readVInt();
+          replicaTCPPorts[i] = in.readVInt();
+        }
+        out.writeByte((byte) 1);
+        setReplicas(replicaIDs, replicaTCPPorts);
+        break;
+
+      case CMD_NEW_REPLICA:
+        Thread.currentThread().setName("new repl");
+        int replicaTCPPort = in.readVInt();
+        message("new replica: " + warmingSegments.size() + " current warming merges");
+        // Step through all currently warming segments and try to add this replica if it isn't there already:
+        for(MergePreCopy preCopy : warmingSegments) {
+          message("warming segment " + preCopy.files.keySet());
+          boolean found = false;
+          synchronized (preCopy.connections) {
+            for(Connection c : preCopy.connections) {
+              if (c.destTCPPort == replicaTCPPort) {
+                found = true;
+                break;
+              }
+            }
+          }
+
+          if (found) {
+            message("this replica is already warming this segment; skipping");
+            // It's possible (maybe) that the replica started up, then a merge kicked off and was warmed to this new replica, all before the
+            // replica sent us this command:
+            continue;
+          }
+
+          // OK, this new replica is not already warming this segment, so attempt (could fail) to start warming now:
+
+          Connection c = new Connection(replicaTCPPort);
+          if (preCopy.tryAddConnection(c) == false) {
+            // This can happen if all other replicas just now finished warming this segment, and so we were just a bit too late.  In this
+            // case the segment will be copied over in the next nrt point sent to this replica
+            message("failed to add connection to segment warmer (too late); closing");
+            c.close();
+            continue;
+          }
+          c.out.writeByte(SimpleReplicaNode.CMD_PRE_COPY_MERGE);
+          c.out.writeVLong(primaryGen);
+          c.out.writeVInt(tcpPort);
+          SimpleServer.writeFilesMetaData(c.out, preCopy.files);
+          c.flush();
+          c.s.shutdownOutput();
+          message("successfully started warming");
+        }
+        break;
+
+      default:
+        throw new IllegalArgumentException("unrecognized cmd=" + cmd + " via socket=" + socket);
+      }
+      bos.flush();
+      break;
+    }
+  }
+}
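
Note on handleFlush above: the sign of the long it sends back encodes whether
anything was actually flushed.  A caller-side sketch, assuming the same
Connection helper (primaryTcpPort here just stands in for the primary's port):

    try (Connection c = new Connection(primaryTcpPort)) {
      c.out.writeByte(SimplePrimaryNode.CMD_FLUSH);
      c.flush();
      c.s.shutdownOutput();
      long result = c.in.readLong();
      if (result > 0) {
        // indexing ops were flushed; result is the new NRT version
      } else {
        // nothing to flush; -result is the current (unchanged) version
      }
    }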

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
new file mode 100644
index 0000000..8667df1
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
@@ -0,0 +1,316 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.store.RateLimitedIndexOutput;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.LuceneTestCase;
+
+class SimpleReplicaNode extends ReplicaNode {
+  final int tcpPort;
+  final Jobs jobs;
+
+  // Rate limits incoming bytes/sec when fetching files:
+  final RateLimiter fetchRateLimiter;
+  final AtomicLong bytesSinceLastRateLimiterCheck = new AtomicLong();
+  final Random random;
+
+  /** Changes over time, as primary node crashes and moves around */
+  int curPrimaryTCPPort;
+
+  public SimpleReplicaNode(Random random, int id, int tcpPort, Path indexPath, long curPrimaryGen, int primaryTCPPort, SearcherFactory searcherFactory) throws IOException {
+    super(id, getDirectory(random, id, indexPath), searcherFactory);
+    this.tcpPort = tcpPort;
+    this.random = new Random(random.nextLong());
+
+    // Random IO throttling on file copies: 5 - 20 MB/sec:
+    double mbPerSec = 5 * (1.0 + 3*random.nextDouble());
+    message(String.format(Locale.ROOT, "top: will rate limit file fetch to %.2f MB/sec", mbPerSec));
+    fetchRateLimiter = new RateLimiter.SimpleRateLimiter(mbPerSec);
+    this.curPrimaryTCPPort = primaryTCPPort;
+    
+    start(curPrimaryGen);
+
+    // Handles fetching files from primary:
+    jobs = new Jobs(this);
+    jobs.setName("R" + id + ".copyJobs");
+    jobs.setDaemon(true);
+    jobs.start();
+  }
+
+  @Override
+  protected void launch(CopyJob job) {
+    jobs.launch(job);
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Can't be sync'd when calling jobs since it can lead to deadlock:
+    jobs.close();
+    message("top: jobs closed");
+    synchronized(mergeCopyJobs) {
+      for (CopyJob job : mergeCopyJobs) {
+        message("top: cancel merge copy job " + job);
+        job.cancel("jobs closing", null);
+      }
+    }
+    super.close();
+  }
+
+  @Override
+  protected CopyJob newCopyJob(String reason, Map<String,FileMetaData> files, Map<String,FileMetaData> prevFiles,
+                               boolean highPriority, CopyJob.OnceDone onceDone) throws IOException {
+    Connection c;
+    CopyState copyState;
+
+    // Exceptions in here mean something went wrong talking over the socket, which is fine (e.g. the primary node crashed):
+    try {
+      c = new Connection(curPrimaryTCPPort);
+      c.out.writeByte(SimplePrimaryNode.CMD_FETCH_FILES);
+      c.out.writeVInt(id);
+      if (files == null) {
+        // No incoming CopyState: ask primary for latest one now
+        c.out.writeByte((byte) 1);
+        c.flush();
+        copyState = SimpleServer.readCopyState(c.in);
+        files = copyState.files;
+      } else {
+        c.out.writeByte((byte) 0);
+        copyState = null;
+      }
+    } catch (Throwable t) {
+      throw new NodeCommunicationException("exc while reading files to copy", t);
+    }
+
+    return new SimpleCopyJob(reason, c, copyState, this, files, highPriority, onceDone);
+  }
+
+  static Directory getDirectory(Random random, int id, Path path) throws IOException {
+    MockDirectoryWrapper dir = LuceneTestCase.newMockFSDirectory(path);
+    
+    dir.setAssertNoUnrefencedFilesOnClose(true);
+    // This is very costly (takes more time to check than it did to index); we do this ourselves in the end instead of each time a replica
+    // is restarted:
+    dir.setCheckIndexOnClose(false);
+
+    // Corrupt any index files not referenced by current commit point; this is important (increases test evilness) because we may have done
+    // a hard crash of the previous JVM writing to this directory and so MDW's corrupt-unknown-files-on-close never ran:
+    Node.nodeMessage(id, "top: corrupt unknown files");
+    dir.corruptUnknownFiles();
+
+    return dir;
+  }
+
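+  // Sent by primary to replica to notify it of a new NRT point: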
+  static final byte CMD_NEW_NRT_POINT = 0;
+
+  // Sent by primary to replica to pre-copy merge files:
+  static final byte CMD_PRE_COPY_MERGE = 17;
+
+  /** Handles an incoming request to the naive TCP server wrapping this node */
+  void handleOneConnection(ServerSocket ss, AtomicBoolean stop, InputStream is, Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
+    outer:
+    while (true) {
+      byte cmd;
+      while (true) {
+        if (is.available() > 0) {
+          break;
+        }
+        if (stop.get()) {
+          return;
+        }
+        Thread.sleep(10);
+      }
+
+      try {
+        cmd = in.readByte();
+      } catch (EOFException eofe) {
+        break;
+      }
+
+      switch(cmd) {
+      case CMD_NEW_NRT_POINT:
+        {
+          long version = in.readVLong();
+          Thread.currentThread().setName("recv-" + version);
+          curPrimaryTCPPort = in.readInt();
+          newNRTPoint(version);
+        }
+        break;
+
+      case SimplePrimaryNode.CMD_GET_SEARCHING_VERSION:
+        // nocommit this is hacky:
+
+        // Tricky: if a sync is just finishing up, i.e. it managed to finish copying all files just before we crashed the primary, and is now
+        // in the process of opening a new reader, we need to wait for it, to be sure we really pick the most current replica:
+        if (isCopying()) {
+          message("top: getSearchingVersion: now wait for finish sync");
+          // TODO: use immediate concurrency instead of polling:
+          while (isCopying() && stop.get() == false) {
+            Thread.sleep(50);
+            message("top: curNRTCopy=" + curNRTCopy);
+          }
+          message("top: getSearchingVersion: done wait for finish sync");
+        }
+        if (stop.get() == false) {
+          out.writeVLong(getCurrentSearchingVersion());
+        } else {
+          message("top: getSearchingVersion: stop waiting for finish sync: stop is set");
+        }
+        break;
+
+      case SimplePrimaryNode.CMD_SEARCH:
+        {
+          Thread.currentThread().setName("search");
+          IndexSearcher searcher = mgr.acquire();
+          try {
+            long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+            int hitCount = searcher.search(new TermQuery(new Term("body", "the")), 1).totalHits;
+            //node.message("version=" + version + " searcher=" + searcher);
+            out.writeVLong(version);
+            out.writeVInt(hitCount);
+          } finally {
+            mgr.release(searcher);
+          }
+        }
+        continue outer;
+
+      case SimplePrimaryNode.CMD_MARKER_SEARCH:
+        {
+          Thread.currentThread().setName("msearch");
+          IndexSearcher searcher = mgr.acquire();
+          try {
+            long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+            int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+            out.writeVLong(version);
+            out.writeVInt(hitCount);
+          } finally {
+            mgr.release(searcher);
+          }
+        }
+        continue outer;
+
+      case SimplePrimaryNode.CMD_COMMIT:
+        Thread.currentThread().setName("commit");
+        commit();
+        out.writeByte((byte) 1);
+        break;
+
+      case SimplePrimaryNode.CMD_CLOSE:
+        Thread.currentThread().setName("close");
+        ss.close();
+        out.writeByte((byte) 1);
+        break outer;
+
+      case CMD_PRE_COPY_MERGE:
+        Thread.currentThread().setName("merge copy");
+
+        long newPrimaryGen = in.readVLong();
+        curPrimaryTCPPort = in.readVInt();
+        Map<String,FileMetaData> files = SimpleServer.readFilesMetaData(in);
+        message("done reading files to copy files=" + files.keySet());
+        AtomicBoolean finished = new AtomicBoolean();
+        CopyJob job = launchPreCopyMerge(finished, newPrimaryGen, files);
+        message("done launching copy job files=" + files.keySet());
+
+        // Naive keep-alive mechanism: without it, if e.g. we (the replica node) crash, the primary
+        // won't notice for a very long time:
+        boolean success = false;
+        try {
+          int count = 0;
+          while (true) {
+            if (finished.get() || stop.get()) {
+              break;
+            }
+            Thread.sleep(10);
+            count++;
+            if (count == 100) {
+              // Once per second or so, we send a keep alive
+              message("send merge pre copy keep alive... files=" + files.keySet());
+
+              // To be evil, we sometimes fail to keep-alive, e.g. simulating a long GC pausing us:
+              if (random.nextBoolean()) {
+                out.writeByte((byte) 0);
+                count = 0;
+              }
+            }
+          }
+
+          out.writeByte((byte) 1);
+          bos.flush();
+          success = true;
+        } finally {
+          message("done merge copy files=" + files.keySet() + " success=" + success);
+        }
+        break;
+
+      default:
+        throw new IllegalArgumentException("unrecognized cmd=" + cmd);
+      }
+      bos.flush();
+
+      break;
+    }
+  }
+
+  @Override
+  protected void sendNewReplica() throws IOException {
+    message("send new_replica to primary tcpPort=" + curPrimaryTCPPort);
+    try (Connection c = new Connection(curPrimaryTCPPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_NEW_REPLICA);
+      c.out.writeVInt(tcpPort);
+      c.flush();
+      c.s.shutdownOutput();
+    } catch (Throwable t) {
+      message("ignoring exc " + t + " sending new_replica to primary tcpPort=" + curPrimaryTCPPort);
+    }
+  }
+
+  @Override
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext) throws IOException {
+    return new RateLimitedIndexOutput(fetchRateLimiter, super.createTempOutput(prefix, suffix, ioContext));
+  }
+}
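
For context, the keep-alive bytes that CMD_PRE_COPY_MERGE writes above are only useful if the primary actively polls for them while the merge pre-copy is running. A minimal sketch of such a poll loop, assuming the primary reads the socket's InputStream (is, here) and treats 0 as keep-alive and 1 as done; KEEP_ALIVE_TIMEOUT_NS and the enclosing method (declared to throw IOException and InterruptedException) are illustrative assumptions, not actual SimplePrimaryNode code:

    // Sketch only: watch for the replica's keep-alive bytes during a merge pre-copy.
    long lastKeepAliveNS = System.nanoTime();
    while (true) {
      if (is.available() > 0) {
        int b = is.read();
        if (b == 1) {
          // Replica finished copying the merged files:
          break;
        }
        // b == 0 is a keep-alive; reset the clock:
        lastKeepAliveNS = System.nanoTime();
      } else if (System.nanoTime() - lastKeepAliveNS > KEEP_ALIVE_TIMEOUT_NS) {
        throw new IOException("replica stopped sending keep-alives; assuming it crashed");
      } else {
        Thread.sleep(10);
      }
    }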

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
new file mode 100644
index 0000000..f03a5c3
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
@@ -0,0 +1,390 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.Constants;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
+import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+/** Child process with a silly, naive TCP socket server to handle
+ *  between-node commands; launched for each node by TestNRTReplication. */
+@SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"})
+@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
+public class SimpleServer extends LuceneTestCase {
+
+  final static Set<Thread> clientThreads = Collections.synchronizedSet(new HashSet<>());
+  final static AtomicBoolean stop = new AtomicBoolean();
+
+  /** Handles one client connection */
+  private static class ClientHandler extends Thread {
+
+    // We hold this just so we can close it to exit the process:
+    private final ServerSocket ss;
+    private final Socket socket;
+    private final Node node;
+    private final int bufferSize;
+
+    public ClientHandler(ServerSocket ss, Node node, Socket socket) {
+      this.ss = ss;
+      this.node = node;
+      this.socket = socket;
+      this.bufferSize = TestUtil.nextInt(random(), 128, 65536);
+      if (Node.VERBOSE_CONNECTIONS) {
+        node.message("new connection socket=" + socket);
+      }
+    }
+
+    @Override
+    public void run() {
+      boolean success = false;
+      try {
+        //node.message("using stream buffer size=" + bufferSize);
+        InputStream is = new BufferedInputStream(socket.getInputStream(), bufferSize);
+        DataInput in = new InputStreamDataInput(is);
+        BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream(), bufferSize);
+        DataOutput out = new OutputStreamDataOutput(bos);
+
+        if (node instanceof SimplePrimaryNode) {
+          ((SimplePrimaryNode) node).handleOneConnection(random(), ss, stop, is, socket, in, out, bos);
+        } else {
+          ((SimpleReplicaNode) node).handleOneConnection(ss, stop, is, socket, in, out, bos);
+        }
+
+        bos.flush();
+        if (Node.VERBOSE_CONNECTIONS) {
+          node.message("bos.flush done");
+        }
+
+        success = true;
+      } catch (Throwable t) {
+        if (t instanceof SocketException == false) {
+          node.message("unexpected exception handling client connection:");
+          t.printStackTrace(System.out);
+          // Test should fail with this:
+          throw new RuntimeException(t);
+        } else {
+          node.message("SocketException " + t + " handling client connection; ignoring");
+        }
+      } finally {
+        if (success) {
+          try {
+            IOUtils.close(socket);
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        } else {
+          IOUtils.closeWhileHandlingException(socket);
+        }
+      }
+      if (Node.VERBOSE_CONNECTIONS) {
+        node.message("socket.close done");
+      }
+    }
+  }
+
+  /**
+   * Attempts to crash the JVM; currently this only works/is tested on Sun and IBM JVMs.
+   */
+  // Poached from TestIndexWriterOnJRECrash ... should we factor this out to TestUtil?  Seems dangerous to give it such "publicity"?
+  private static void crashJRE() {
+    final String vendor = Constants.JAVA_VENDOR;
+    final boolean supportsUnsafeNpeDereference = 
+        vendor.startsWith("Oracle") || 
+        vendor.startsWith("Sun") || 
+        vendor.startsWith("Apple");
+
+    try {
+      if (supportsUnsafeNpeDereference) {
+        try {
+          Class<?> clazz = Class.forName("sun.misc.Unsafe");
+          java.lang.reflect.Field field = clazz.getDeclaredField("theUnsafe");
+          field.setAccessible(true);
+          Object o = field.get(null);
+          Method m = clazz.getMethod("putAddress", long.class, long.class);
+          m.invoke(o, 0L, 0L);
+        } catch (Throwable e) {
+          System.out.println("Couldn't kill the JVM via Unsafe.");
+          e.printStackTrace(System.out); 
+        }
+      }
+
+      // Fall back to Runtime.halt():
+      Runtime.getRuntime().halt(-1);
+    } catch (Exception e) {
+      System.out.println("Couldn't kill the JVM.");
+      e.printStackTrace(System.out); 
+    }
+
+    // We couldn't get the JVM to crash for some reason.
+    throw new RuntimeException("JVM refuses to die!");
+  }
+
+  static void writeFilesMetaData(DataOutput out, Map<String,FileMetaData> files) throws IOException {
+    out.writeVInt(files.size());
+    for(Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+      out.writeString(ent.getKey());
+
+      FileMetaData fmd = ent.getValue();
+      out.writeVLong(fmd.length);
+      out.writeVLong(fmd.checksum);
+      out.writeVInt(fmd.header.length);
+      out.writeBytes(fmd.header, 0, fmd.header.length);
+      out.writeVInt(fmd.footer.length);
+      out.writeBytes(fmd.footer, 0, fmd.footer.length);
+    }
+  }
+
+  static Map<String,FileMetaData> readFilesMetaData(DataInput in) throws IOException {
+    int fileCount = in.readVInt();
+    //System.out.println("readFilesMetaData: fileCount=" + fileCount);
+    Map<String,FileMetaData> files = new HashMap<>();
+    for(int i=0;i<fileCount;i++) {
+      String fileName = in.readString();
+      //System.out.println("readFilesMetaData: fileName=" + fileName);
+      long length = in.readVLong();
+      long checksum = in.readVLong();
+      byte[] header = new byte[in.readVInt()];
+      in.readBytes(header, 0, header.length);
+      byte[] footer = new byte[in.readVInt()];
+      in.readBytes(footer, 0, footer.length);
+      files.put(fileName, new FileMetaData(header, footer, length, checksum));
+    }
+    return files;
+  }
+
+  /** Pulls CopyState off the wire */
+  static CopyState readCopyState(DataInput in) throws IOException {
+
+    // Decode a new CopyState
+    byte[] infosBytes = new byte[in.readVInt()];
+    in.readBytes(infosBytes, 0, infosBytes.length);
+
+    long gen = in.readVLong();
+    long version = in.readVLong();
+    Map<String,FileMetaData> files = readFilesMetaData(in);
+
+    int count = in.readVInt();
+    Set<String> completedMergeFiles = new HashSet<>();
+    for(int i=0;i<count;i++) {
+      completedMergeFiles.add(in.readString());
+    }
+    long primaryGen = in.readVLong();
+
+    return new CopyState(files, version, gen, infosBytes, completedMergeFiles, primaryGen, null);
+  }
+
+  public void test() throws Exception {
+
+    int id = Integer.parseInt(System.getProperty("tests.nrtreplication.nodeid"));
+    Thread.currentThread().setName("init child " + id);
+    Path indexPath = Paths.get(System.getProperty("tests.nrtreplication.indexpath"));
+    boolean isPrimary = System.getProperty("tests.nrtreplication.isPrimary") != null;
+    int primaryTCPPort;
+    long forcePrimaryVersion;
+    if (isPrimary == false) {
+      forcePrimaryVersion = -1;
+      primaryTCPPort = Integer.parseInt(System.getProperty("tests.nrtreplication.primaryTCPPort"));
+    } else {
+      primaryTCPPort = -1;
+      forcePrimaryVersion = Long.parseLong(System.getProperty("tests.nrtreplication.forcePrimaryVersion"));
+    }
+    long primaryGen = Long.parseLong(System.getProperty("tests.nrtreplication.primaryGen"));
+    Node.globalStartNS = Long.parseLong(System.getProperty("tests.nrtreplication.startNS"));
+
+    boolean doRandomCrash = isPrimary ? TestNRTReplication.DO_CRASH_PRIMARY : TestNRTReplication.DO_CRASH_REPLICA;
+    boolean doRandomClose = isPrimary ? false : TestNRTReplication.DO_CLOSE_REPLICA;
+
+    // Create server socket that we listen for incoming requests on:
+    try (final ServerSocket ss = new ServerSocket(0)) {
+
+      int tcpPort = ((InetSocketAddress) ss.getLocalSocketAddress()).getPort();
+      System.out.println("\nPORT: " + tcpPort);
+      final Node node;
+      if (isPrimary) {
+        node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null);
+        System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
+      } else {
+        node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
+      }
+      System.out.println("\nINFOS VERSION: " + node.getCurrentSearchingVersion());
+
+      if (doRandomClose || doRandomCrash) {
+        final int waitForMS;
+        if (isPrimary) {
+          waitForMS = TestUtil.nextInt(random(), 20000, 60000);
+        } else {
+          waitForMS = TestUtil.nextInt(random(), 5000, 60000);
+        }
+
+        boolean doClose;
+        if (doRandomCrash == false) {
+          doClose = true;
+        } else if (doRandomClose) {
+          doClose = random().nextBoolean();
+        } else {
+          doClose = false;
+        }
+
+        if (doClose) {
+          node.message("top: will close after " + (waitForMS/1000.0) + " seconds");
+        } else {
+          node.message("top: will crash after " + (waitForMS/1000.0) + " seconds");
+        }
+
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+              long endTime = System.nanoTime() + waitForMS*1000000L;
+              while (System.nanoTime() < endTime) {
+                try {
+                  Thread.sleep(10);
+                } catch (InterruptedException e) {
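+                  // Harmless: we just wake up early and re-check the stop flag and deadline.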
+                }
+                if (stop.get()) {
+                  break;
+                }
+              }
+
+              if (stop.get() == false) {
+                if (doClose) {
+                  try {
+                    node.message("top: now force close server socket after " + (waitForMS/1000.0) + " seconds");
+                    node.state = "top-closing";
+                    ss.close();
+                  } catch (IOException ioe) {     
+                    throw new RuntimeException(ioe);
+                  }
+                } else {        
+                  node.message("top: now crash JVM after " + (waitForMS/1000.0) + " seconds");
+                  crashJRE();
+                }
+              }
+            }
+          };
+
+        if (isPrimary) {
+          t.setName("crasher P" + id);
+        } else {
+          t.setName("crasher R" + id);
+        }
+
+        // So that if node exits naturally, this thread won't prevent process exit:
+        t.setDaemon(true);
+        t.start();
+      }
+      System.out.println("\nNODE STARTED");
+
+      // Naive thread-per-connection server:
+      while (true) {
+        Socket socket;
+        try {
+          socket = ss.accept();
+        } catch (SocketException se) {
+          // when ClientHandler closes our ss we will hit this
+          node.message("top: server socket exc; now exit");
+          break;
+        }
+        Thread thread = new ClientHandler(ss, node, socket);
+        thread.setDaemon(true);
+        thread.start();
+
+        clientThreads.add(thread);
+
+        // Prune finished client threads:
+        Iterator<Thread> it = clientThreads.iterator();
+        while (it.hasNext()) {
+          Thread t = it.next();
+          if (t.isAlive() == false) {
+            it.remove();
+          }
+        }
+        //node.message(clientThreads.size() + " client threads are still alive");
+      }
+
+      stop.set(true);
+
+      // Make sure all client threads are done, else we get annoying (yet ultimately "harmless")
+      // messages from the child processes about threads that are still running/lingering:
+      for(Thread clientThread : clientThreads) {
+        node.message("top: join clientThread=" + clientThread);
+        clientThread.join();
+        node.message("top: done join clientThread=" + clientThread);
+      }
+      node.message("done join all client threads; now close node");
+      node.close();
+    }
+  }
+}
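
The PORT / COMMIT VERSION / INFOS VERSION / NODE STARTED lines printed above are the child's only startup handshake with the parent JVM, which presumably scrapes them off the subprocess's stdout before wiring up its NodeProcess wrapper. A rough sketch of that scraping loop, assuming line-by-line reads until NODE STARTED (the variable names here are illustrative):

    // Sketch: parse the child's startup handshake from its stdout.
    BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8));
    int tcpPort = -1;
    long commitVersion = -1;
    long infosVersion = -1;
    String line;
    while ((line = r.readLine()) != null) {
      if (line.startsWith("PORT: ")) {
        tcpPort = Integer.parseInt(line.substring(6).trim());
      } else if (line.startsWith("COMMIT VERSION: ")) {
        // Only a primary node prints this:
        commitVersion = Long.parseLong(line.substring(16).trim());
      } else if (line.startsWith("INFOS VERSION: ")) {
        infosVersion = Long.parseLong(line.substring(15).trim());
      } else if (line.equals("NODE STARTED")) {
        break;  // the node is now accepting connections
      }
    }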

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
new file mode 100644
index 0000000..d409ffc
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
@@ -0,0 +1,250 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.RAMOutputStream;
+
+/** This is a stupid yet functional transaction log: it never fsyncs, never prunes, is over-synchronized, hard-wires the id field name to "docid", can
+ *  only handle the specific docs/fields used by this test, etc.  It's just barely enough to show how a translog could work on top of NRT
+ *  replication to guarantee no data loss when nodes crash. */
+
+class SimpleTransLog implements Closeable {
+
+  final FileChannel channel;
+  final RAMOutputStream buffer = new RAMOutputStream();
+  final byte[] intBuffer = new byte[4];
+  final ByteBuffer intByteBuffer = ByteBuffer.wrap(intBuffer);
+
+  private final static byte OP_ADD_DOCUMENT = (byte) 0;
+  private final static byte OP_UPDATE_DOCUMENT = (byte) 1;
+  private final static byte OP_DELETE_DOCUMENTS = (byte) 2;
+
+  public SimpleTransLog(Path path) throws IOException {
+    channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+  }
+
+  public synchronized long getNextLocation() throws IOException {
+    return channel.position();
+  }
+
+  /** Appends an addDocument op */
+  public synchronized long addDocument(String id, Document doc) throws IOException {
+    assert buffer.getFilePointer() == 0;
+    buffer.writeByte(OP_ADD_DOCUMENT);
+    encode(id, doc);
+    return flushBuffer();
+  }
+
+  /** Appends an updateDocument op */
+  public synchronized long updateDocument(String id, Document doc) throws IOException {
+    assert buffer.getFilePointer() == 0;
+    buffer.writeByte(OP_UPDATE_DOCUMENT);
+    encode(id, doc);
+    return flushBuffer();
+  }
+
+  /** Appends a deleteDocuments op */
+  public synchronized long deleteDocuments(String id) throws IOException {
+    assert buffer.getFilePointer() == 0;
+    buffer.writeByte(OP_DELETE_DOCUMENTS);
+    buffer.writeString(id);
+    return flushBuffer();
+  }
+
+  /** Writes buffer to the file and returns the start position. */
+  private synchronized long flushBuffer() throws IOException {
+    long pos = channel.position();
+    int len = (int) buffer.getFilePointer();
+    byte[] bytes = new byte[len];
+    buffer.writeTo(bytes, 0);
+    buffer.reset();
+
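+    // Frame the record with a 4-byte big-endian length prefix: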
+    intBuffer[0] = (byte) (len >> 24);
+    intBuffer[1] = (byte) (len >> 16);
+    intBuffer[2] = (byte) (len >> 8);
+    intBuffer[3] = (byte) len;
+    intByteBuffer.limit(4);
+    intByteBuffer.position(0);
+
+    writeBytesToChannel(intByteBuffer);
+    writeBytesToChannel(ByteBuffer.wrap(bytes));
+
+    return pos;
+  }
+
+  private void writeBytesToChannel(ByteBuffer src) throws IOException {
+    int left = src.limit();
+    while (left != 0) {
+      left -= channel.write(src);
+    }
+  }
+
+  private void readBytesFromChannel(long pos, ByteBuffer dest) throws IOException {
+    int left = dest.limit() - dest.position();
+    long end = pos + left;
+    while (pos < end) {
+      int inc = channel.read(dest, pos);
+      if (inc < 0) {
+        throw new EOFException();
+      }
+      pos += inc;
+    }
+  }
+
+  /** Replays ops between start and end location against the provided writer.  Can run concurrently with ongoing operations. */
+  public void replay(NodeProcess primary, long start, long end) throws IOException {
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      byte[] intBuffer = new byte[4];
+      ByteBuffer intByteBuffer = ByteBuffer.wrap(intBuffer);
+      ByteArrayDataInput in = new ByteArrayDataInput();
+
+      long pos = start;
+      while (pos < end) {
+        intByteBuffer.position(0);
+        intByteBuffer.limit(4);
+        readBytesFromChannel(pos, intByteBuffer);
+        pos += 4;
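+        // Decode the 4-byte big-endian length prefix written by flushBuffer: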
+        int len = ((intBuffer[0] & 0xff) << 24) |
+          (intBuffer[1] & 0xff) << 16 |
+          (intBuffer[2] & 0xff) << 8 |
+          (intBuffer[3] & 0xff);
+
+        byte[] bytes = new byte[len];
+        readBytesFromChannel(pos, ByteBuffer.wrap(bytes));
+        pos += len;
+
+        in.reset(bytes);
+        
+        byte op = in.readByte();
+        //System.out.println("xlog: replay op=" + op);
+        switch (op) {
+        case OP_ADD_DOCUMENT:
+        case OP_UPDATE_DOCUMENT:
+          // We replay add as update, since the doc may already be in the index:
+          replayAddDocument(c, primary, in);
+          break;
+
+        case OP_DELETE_DOCUMENTS:
+          replayDeleteDocuments(c, primary, in);
+          break;
+
+        default:
+          throw new CorruptIndexException("invalid operation " + op, in);
+        }
+      }
+      assert pos == end;
+      //System.out.println("xlog: done replay");
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE);
+      c.flush();
+      //System.out.println("xlog: done flush");
+      c.in.readByte();
+      //System.out.println("xlog: done readByte");
+    }
+  }
+
+  private void replayAddDocument(Connection c, NodeProcess primary, DataInput in) throws IOException {
+    String id = in.readString();
+
+    Document doc = new Document();
+    doc.add(new StringField("docid", id, Field.Store.YES));
+
+    String title = readNullableString(in);
+    if (title != null) {
+      doc.add(new StringField("title", title, Field.Store.NO));
+      doc.add(new TextField("titleTokenized", title, Field.Store.NO));
+    }
+    String body = readNullableString(in);
+    if (body != null) {
+      doc.add(new TextField("body", body, Field.Store.NO));
+    }
+    String marker = readNullableString(in);
+    if (marker != null) {
+      //System.out.println("xlog: replay marker=" + id);
+      doc.add(new StringField("marker", marker, Field.Store.YES));
+    }
+
+    // Whether the original op was add or update, we replay with updateDocument,
+    // because the doc could in fact already be in the index:
+    // nocommit what if this fails?
+    primary.addOrUpdateDocument(c, doc, false);
+  }
+
+  private void replayDeleteDocuments(Connection c, NodeProcess primary, DataInput in) throws IOException {
+    String id = in.readString();
+    // nocommit what if this fails?
+    primary.deleteDocument(c, id);
+  }
+
+  /** Encodes doc into buffer.  NOTE: this is NOT general purpose!  It only handles the fields used in this test! */
+  private synchronized void encode(String id, Document doc) throws IOException {
+    assert id.equals(doc.get("docid")): "id=" + id + " vs docid=" + doc.get("docid");
+    buffer.writeString(id);
+    writeNullableString(doc.get("title"));
+    writeNullableString(doc.get("body"));
+    writeNullableString(doc.get("marker"));
+  }
+
+  private synchronized void writeNullableString(String s) throws IOException {
+    if (s == null) {
+      buffer.writeByte((byte) 0);
+    } else {
+      buffer.writeByte((byte) 1);
+      buffer.writeString(s);
+    }
+  }
+
+  private String readNullableString(DataInput in) throws IOException {
+    byte b = in.readByte();
+    if (b == 0) {
+      return null;
+    } else if (b == 1) {
+      return in.readString();
+    } else {
+      throw new CorruptIndexException("invalid string lead byte " + b, in);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    channel.close();
+  }
+}
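
Concretely, each record flushBuffer appends is a 4-byte big-endian length followed by the payload (op byte, then the op-specific fields). Assuming Lucene's DataOutput.writeString encoding (a VInt byte count, then UTF-8 bytes), addDocument("7", doc) for a doc carrying only the docid field would append these 10 bytes:

    00 00 00 06   length prefix: payload is 6 bytes
    00            OP_ADD_DOCUMENT
    01 37         writeString("7"): 1 UTF-8 byte, then '7'
    00            title is null (writeNullableString lead byte 0)
    00            body is null
    00            marker is null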