You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:37 UTC

[03/31] lucene-solr git commit: current patch

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java
new file mode 100644
index 0000000..742b19f
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java
@@ -0,0 +1,213 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.util.StringHelper;
+
+/** Common base class for {@link PrimaryNode} and {@link ReplicaNode}. */
+
+abstract class Node implements Closeable {
+
+  static boolean VERBOSE_FILES = true;
+  static boolean VERBOSE_CONNECTIONS = false;
+
+  // Keys we store into IndexWriter's commit user data:
+
+  /** Key to store the primary gen in the commit data, which increments every time we promote a new primary, so replicas can detect when the
+   *  primary they were talking to is changed */
+  public static String PRIMARY_GEN_KEY = "__primaryGen";
+
+  /** Key to store the version in the commit data, which increments every time we open a new NRT reader */
+  public static String VERSION_KEY = "__version";
+
+  /** Compact ordinal for this node */
+  protected final int id;
+
+  protected final Directory dir;
+
+  protected final SearcherFactory searcherFactory;
+  
+  // Tracks NRT readers, opened from IW (primary) or opened from replicated SegmentInfos pulled across the wire (replica):
+  protected ReferenceManager<IndexSearcher> mgr;
+
+  /** Startup time of original test, carefully propagated to all nodes to produce consistent "seconds since start time" in messages */
+  public static long globalStartNS;
+
+  /** When this node was started */
+  public static final long localStartNS = System.nanoTime();
+
+  // public static final long globalStartNS;
+
+  // For debugging:
+  volatile String state = "idle";
+
+  /** File metadata for last sync that succeeded; we use this as a cache */
+  protected volatile Map<String,FileMetaData> lastFileMetaData;
+
+  /** Stores the node's compact ordinal, its index directory and the searcher factory; nothing else is initialized here. */
+  public Node(int id, Directory dir, SearcherFactory searcherFactory) {
+    this.searcherFactory = searcherFactory;
+    this.dir = dir;
+    this.id = id;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(id=" + id + ")";
+  }
+
+  /** Commits this node's index; implemented by subclasses ({@link PrimaryNode#commit} records the primary gen and version in the
+   *  commit user data and then commits its IndexWriter). */
+  public abstract void commit() throws IOException;
+
+  /** Prints {@code message} to stdout, prefixed with the elapsed seconds since {@link #globalStartNS} and {@link #localStartNS} and
+   *  the current thread's name.  Use this variant when no node id is available. */
+  public static void nodeMessage(String message) {
+    final long nowNS = System.nanoTime();
+    final double sinceGlobalSec = (nowNS-globalStartNS)/1000000000.;
+    final double sinceLocalSec = (nowNS-localStartNS)/1000000000.;
+    final String line = String.format(Locale.ROOT,
+                                      "%5.3fs %5.1fs:           [%11s] %s",
+                                      sinceGlobalSec,
+                                      sinceLocalSec,
+                                      Thread.currentThread().getName(),
+                                      message);
+    System.out.println(line);
+  }
+
+  /** Prints {@code message} to stdout, prefixed with the elapsed seconds since {@link #globalStartNS} and {@link #localStartNS},
+   *  the node id rendered as {@code N<id>} and the current thread's name. */
+  public static void nodeMessage(int id, String message) {
+    final long nowNS = System.nanoTime();
+    final double sinceGlobalSec = (nowNS-globalStartNS)/1000000000.;
+    final double sinceLocalSec = (nowNS-localStartNS)/1000000000.;
+    final String line = String.format(Locale.ROOT,
+                                      "%5.3fs %5.1fs:         N%d [%11s] %s",
+                                      sinceGlobalSec,
+                                      sinceLocalSec,
+                                      id,
+                                      Thread.currentThread().getName(),
+                                      message);
+    System.out.println(line);
+  }
+
+  protected void message(String message) {
+    long now = System.nanoTime();
+    System.out.println(String.format(Locale.ROOT,
+                                     "%5.3fs %5.1fs: %7s %2s [%11s] %s",
+                                     (now-globalStartNS)/1000000000.,
+                                     (now-localStartNS)/1000000000.,
+                                     state, name(),
+                                     Thread.currentThread().getName(), message));
+  }
+
+  /** Returns a short label for this node: {@code P} for a {@link PrimaryNode}, {@code R} otherwise, followed by the node id. */
+  public String name() {
+    final String prefix;
+    if (this instanceof PrimaryNode) {
+      prefix = "P";
+    } else {
+      prefix = "R";
+    }
+    return prefix + id;
+  }
+
+  /** Returns true if this node is closed; the exact rule is up to the subclass ({@link PrimaryNode} also reports true while its
+   *  state is "closing"). */
+  public abstract boolean isClosed();
+
+  /** Returns the version of the {@link DirectoryReader} behind the searcher that {@link #mgr} currently hands out.  The searcher is
+   *  acquired only for the duration of the call and always released. */
+  public long getCurrentSearchingVersion() throws IOException {
+    final IndexSearcher current = mgr.acquire();
+    try {
+      final DirectoryReader reader = (DirectoryReader) current.getIndexReader();
+      return reader.getVersion();
+    } finally {
+      mgr.release(current);
+    }
+  }
+
+  /** Renders a byte count for log messages: the raw number with a {@code " b"} suffix below 1024, otherwise one decimal place in
+   *  KB, MB or GB (powers of 1024), always formatted with {@link Locale#ROOT}. */
+  public static String bytesToString(long bytes) {
+    final long kb = 1024;
+    final long mb = kb * 1024;
+    final long gb = mb * 1024;
+    if (bytes >= gb) {
+      return String.format(Locale.ROOT, "%.1f GB", bytes/1024./1024./1024.);
+    }
+    if (bytes >= mb) {
+      return String.format(Locale.ROOT, "%.1f MB", bytes/1024./1024.);
+    }
+    if (bytes >= kb) {
+      return String.format(Locale.ROOT, "%.1f KB", bytes/1024.);
+    }
+    return bytes + " b";
+  }
+
+  /** Returns the identifying information for a file in this node's directory: file length, full index header (includes the unique
+   *  segment ID), full footer and checksum, wrapped in a {@link FileMetaData}.  An entry already present in {@link #lastFileMetaData}
+   *  is returned as-is; otherwise the file is opened and read.
+   *
+   *  <p>This returns null, logging a message when {@link #VERBOSE_FILES} is set, if the file does not exist, or if reading it hits
+   *  EOF or a {@link CorruptIndexException}.  Any other {@link IOException} propagates to the caller.</p> */
+  public FileMetaData readLocalFileMetaData(String fileName) throws IOException {
+
+    // Read the volatile field once so the null check and the lookup use the same map:
+    final Map<String,FileMetaData> cached = lastFileMetaData;
+    if (cached != null) {
+      // We may already have this file cached from the last NRT point:
+      final FileMetaData hit = cached.get(fileName);
+      if (hit != null) {
+        return hit;
+      }
+    }
+
+    // Not cached: pull from the filesystem
+    long fileLength;
+    long fileChecksum;
+    byte[] headerBytes;
+    byte[] footerBytes;
+    try (IndexInput in = dir.openInput(fileName, IOContext.DEFAULT)) {
+      try {
+        fileLength = in.length();
+        headerBytes = CodecUtil.readIndexHeader(in);
+        footerBytes = CodecUtil.readFooter(in);
+        fileChecksum = CodecUtil.retrieveChecksum(in);
+      } catch (EOFException | CorruptIndexException cie) {
+        // File exists but is busted: we must copy it.  This happens when node had crashed, corrupting an un-fsync'd file.  On init we try
+        // to delete such unreferenced files, but virus checker can block that, leaving this bad file.
+        if (VERBOSE_FILES) {
+          message("file " + fileName + ": will copy [existing file is corrupt]");
+        }
+        return null;
+      }
+      if (VERBOSE_FILES) {
+        message("file " + fileName + " has length=" + bytesToString(fileLength));
+      }
+    } catch (FileNotFoundException | NoSuchFileException e) {
+      if (VERBOSE_FILES) {
+        message("file " + fileName + ": will copy [file does not exist]");
+      }
+      return null;
+    }
+
+    // NOTE: checksum is redundant w/ footer, but we break it out separately because when the bits cross the wire we need direct access to
+    // checksum when copying to catch bit flips:
+    return new FileMetaData(headerBytes, footerBytes, fileLength, fileChecksum);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java
new file mode 100644
index 0000000..67a9d0a
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java
@@ -0,0 +1,26 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Unchecked exception carrying a description of when a problem occurred plus the underlying cause, which must not be null. */
+public class NodeCommunicationException extends RuntimeException {
+  /** Creates the exception.
+   *
+   *  @param when message describing when the problem happened
+   *  @param cause underlying failure; asserted to be non-null */
+  public NodeCommunicationException(String when, Throwable cause) {
+    super(when, cause);
+    assert cause != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
new file mode 100644
index 0000000..1918ede
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
@@ -0,0 +1,80 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** A merged segment warmer that pre-copies the merged segment out to
+ *  replicas before primary cuts over to the merged segment.  This
+ *  ensures that NRT reopen time on replicas is only in proportion to
+ *  flushed segment sizes, not merged segments. */
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.replicator.nrt.CopyJob.OnceDone;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+// TODO: or ... replica node can do merging locally?  tricky to keep things in sync, when one node merges more slowly than others...
+
+/** Merged-segment warmer installed by {@link PrimaryNode}: for each merged segment it gathers the metadata of the segment's files
+ *  and hands them to {@link PrimaryNode#preCopyMergedSegmentFiles}, then records the file names in
+ *  {@code PrimaryNode.finishedMergedFiles}. */
+class PreCopyMergedSegmentWarmer extends IndexReaderWarmer {
+
+  private final PrimaryNode primary;
+
+  /** @param primary the primary node whose replicas should receive merged segment files */
+  public PreCopyMergedSegmentWarmer(PrimaryNode primary) {
+    this.primary = primary;
+  }
+
+  /** Reads the local metadata of every file of the merged segment, asks the primary to pre-copy those files, logs how long that took
+   *  and finally adds the file names to the primary's set of finished merged files.
+   *
+   *  @param reader reader over the newly merged segment; must be a {@link SegmentReader}
+   *  @throws IOException if reading file metadata or pre-copying fails */
+  @Override
+  public void warm(LeafReader reader) throws IOException {
+    long startNS = System.nanoTime();
+    final SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo();
+    //System.out.println("TEST: warm merged segment files " + info);
+    Map<String,FileMetaData> filesMetaData = new HashMap<>();
+    for(String fileName : info.files()) {
+      FileMetaData metaData = primary.readLocalFileMetaData(fileName);
+      assert metaData != null;
+      assert filesMetaData.containsKey(fileName) == false;
+      filesMetaData.put(fileName, metaData);
+    }
+
+    // nocommit if one replica is very slow then it dos's all other replicas?
+
+    primary.preCopyMergedSegmentFiles(info, filesMetaData);
+    // Pass info as a %s argument rather than concatenating it into the pattern: a '%' in its toString() would otherwise be parsed as a
+    // format specifier.  The size is divided in floating point throughout so the MB figure is not truncated to whole KB first.
+    primary.message(String.format(Locale.ROOT, "top: done warm merge %s: took %.3f sec, %.1f MB", info, (System.nanoTime()-startNS)/1000000000., info.sizeInBytes()/1024./1024.));
+    primary.finishedMergedFiles.addAll(filesMetaData.keySet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
new file mode 100644
index 0000000..183f16f
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -0,0 +1,316 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LogMergePolicy;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RAMFile;
+import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.PrintStreamInfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/*
+ * This just asks IndexWriter to open new NRT reader, in order to publish a new NRT point.  This could be improved, if we separated out 1)
+ * nrt flush (and incRef the SIS) from 2) opening a new reader, but this is tricky with IW's concurrency, and it would also be hard-ish to share
+ * IW's reader pool with our searcher manager.  So we do the simpler solution now, but that adds some unnecessary latency to NRT refresh on
+ * replicas since step 2) could otherwise be done concurrently with replicas copying files over.
+ */
+
+/** Node that holds an IndexWriter, indexing documents into its local index. */
+public abstract class PrimaryNode extends Node {
+
+  // Current NRT segment infos, incRef'd with IndexWriter.deleter:
+  private SegmentInfos curInfos;
+
+  final IndexWriter writer;
+
+  // IncRef'd state of the last published NRT point; when a replica comes asking, we give it this as the current NRT point:
+  private CopyState copyState;
+
+  protected final long primaryGen;
+
+  /** Contains merged segments that have been copied to all running replicas (as of when that merge started warming). */
+  final Set<String> finishedMergedFiles = Collections.synchronizedSet(new HashSet<String>());
+
+  private final AtomicInteger copyingCount = new AtomicInteger();
+
+  /** Initializes a primary node around an already-open {@link IndexWriter}.
+   *
+   *  <p>Installs a {@link PreCopyMergedSegmentWarmer} on the writer's config, records {@code primaryGen} under
+   *  {@link #PRIMARY_GEN_KEY} in the writer's commit data (adding {@link #VERSION_KEY}="0" if no version is present), optionally
+   *  advances the SegmentInfos version, then opens the {@link SearcherManager} and publishes the first NRT point.</p>
+   *
+   *  @param writer IndexWriter this node indexes with; its directory becomes this node's directory
+   *  @param id compact ordinal for this node
+   *  @param primaryGen primary generation to record in the commit data
+   *  @param forcePrimaryVersion SegmentInfos version to advance to, or -1 to leave the version alone
+   *  @param searcherFactory factory passed to the {@link SearcherManager}
+   *  @throws RuntimeException wrapping any {@link Throwable} raised during initialization, after logging it and printing its stack
+   *          trace to stdout */
+  public PrimaryNode(IndexWriter writer, int id, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
+    super(id, writer.getDirectory(), searcherFactory);
+    message("top: now init primary");
+    this.writer = writer;
+    this.primaryGen = primaryGen;
+
+    try {
+      // So that when primary node's IndexWriter finishes a merge, but before it cuts over to the merged segment,
+      // it copies it out to the replicas.  This ensures the whole system's NRT latency remains low even when a
+      // large merge completes:
+      writer.getConfig().setMergedSegmentWarmer(new PreCopyMergedSegmentWarmer(this));
+
+      message("IWC:\n" + writer.getConfig());
+      message("dir:\n" + writer.getDirectory());
+      message("commitData: " + writer.getCommitData());
+
+      // Record our primaryGen in the userData, and set initial version to 0:
+      Map<String,String> commitData = new HashMap<>(writer.getCommitData());
+      commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
+      if (commitData.get(VERSION_KEY) == null) {
+        commitData.put(VERSION_KEY, "0");
+        message("add initial commitData version=0");
+      } else {
+        message("keep current commitData version=" + commitData.get(VERSION_KEY));
+      }
+      writer.setCommitData(commitData, false);
+
+      // We forcefully advance the SIS version to an unused future version.  This is necessary if the previous primary crashed and we are
+      // starting up on an "older" index, else versions can be illegally reused but show different results:
+      if (forcePrimaryVersion != -1) {
+        message("now forcePrimaryVersion to version=" + forcePrimaryVersion);
+        writer.advanceSegmentInfosVersion(forcePrimaryVersion);
+      }
+
+      mgr = new SearcherManager(writer, true, true, searcherFactory);
+      // Publish the initial NRT point; no merged files have been pre-copied yet, hence the empty set:
+      setCurrentInfos(Collections.<String>emptySet());
+      message("init: infos version=" + curInfos.getVersion());
+
+      // Debugging aid: log how many documents match marker:marker in the initial searcher
+      IndexSearcher s = mgr.acquire();
+      try {
+        message("init: marker hit count: " + s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits);
+      } finally {
+        mgr.release(s);
+      }
+
+    } catch (Throwable t) {
+      // NOTE: everything, including Errors and IOExceptions, is rethrown wrapped in RuntimeException
+      message("init: exception");
+      t.printStackTrace(System.out);
+      throw new RuntimeException(t);
+    }
+  }
+
+  // TODO: in the future, we should separate "flush" (returns an incRef'd SegmentInfos) from "refresh" (open new NRT reader from
+  // IndexWriter) so that the latter can be done concurrently while copying files out to replicas, minimizing the refresh time from the
+  // replicas.  But fixing this is tricky because e.g. IndexWriter may complete a big merge just after returning the incRef'd SegmentInfos
+  // and before we can open a new reader causing us to close the just-merged readers only to then open them again from the (now stale)
+  // SegmentInfos.  To fix this "properly" I think IW.inc/decRefDeleter must also incRef the ReaderPool entry
+
+  /** Flushes all index operations to disk and opens a new near-real-time reader, publishing it as the new NRT point to make the
+   *  changes visible to searching.  Returns true if there were changes.
+   *
+   *  <p>The merged files recorded in {@code finishedMergedFiles} at the time of the call are attached to the new NRT point, and are
+   *  removed from that set only if a new point was actually published.</p> */
+  public boolean flushAndRefresh() throws IOException {
+    message("top: now flushAndRefresh");
+    Set<String> completedMergeFiles;
+    // Snapshot before refreshing; files added by the merge warmer after this point stay in the set for a later call:
+    synchronized(finishedMergedFiles) {
+      completedMergeFiles = Collections.unmodifiableSet(new HashSet<>(finishedMergedFiles));
+    }
+    mgr.maybeRefreshBlocking();
+    boolean result = setCurrentInfos(completedMergeFiles);
+    if (result) {
+      message("top: opened NRT reader version=" + curInfos.getVersion());
+      // Only remove what was snapshotted above, not entries added since:
+      finishedMergedFiles.removeAll(completedMergeFiles);
+      message("flushAndRefresh:  version=" + curInfos.getVersion() + " completedMergeFiles=" + completedMergeFiles + " finishedMergedFiles=" + finishedMergedFiles);
+    } else {
+      message("top: no changes in flushAndRefresh; still version=" + curInfos.getVersion());
+    }
+    return result;
+  }
+
+  /** Returns the version of the most recently published {@link CopyState}. */
+  public long getCopyStateVersion() {
+    final CopyState current = copyState;
+    return current.version;
+  }
+
+  /** Returns the {@link #VERSION_KEY} value stored in the user data of the currently published infos, parsed as a long. */
+  public synchronized long getLastCommitVersion() {
+    final Map<String,String> userData = curInfos.getUserData();
+    final String version = userData.get(VERSION_KEY);
+    // In ctor we always install an initial version:
+    assert version != null;
+    return Long.parseLong(version);
+  }
+
+  /** Stores the primary gen and the version of the last published {@link CopyState} as the writer's commit user data, then commits
+   *  the writer. */
+  @Override
+  public void commit() throws IOException {
+    final Map<String,String> userData = new HashMap<>();
+    final String genString = Long.toString(primaryGen);
+    userData.put(PRIMARY_GEN_KEY, genString);
+    // TODO (opto): it's a bit wasteful that we put "last refresh" version here, not the actual version we are committing, because it means
+    // on xlog replay we are replaying more ops than necessary.
+    final String versionString = Long.toString(copyState.version);
+    userData.put(VERSION_KEY, versionString);
+    message("top: commit commitData=" + userData);
+    // nocommit this is now an NRT-visible change!  make test where nothing is indexing and confirm we don't do silly commit + refresh loop forever!
+    writer.setCommitData(userData, false);
+    writer.commit();
+  }
+
+  /** IncRef the current CopyState and return it.  Every call increments the in-flight copy count and takes a reference on the
+   *  state's infos with the writer; both are undone by {@link #releaseCopyState}.
+   *
+   *  @throws AlreadyClosedException if this node is closing or closed */
+  public synchronized CopyState getCopyState() throws IOException {
+    ensureOpen(false);
+    //message("top: getCopyState replicaID=" + replicaID + " replicaNodeID=" + replicaNodeID + " version=" + curInfos.getVersion() + " infos=" + curInfos.toString());
+    assert curInfos == copyState.infos;
+    // Hold a reference on the infos for as long as the caller keeps the CopyState:
+    writer.incRefDeleter(copyState.infos);
+    // close() waits for this count to return to zero:
+    int count = copyingCount.incrementAndGet();
+    assert count > 0;
+    return copyState;
+  }
+
+  /** Called once replica is done (or failed) copying an NRT point: drops the reference that {@link #getCopyState} took on the
+   *  state's infos and decrements the in-flight copy count.
+   *
+   *  @param copyState the state previously returned by {@link #getCopyState} */
+  public void releaseCopyState(CopyState copyState) throws IOException {
+    //message("top: releaseCopyState version=" + copyState.version);
+    assert copyState.infos != null;
+    writer.decRefDeleter(copyState.infos);
+    // NOTE: not synchronized and no notify here; waitForAllRemotesToClose notices the change by polling
+    int count = copyingCount.decrementAndGet();
+    assert count >= 0;
+  }
+
+  /** Returns true if this node is closed or in the process of closing. */
+  @Override
+  public boolean isClosed() {
+    final boolean allowClosing = false;
+    return isClosed(allowClosing);
+  }
+
+  /** Returns true if the state is "closed", or if it is "closing" and {@code allowClosing} is false. */
+  boolean isClosed(boolean allowClosing) {
+    if ("closed".equals(state)) {
+      return true;
+    }
+    if (allowClosing) {
+      return false;
+    }
+    return "closing".equals(state);
+  }
+
+  /** Throws {@link AlreadyClosedException}, with the current state as its message, if {@link #isClosed(boolean)} reports true. */
+  private void ensureOpen(boolean allowClosing) {
+    if (isClosed(allowClosing) == false) {
+      return;
+    }
+    throw new AlreadyClosedException(state);
+  }
+
+  /** Publishes the SegmentInfos of the searcher manager's current reader as the new NRT point.  Takes a reference on the new infos
+   *  with the writer and drops the one held on the previously published infos, serializes the infos, gathers the metadata of every
+   *  file they reference and builds the new {@link CopyState}.
+   *
+   *  @param completedMergeFiles file names to record in the new {@link CopyState} as completed merge files
+   *  @return true if there were changes, false if the reader's infos have the same version as the currently published ones */
+  private synchronized boolean setCurrentInfos(Set<String> completedMergeFiles) throws IOException {
+
+    IndexSearcher searcher = null;
+    SegmentInfos infos;
+    try {
+      searcher = mgr.acquire();
+      infos = ((StandardDirectoryReader) searcher.getIndexReader()).getSegmentInfos();
+    } finally {
+      if (searcher != null) {
+        mgr.release(searcher);
+      }
+    }
+    if (curInfos != null && infos.getVersion() == curInfos.getVersion()) {
+      // no change
+      message("top: skip switch to infos: version=" + infos.getVersion() + " is unchanged: " + infos.toString());
+      return false;
+    }
+
+    // IncRef the new infos before decRef'ing the old ones (presumably so files shared by both never become unreferenced in between):
+    SegmentInfos oldInfos = curInfos;
+    writer.incRefDeleter(infos);
+    curInfos = infos;
+    if (oldInfos != null) {
+      writer.decRefDeleter(oldInfos);
+    }
+
+    message("top: switch to infos=" + infos.toString() + " version=" + infos.getVersion());
+
+    // Serialize the SegmentInfos:
+    RAMOutputStream out = new RAMOutputStream(new RAMFile(), true);
+    infos.write(dir, out);
+    byte[] infosBytes = new byte[(int) out.getFilePointer()];
+    out.writeTo(infosBytes, 0);
+
+    // NOTE: lastFileMetaData still holds the previous point's entries here, so readLocalFileMetaData can reuse them for unchanged files
+    Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>();
+    for(SegmentCommitInfo info : infos) {
+      for(String fileName : info.files()) {
+        FileMetaData metaData = readLocalFileMetaData(fileName);
+        // NOTE: we hold a refCount on this infos, so this file better exist:
+        assert metaData != null;
+        assert filesMetaData.containsKey(fileName) == false;
+        filesMetaData.put(fileName, metaData);
+      }
+    }
+
+    lastFileMetaData = Collections.unmodifiableMap(filesMetaData);
+
+    message("top: set copyState primaryGen=" + primaryGen + " version=" + infos.getVersion() + " files=" + filesMetaData.keySet());
+    copyState = new CopyState(lastFileMetaData,
+                              infos.getVersion(), infos.getGeneration(), infosBytes, completedMergeFiles,
+                              primaryGen, curInfos);
+    return true;
+  }
+
+  /** Blocks until the in-flight copy count reaches zero, i.e. every {@link CopyState} handed out by {@link #getCopyState} has been
+   *  given back via {@link #releaseCopyState}.  Re-checks roughly every 10 msec, logging the pending count each time.
+   *
+   *  @throws ThreadInterruptedException if the calling thread is interrupted while waiting */
+  private synchronized void waitForAllRemotesToClose() throws IOException {
+
+    // Wait for replicas to finish or crash:
+    while (true) {
+      int count = copyingCount.get();
+      if (count == 0) {
+        return;
+      }
+      message("pendingCopies: " + count);
+
+      try {
+        // wait() releases this monitor while pausing; nothing visible in this class calls notify, so this is effectively a timed poll
+        wait(10);
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+    }
+  }
+
+  /** Closes this primary: marks the state "closing", waits for all in-flight copies to be released, drops the reference held on the
+   *  current infos, then closes the searcher manager, rolls back the writer and closes the directory before marking the state
+   *  "closed". */
+  @Override
+  public void close() throws IOException {
+    // From here on getCopyState() throws AlreadyClosedException, so no new copies can start:
+    state = "closing";
+    message("top: close primary");
+
+    synchronized (this) {
+      waitForAllRemotesToClose();
+      if (curInfos != null) {
+        writer.decRefDeleter(curInfos);
+        curInfos = null;
+      }
+    }
+
+    mgr.close();
+
+    // NOTE: rollback, not commit: changes since the last commit() are not made durable here
+    writer.rollback();
+    dir.close();
+
+    state = "closed";
+  }
+
+  /** Called when a merge has finished, but before IW switches to the merged segment.  Invoked by {@link PreCopyMergedSegmentWarmer}
+   *  from the merged-segment warming callback.
+   *
+   *  @param info the newly merged segment
+   *  @param files metadata for every file of the merged segment, keyed by file name */
+  protected abstract void preCopyMergedSegmentFiles(SegmentCommitInfo info, Map<String,FileMetaData> files) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
new file mode 100644
index 0000000..005f938
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
@@ -0,0 +1,218 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+// TODO: can we factor/share with IFD: this is doing exactly the same thing, but on the replica side
+
+// TODO: once LUCENE-6835 is in, this class becomes a lot simpler?
+
+class ReplicaFileDeleter {
+  private final Map<String,Integer> refCounts = new HashMap<String,Integer>();
+  private final Set<String> pending = new HashSet<String>();
+  private final Directory dir;
+  private final Node node;
+
+  public ReplicaFileDeleter(Node node, Directory dir) throws IOException {
+    this.dir = dir;
+    this.node = node;
+  }
+
+  /** Used only by asserts: returns true if the file exists
+   *  (can be opened), false if it cannot be opened, and
+   *  (unlike Java's File.exists) throws IOException if
+   *  there's some unexpected error. */
+  static boolean slowFileExists(Directory dir, String fileName) throws IOException {
+    try {
+      dir.openInput(fileName, IOContext.DEFAULT).close();
+      return true;
+    } catch (NoSuchFileException | FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  public synchronized void incRef(Collection<String> fileNames) throws IOException {
+    for(String fileName : fileNames) {
+
+      if (pending.contains(fileName)) {
+        throw new IllegalStateException("cannot incRef file \"" + fileName + "\": it is pending delete");
+      }
+
+      assert slowFileExists(dir, fileName): "file " + fileName + " does not exist!";
+
+      Integer curCount = refCounts.get(fileName);
+      if (curCount == null) {
+        refCounts.put(fileName, 1);
+      } else {
+        refCounts.put(fileName, curCount.intValue() + 1);
+      }
+    }
+  }
+
+  public synchronized void decRef(Collection<String> fileNames) {
+    // We don't delete the files immediately when their RC drops to 0; instead, we add to the pending set, and then call deletePending in
+    // the end:
+    for(String fileName : fileNames) {
+      Integer curCount = refCounts.get(fileName);
+      assert curCount != null: "fileName=" + fileName;
+      assert curCount.intValue() > 0;
+      if (curCount.intValue() == 1) {
+        refCounts.remove(fileName);
+        pending.add(fileName);
+      } else {
+        refCounts.put(fileName, curCount.intValue() - 1);
+      }
+    }
+
+    deletePending();
+
+    // TODO: this local IR could incRef files here, like we do now with IW ... then we can assert this again:
+
+    // we can't assert this, e.g a search can be running when we switch to a new NRT point, holding a previous IndexReader still open for
+    // a bit:
+    /*
+    // We should never attempt deletion of a still-open file:
+    Set<String> delOpen = ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
+    if (delOpen.isEmpty() == false) {
+      node.message("fail: we tried to delete these still-open files: " + delOpen);
+      throw new AssertionError("we tried to delete these still-open files: " + delOpen);
+    }
+    */
+  }
+
+  private synchronized boolean delete(String fileName) {
+    try {
+      if (Node.VERBOSE_FILES) {
+        node.message("file " + fileName + ": now delete");
+      }
+      dir.deleteFile(fileName);
+      pending.remove(fileName);
+      return true;
+    } catch (FileNotFoundException|NoSuchFileException missing) {
+      // This should never happen: we should only be asked to track files that do exist
+      node.message("file " + fileName + ": delete failed: " + missing);
+      throw new IllegalStateException("file " + fileName + ": we attempted delete but the file does not exist?", missing);
+    } catch (IOException ioe) {
+      if (Node.VERBOSE_FILES) {
+        node.message("file " + fileName + ": delete failed: " + ioe + "; will retry later");
+      }
+      pending.add(fileName);
+      return false;
+    }
+  }
+
+  public synchronized Integer getRefCount(String fileName) {
+    return refCounts.get(fileName);
+  }
+
+  public synchronized boolean isPending(String fileName) {
+    return pending.contains(fileName);
+  }
+
+  public synchronized void deletePending() {
+    if (Node.VERBOSE_FILES) {
+      node.message("now deletePending: " + pending.size() + " files to try: " + pending);
+    }
+
+    // Clone the set because it will change as we iterate:
+    List<String> toDelete = new ArrayList<>(pending);
+
+    // First pass: delete any segments_N files.  We do these first to be certain stale commit points are removed
+    // before we remove any files they reference.  If any delete of segments_N fails, we leave all other files
+    // undeleted so index is never in a corrupt state:
+    for (String fileName : toDelete) {
+      Integer rc = refCounts.get(fileName);
+      if (rc != null && rc > 0) {
+        // Should never happen!  This means we are about to pending-delete a referenced index file
+        throw new IllegalStateException("file \"" + fileName + "\" is in pending delete set but has non-zero refCount=" + rc);
+      } else if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
+        if (delete(fileName) == false) {
+          if (Node.VERBOSE_FILES) {
+            node.message("failed to remove commit point \"" + fileName + "\"; skipping deletion of all other pending files");
+          }
+          return;
+        }
+      }
+    }
+
+    // Only delete other files if we were able to remove the segments_N files; this way we never
+    // leave a corrupt commit in the index even in the presense of virus checkers:
+    for(String fileName : toDelete) {
+      if (fileName.startsWith(IndexFileNames.SEGMENTS) == false) {
+        delete(fileName);
+      }
+    }
+
+    Set<String> copy = new HashSet<String>(pending);
+    pending.clear();
+    for(String fileName : copy) {
+      delete(fileName);
+    }
+  }
+
+  /** Necessary in case we had tried to delete this fileName before, it failed, but then it was later overwritten (because primary changed
+   *  and new primary didn't know this segment name had been previously attempted) and now has > 0 refCount */
+  public synchronized void clearPending(Collection<String> fileNames) {
+    for(String fileName : fileNames) {
+      if (pending.remove(fileName)) {
+        node.message("file " + fileName + ": deleter.clearPending now clear from pending");
+      }
+    }
+  }
+
+  public synchronized void deleteIfNoRef(String fileName) {
+    if (refCounts.containsKey(fileName) == false) {
+      deleteNewFile(fileName);
+    }
+  }
+
+  public synchronized void deleteNewFile(String fileName) {
+    delete(fileName);
+  }
+
+  public synchronized Set<String> getPending() {
+    return new HashSet<String>(pending);
+  }
+
+  public synchronized void deleteUnknownFiles(String segmentsFileName) throws IOException {
+    for(String fileName : dir.listAll()) {
+      if (refCounts.containsKey(fileName) == false &&
+          fileName.equals("write.lock") == false &&
+          fileName.equals(segmentsFileName) == false) {
+        node.message("will delete unknown file \"" + fileName + "\"");
+        pending.add(fileName);
+      }
+    }
+
+    deletePending();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
new file mode 100644
index 0000000..af142d5
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@ -0,0 +1,772 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.BufferedChecksumIndexInput;
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.ByteArrayIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** Replica node, that pulls index changes from the primary node by copying newly flushed or merged index files */
+
+abstract class ReplicaNode extends Node {
+
+  ReplicaFileDeleter deleter;
+
+  /** IncRef'd files in the current commit point: */
+  private final Collection<String> lastCommitFiles = new HashSet<>();
+
+  /** IncRef'd files in the current NRT point: */
+  protected final Collection<String> lastNRTFiles = new HashSet<>();
+
+  /** Currently running merge pre-copy jobs */
+  protected final Set<CopyJob> mergeCopyJobs = Collections.synchronizedSet(new HashSet<>());
+
+  /** Non-null when we are currently copying files from a new NRT point: */
+  protected CopyJob curNRTCopy;
+
+  /** We hold this to ensure an external IndexWriter cannot also open on our directory: */
+  private final Lock writeFileLock;
+
+  /** Merged segment files that we pre-copied, but have not yet made visible in a new NRT point. */
+  final Set<String> pendingMergeFiles = Collections.synchronizedSet(new HashSet<String>());
+
+  /** Primary gen last time we successfully replicated: */
+  protected long lastPrimaryGen;
+
+  /** Creates the replica in state "init": obtains the index write lock on {@code dir} and creates the file deleter.
+   *  The node is not usable for searching until {@link #start} has been called.
+   *
+   *  @throws IOException if the write lock cannot be obtained; in that case the partially built node is closed
+   *          (best effort) before the exception propagates */
+  public ReplicaNode(int id, Directory dir, SearcherFactory searcherFactory) throws IOException {
+    super(id, dir, searcherFactory);
+
+    boolean success = false;
+
+    try {
+      message("top: init replica dir=" + dir);
+
+      // Obtain a write lock on this index since we "act like" an IndexWriter, to prevent any other IndexWriter or ReplicaNode from using it:
+      writeFileLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME);
+
+      state = "init";
+      deleter = new ReplicaFileDeleter(this, dir);
+
+      // Must be the last statement of the try: without it the finally clause below closes the node we just built
+      // (moving state to "closing"), and the subsequent start() call is rejected:
+      success = true;
+    } catch (Throwable t) {
+      message("exc on init:");
+      t.printStackTrace(System.out);
+      throw t;
+    } finally {
+      if (success == false) {
+        IOUtils.closeWhileHandlingException(this);
+      }
+    }
+  }
+
+  /** Start up this replica, which possibly requires heavy copying of files from the primary node, if we were down for a long time.
+   *
+   *  <p>Reads the local commit (if any) and incRefs its files, deletes unknown files, and, when the primary gen recorded in that commit
+   *  differs from {@code curPrimaryGen}, removes the local commit point and blocks until a full sync from the primary succeeds.  It then
+   *  notifies the primary, creates {@code mgr}, commits if a sync happened, and moves {@code state} to "idle".
+   *
+   *  @param curPrimaryGen generation of the current primary, compared against the primary gen stored in our last commit's user data
+   *  @throws IllegalStateException if {@code state} is not "init"
+   *  @throws RuntimeException wrapping any failure (including IOException) that happens after the state check */
+  protected synchronized void start(long curPrimaryGen) throws IOException {
+
+    if (state.equals("init") == false) {
+      throw new IllegalStateException("already started");
+    }
+
+    message("top: now start");
+    try {
+
+      // Figure out what state our local index is in now:
+      String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir);
+
+      // Also look for any pending_segments_N, in case we crashed mid-commit.  We must "inflate" our infos gen to at least this, since
+      // otherwise we may wind up re-using the pending_segments_N file name on commit, and then our deleter can get angry because it still
+      // wants to delete this file:
+      long maxPendingGen = -1;
+      for(String fileName : dir.listAll()) {
+        if (fileName.startsWith(IndexFileNames.PENDING_SEGMENTS)) {
+          long gen = Long.parseLong(fileName.substring(IndexFileNames.PENDING_SEGMENTS.length()+1), Character.MAX_RADIX);
+          if (gen > maxPendingGen) {
+            maxPendingGen = gen;
+          }
+        }
+      }
+
+      SegmentInfos infos;
+      if (segmentsFileName == null) {
+        // No index here yet:
+        infos = new SegmentInfos();
+        message("top: init: no segments in index");
+      } else {
+        message("top: init: read existing segments commit " + segmentsFileName);
+        infos = SegmentInfos.readCommit(dir, segmentsFileName);
+        message("top: init: segments: " + infos.toString() + " version=" + infos.getVersion());
+        Collection<String> indexFiles = infos.files(false);
+
+        lastCommitFiles.add(segmentsFileName);
+        lastCommitFiles.addAll(indexFiles);
+
+        // Always protect the last commit:
+        deleter.incRef(lastCommitFiles);
+
+        // The same index files are also referenced a second time, as the current NRT point:
+        lastNRTFiles.addAll(indexFiles);
+        deleter.incRef(lastNRTFiles);
+        message("top: commitFiles=" + lastCommitFiles);
+        message("top: nrtFiles=" + lastNRTFiles);
+      }
+
+      message("top: delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
+      deleter.deleteUnknownFiles(segmentsFileName);
+      message("top: done delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
+
+      // nocommit make test where index has all docs deleted (all segments dropped, so 0 segments) and is then replicated
+
+      String s = infos.getUserData().get(PRIMARY_GEN_KEY);
+      long myPrimaryGen;
+      if (s == null) {
+        assert infos.size() == 0;
+        myPrimaryGen = -1;
+      } else {
+        myPrimaryGen = Long.parseLong(s);
+      }
+      message("top: myPrimaryGen=" + myPrimaryGen);
+
+      boolean doCommit;
+
+      if (infos.size() > 0 && myPrimaryGen != -1 && myPrimaryGen != curPrimaryGen) {
+
+        assert myPrimaryGen < curPrimaryGen;
+
+        // Primary changed while we were down.  In this case, we must sync from primary before opening a reader, because it's possible current
+        // files we have will need to be overwritten with different ones (if index rolled back and "forked"), and we can't overwrite open
+        // files on Windows:
+
+        final long initSyncStartNS = System.nanoTime();
+
+        message("top: init: primary changed while we were down myPrimaryGen=" + myPrimaryGen +
+                " vs curPrimaryGen=" + curPrimaryGen +
+                "; sync now before mgr init");
+
+        // Try until we succeed in copying over the latest NRT point:
+        CopyJob job = null;
+
+        // We may need to overwrite files referenced by our latest commit, either right now on initial sync, or on a later sync.  To make
+        // sure the index is never even in an "apparently" corrupt state (where an old segments_N references invalid files) we forcefully
+        // remove the commit now, and refuse to start the replica if this delete fails:
+        message("top: now delete starting commit point " + segmentsFileName);
+
+        // If this throws exc (e.g. due to virus checker), we cannot start this replica:
+        assert deleter.getRefCount(segmentsFileName) == 1;
+        deleter.decRef(Collections.singleton(segmentsFileName));
+        if (deleter.isPending(segmentsFileName)) {
+          // If e.g. virus checker blocks us from deleting, we absolutely cannot start this node else we can cause corruption:
+          throw new RuntimeException("replica cannot start: existing segments file=" + segmentsFileName + " must be removed in order to start, but the file delete failed");
+        }
+        // So we don't later try to decRef it (illegally) again:
+        boolean didRemove = lastCommitFiles.remove(segmentsFileName);
+        assert didRemove;
+
+        while (true) {
+          job = newCopyJob("sync on startup replica=" + name() + " myVersion=" + infos.getVersion(),
+                           null,
+                           null,
+                           true,
+                           null);
+          job.start();
+
+          message("top: init: sync sis.version=" + job.getCopyState().version);
+
+          Collection<String> fileNamesToCopy = job.getFileNamesToCopy();
+
+          // Force this copy job to finish while we wait, now.  Note that this can be very time consuming!
+          // NOTE: newNRTPoint detects we are still in init (mgr is null) and does not cancel our copy if a flush happens
+          try {
+            job.runBlocking();
+            job.finish();
+
+            // Success!
+            break;
+          } catch (IOException ioe) {
+            job.cancel("startup failed", ioe);
+            // getMessage() may be null (e.g. a bare EOFException); guard so we rethrow the real failure instead of an NPE:
+            String ioeMessage = ioe.getMessage();
+            if (ioeMessage != null && ioeMessage.contains("checksum mismatch after file copy")) {
+              // OK-ish
+              message("top: failed to copy: " + ioe + "; retrying");
+            } else {
+              throw ioe;
+            }
+          }
+        }
+
+        lastPrimaryGen = job.getCopyState().primaryGen;
+        byte[] infosBytes = job.getCopyState().infosBytes;
+
+        // Turn the byte[] we received from the primary back into a SegmentInfos:
+        SegmentInfos syncInfos = SegmentInfos.readCommit(dir,
+                                                         new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", infosBytes)),
+                                                         job.getCopyState().gen);
+
+        // Must always commit to a larger generation than what's currently in the index:
+        syncInfos.updateGeneration(infos);
+        infos = syncInfos;
+
+        assert infos.getVersion() == job.getCopyState().version;
+        message("  version=" + infos.getVersion() + " segments=" + infos.toString());
+        // IncRef the new files before decRef'ing the old ones, so a file common to both is never deleted in between:
+        message("top: init: incRef nrtFiles=" + job.getFileNames());
+        deleter.incRef(job.getFileNames());
+        message("top: init: decRef lastNRTFiles=" + lastNRTFiles);
+        deleter.decRef(lastNRTFiles);
+
+        lastNRTFiles.clear();
+        lastNRTFiles.addAll(job.getFileNames());
+
+        message("top: init: set lastNRTFiles=" + lastNRTFiles);
+        lastFileMetaData = job.getCopyState().files;
+        message(String.format(Locale.ROOT, "top: %d: start: done sync: took %.3fs for %s, opened NRT reader version=%d",
+                              id,
+                              (System.nanoTime()-initSyncStartNS)/1000000000.0,
+                              bytesToString(job.getTotalBytesCopied()),
+                              job.getCopyState().version));
+
+        doCommit = true;
+      } else {
+        doCommit = false;
+        lastPrimaryGen = curPrimaryGen;
+        message("top: same primary as before");
+      }
+
+      if (infos.getGeneration() < maxPendingGen) {
+        message("top: move infos generation from " + infos.getGeneration() + " to " + maxPendingGen);
+        infos.setNextWriteGeneration(maxPendingGen);
+      }
+
+      // Notify primary we started, to give it a chance to send any warming merges our way to reduce NRT latency of first sync:
+      sendNewReplica();
+
+      // Finally, we are open for business, since our index now "agrees" with the primary:
+      mgr = new SegmentInfosSearcherManager(dir, this, infos, searcherFactory);
+
+      // Must commit after init mgr:
+      if (doCommit) {
+        // Very important to commit what we just sync'd over, because we removed the pre-existing commit point above if we had to
+        // overwrite any files it referenced:
+        commit();
+      }
+
+      message("top: done start");
+      state = "idle";
+    } catch (Throwable t) {
+      message("exc on start:");
+      t.printStackTrace(System.out);
+      throw new RuntimeException(t);
+    }
+  }
+  
+  final Object commitLock = new Object();
+
+  /** Writes a new commit point (segments_N) for the infos currently held by {@code mgr}: fsyncs the referenced index
+   *  files, stores {@code lastPrimaryGen} and the current searching version as commit user data, writes the segments
+   *  file, then moves the deleter's "last commit" references from the previous commit's files to the new ones.
+   *  Concurrent calls are serialized on {@code commitLock}. */
+  @Override
+  public void commit() throws IOException {
+
+    synchronized(commitLock) {
+
+      SegmentInfos infos;
+      Collection<String> indexFiles;
+
+      // Snapshot the current infos and incRef its files under the node's monitor, so they stay referenced
+      // while we do the slow fsync below without holding that monitor:
+      synchronized (this) {
+        infos = ((SegmentInfosSearcherManager) mgr).getCurrentInfos();
+        indexFiles = infos.files(false);
+        deleter.incRef(indexFiles);
+      }
+
+      message("top: commit primaryGen=" + lastPrimaryGen + " infos=" + infos.toString() + " files=" + indexFiles);
+
+      // fsync all index files we are now referencing
+      dir.sync(indexFiles);
+
+      // Record which primary gen and which version this commit corresponds to; start() reads PRIMARY_GEN_KEY back:
+      Map<String,String> commitData = new HashMap<>();
+      commitData.put(PRIMARY_GEN_KEY, Long.toString(lastPrimaryGen));
+      commitData.put(VERSION_KEY, Long.toString(getCurrentSearchingVersion()));
+      infos.setUserData(commitData, false);
+
+      // write and fsync a new segments_N
+      infos.commit(dir);
+
+      // Notify current infos (which may have changed while we were doing dir.sync above) what generation we are up to; this way future
+      // commits are guaranteed to go to the next (unwritten) generations:
+      if (mgr != null) {
+        ((SegmentInfosSearcherManager) mgr).getCurrentInfos().updateGeneration(infos);
+      }
+      String segmentsFileName = infos.getSegmentsFileName();
+      message("top: commit wrote segments file " + segmentsFileName + " version=" + infos.getVersion() + " sis=" + infos.toString() + " commitData=" + commitData);
+      // Reference the new segments file before releasing the previous commit's files:
+      deleter.incRef(Collections.singletonList(segmentsFileName));
+      message("top: commit decRef lastCommitFiles=" + lastCommitFiles);
+      deleter.decRef(lastCommitFiles);
+      lastCommitFiles.clear();
+      lastCommitFiles.addAll(indexFiles);
+      lastCommitFiles.add(segmentsFileName);
+      message("top: commit version=" + infos.getVersion() + " files now " + lastCommitFiles);
+    }
+  }
+
+  /** Completes an NRT copy job.  If the job failed, only the bookkeeping ({@code state}, {@code curNRTCopy}) is reset.  Otherwise the
+   *  job is finished, the copied SegmentInfos is installed into {@code mgr}, file references are moved from the previous NRT point to
+   *  the new one, pre-copied merge files that are no longer needed are removed, and pending deletes are retried.
+   *
+   *  @param job the copy job that just ended (successfully or not)
+   *  @param startNS {@code System.nanoTime()} value taken when the sync began; only used for the elapsed-time log message */
+  void finishNRTCopy(CopyJob job, long startNS) throws IOException {
+    CopyState copyState = job.getCopyState();
+    message("top: finishNRTCopy: version=" + copyState.version + (job.getFailed() ? " FAILED" : "") + " job=" + job);
+
+    // NOTE: if primary crashed while we were still copying then the job will hit an exc trying to read bytes for the files from the primary node,
+    // and the job will be marked as failed here:
+
+    synchronized (this) {
+
+      if ("syncing".equals(state)) {
+        state = "idle";
+      }
+
+      if (curNRTCopy == job) {
+        message("top: now clear curNRTCopy; job=" + job);
+        curNRTCopy = null;
+      } else {
+        // A newer job replaced us as curNRTCopy, which only happens after we were cancelled:
+        assert job.getFailed();
+        message("top: skip clear curNRTCopy: we were cancelled; job=" + job);
+      }
+
+      if (job.getFailed()) {
+        return;
+      }
+
+      // Does final file renames:
+      job.finish();
+
+      // Turn byte[] back to SegmentInfos:
+      byte[] infosBytes = copyState.infosBytes;
+      SegmentInfos infos = SegmentInfos.readCommit(dir,
+                                                   new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", infosBytes)),
+                                                   copyState.gen);
+      assert infos.getVersion() == copyState.version;
+
+      message("  version=" + infos.getVersion() + " segments=" + infos.toString());
+
+      // Cutover to new searcher:
+      if (mgr != null) {
+        ((SegmentInfosSearcherManager) mgr).setCurrentInfos(infos);
+      }
+
+      // Must first incRef new NRT files, then decRef old ones, to make sure we don't remove an NRT file that's in common to both:
+      Collection<String> newFiles = copyState.files.keySet();
+      message("top: incRef newNRTFiles=" + newFiles);
+      deleter.incRef(newFiles);
+
+      // If any of our new files were previously copied merges, we clear them now, so we don't try to later delete a non-existent file:
+      pendingMergeFiles.removeAll(newFiles);
+      message("top: after remove from pending merges pendingMergeFiles=" + pendingMergeFiles);
+
+      message("top: decRef lastNRTFiles=" + lastNRTFiles);
+      deleter.decRef(lastNRTFiles);
+      lastNRTFiles.clear();
+      lastNRTFiles.addAll(newFiles);
+      message("top: set lastNRTFiles=" + lastNRTFiles);
+
+      // At this point we can remove any completed merge segment files that we still do not reference.  This can happen when a merge
+      // finishes, copies its files out to us, but is then merged away (or dropped due to 100% deletions) before we ever cutover to it
+      // in an NRT point:
+      if (copyState.completedMergeFiles.isEmpty() == false) {
+        message("now remove-if-not-ref'd completed merge files: " + copyState.completedMergeFiles);
+        for(String fileName : copyState.completedMergeFiles) {
+          // remove() reports whether the name was present, so test-and-remove is a single call on the synchronized set:
+          if (pendingMergeFiles.remove(fileName)) {
+            deleter.deleteIfNoRef(fileName);
+          }
+        }
+      }
+
+      lastFileMetaData = copyState.files;
+
+      // It's a good time to delete pending files, since we just refreshed and some previously open files are now closed:
+      deleter.deletePending();
+    }
+
+    // Count documents matching marker:marker with the freshly installed searcher; the count is only reported in the log line below:
+    int markerCount;
+    IndexSearcher s = mgr.acquire();
+    try {
+      markerCount = s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+    } finally {
+      mgr.release(s);
+    }
+
+    message(String.format(Locale.ROOT, "top: done sync: took %.3fs for %s, opened NRT reader version=%d markerCount=%d",
+                          (System.nanoTime()-startNS)/1000000000.0,
+                          bytesToString(job.getTotalBytesCopied()),
+                          copyState.version,
+                          markerCount));
+  }
+
+  /** Create a copy job that copies the specified files from the current primary node; callers in this class invoke
+   *  {@code start()} on the returned job themselves.  If files is null then the latest copy state should be copied.
+   *  NOTE(review): this comment used to describe a {@code prevJob} argument (the job being replaced, to be cancelled and
+   *  optionally salvaged for partially copied files shared with the new job), but the signature takes {@code prevFiles};
+   *  callers in this class pass either null or {@code lastFileMetaData} -- confirm the intended contract.
+   *
+   *  @param reason free-form description of why the job was created
+   *  @param files files to copy, or null to copy the primary's latest copy state
+   *  @param prevFiles file metadata from the previous copy, or null
+   *  @param highPriority callers in this class always pass true -- NOTE(review): semantics are defined by the implementation
+   *  @param onceDone callback to run when the job is done, or null */
+  protected abstract CopyJob newCopyJob(String reason, Map<String,FileMetaData> files, Map<String,FileMetaData> prevFiles,
+                                        boolean highPriority, CopyJob.OnceDone onceDone) throws IOException;
+
+  /** Runs this job asynchronously; {@code newNRTPoint} calls this after {@code job.start()} has succeeded. */
+  protected abstract void launch(CopyJob job);
+
+  /** Tell primary we (replica) just started, so primary can tell us to warm any already warming merges.  This lets us keep low NRT refresh
+   *  time for the first NRT sync after we started.  Called from {@code start()} just before {@code mgr} is created. */
+  protected abstract void sendNewReplica() throws IOException;
+
+  /** Call this to notify this replica node that a new NRT infos is available on the primary.
+   *  We kick off a job (runs in the background) to copy files across, and open a new reader once that's done. */
+  public synchronized CopyJob newNRTPoint(long version) throws IOException {
+
+    if (isClosed()) {
+      throw new AlreadyClosedException("this replica is closed: state=" + state);
+    }
+
+    // Caller should not "publish" us until we have finished .start():
+    assert mgr != null;
+
+    if ("idle".equals(state)) {
+      state = "syncing";
+    }
+
+    long curVersion = getCurrentSearchingVersion();
+
+    message("top: start sync sis.version=" + version);
+
+    if (version == curVersion) {
+      // Caller releases the CopyState:
+      message("top: new NRT point has same version as current; skipping");
+      return null;
+    }
+
+    if (version < curVersion) {
+      // This can happen, if two syncs happen close together, and due to thread scheduling, the incoming older version runs after the newer version
+      message("top: new NRT point (version=" + version + ") is older than current (version=" + version + "); skipping");
+      return null;
+    }
+
+    final long startNS = System.nanoTime();
+
+    message("top: newNRTPoint");
+    CopyJob job = null;
+    try {
+      job = newCopyJob("NRT point sync version=" + version,
+                       null,
+                       lastFileMetaData,
+                       true,
+                       new CopyJob.OnceDone() {
+                         @Override
+                         public void run(CopyJob job) {
+                           try {
+                             finishNRTCopy(job, startNS);
+                           } catch (IOException ioe) {
+                             throw new RuntimeException(ioe);
+                           }
+                         }
+                       });
+    } catch (NodeCommunicationException nce) {
+      // E.g. primary could crash/close when we are asking it for the copy state:
+      message("top: ignoring communication exception creating CopyJob: " + nce);
+      nce.printStackTrace(System.out);
+      if (state.equals("syncing")) {
+        state = "idle";
+      }
+      return null;
+    }
+
+    Collection<String> newNRTFiles = job.getFileNames();
+    long newPrimaryGen = job.getCopyState().primaryGen;
+    maybeNewPrimary(newPrimaryGen);
+
+    message("top: newNRTPoint: job files=" + newNRTFiles);
+
+    if (curNRTCopy != null) {
+      job.transferAndCancel(curNRTCopy);
+      assert curNRTCopy.getFailed();
+    }
+
+    curNRTCopy = job;
+
+    for(String fileName : curNRTCopy.getFileNamesToCopy()) {
+      assert lastCommitFiles.contains(fileName) == false: "fileName=" + fileName + " is in lastCommitFiles and is being copied?";
+      synchronized (mergeCopyJobs) {
+        for (CopyJob mergeJob : mergeCopyJobs) {
+          if (mergeJob.getFileNames().contains(fileName)) {
+            // nocommit can't we .transferAndCancel?
+            message("top: now cancel merge copy job=" + mergeJob + ": file " + fileName + " is now being copied via NRT point");
+            mergeJob.cancel("newNRTPoint is copying over the same file", null);
+          }
+        }
+      }
+    }
+
+    try {
+      job.start();
+    } catch (NodeCommunicationException nce) {
+      // E.g. primary could crash/close when we are asking it for the copy state:
+      message("top: ignoring exception starting CopyJob: " + nce);
+      nce.printStackTrace(System.out);
+      if (state.equals("syncing")) {
+        state = "idle";
+      }
+      return null;
+    }
+
+    // Runs in the background jobs thread, maybe slowly/throttled, and calls finishSync once it's done:
+    launch(curNRTCopy);
+    return curNRTCopy;
+  }
+
+  /** Reports whether an NRT copy job is currently registered, i.e. whether {@code curNRTCopy} is non-null. */
+  public synchronized boolean isCopying() {
+    if (curNRTCopy == null) {
+      return false;
+    }
+    return true;
+  }
+
+  /** Returns true when {@code state} is one of "closed", "closing", "crashing" or "crashed". */
+  @Override
+  public boolean isClosed() {
+    // Checked in the same order as the states are listed; a null state matches none of them:
+    for (String terminalState : new String[] {"closed", "closing", "crashing", "crashed"}) {
+      if (terminalState.equals(state)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Closes this node in two phases, each run while holding this node's monitor: first the state becomes "closing" and any
+   *  current NRT copy job is cancelled; then the searcher manager, file references, write lock and directory are released.
+   *  Finally the state becomes "closed". */
+  @Override
+  public void close() throws IOException {
+    message("top: now close");
+    cancelNRTCopyForClose();
+    releaseResourcesForClose();
+    message("top: done close");
+    state = "closed";
+  }
+
+  /** Phase one of {@link #close}: marks the node as "closing" and cancels the current NRT copy job, if there is one. */
+  private synchronized void cancelNRTCopyForClose() throws IOException {
+    state = "closing";
+    if (curNRTCopy != null) {
+      curNRTCopy.cancel("closing", null);
+    }
+  }
+
+  /** Phase two of {@link #close}: closes the searcher manager, drops file references and closes the write lock and directory. */
+  private synchronized void releaseResourcesForClose() throws IOException {
+    message("top: close mgr");
+    mgr.close();
+
+    message("top: decRef lastNRTFiles=" + lastNRTFiles);
+    deleter.decRef(lastNRTFiles);
+    lastNRTFiles.clear();
+
+    // NOTE: the commit files are only forgotten here, deliberately without a decRef:
+    lastCommitFiles.clear();
+
+    message("top: delete if no ref pendingMergeFiles=" + pendingMergeFiles);
+    for (String pendingFile : pendingMergeFiles) {
+      deleter.deleteIfNoRef(pendingFile);
+    }
+    pendingMergeFiles.clear();
+
+    message("top: close dir");
+    IOUtils.close(writeFileLock, dir);
+  }
+
+  /** Compares the incoming primary generation against {@code lastPrimaryGen}.  When they differ, the new generation is adopted
+   *  and {@code pendingMergeFiles} is cleared; when they are equal nothing changes. */
+  protected synchronized void maybeNewPrimary(long newPrimaryGen) {
+    if (newPrimaryGen == lastPrimaryGen) {
+      message("top: keep current lastPrimaryGen=" + lastPrimaryGen);
+      return;
+    }
+
+    message("top: now change lastPrimaryGen from " + lastPrimaryGen + " to " + newPrimaryGen + " pendingMergeFiles=" + pendingMergeFiles);
+    // The generation is only ever expected to move forwards:
+    assert newPrimaryGen > lastPrimaryGen: "newPrimaryGen=" + newPrimaryGen + " vs lastPrimaryGen=" + lastPrimaryGen;
+    lastPrimaryGen = newPrimaryGen;
+    pendingMergeFiles.clear();
+  }
+
+  /** Creates, starts and launches a background copy job that pre-copies ("warms") the files of a newly merged segment.
+   *
+   *  <p>{@link #maybeNewPrimary} is first invoked with {@code newPrimaryGen}, and the resulting {@code lastPrimaryGen} is
+   *  remembered so that the completion callback can tell whether the primary changed while copying.  Once the job is done, the
+   *  callback either finishes it and records its files in {@code pendingMergeFiles}, or cancels it when the primary generation
+   *  changed or when any of its files is already referenced by the last NRT point or last commit point.
+   *
+   *  @param finished set to {@code true} once the completion callback has run, whether the copy succeeded, failed, or the
+   *         callback itself threw an exception
+   *  @param newPrimaryGen primary generation carried by the request
+   *  @param files the files to copy, keyed by file name
+   *  @return the job that was started and launched
+   *  @throws IOException if creating or starting the job fails */
+  protected synchronized CopyJob launchPreCopyMerge(AtomicBoolean finished, long newPrimaryGen, Map<String,FileMetaData> files) throws IOException {
+
+    CopyJob job;
+
+    maybeNewPrimary(newPrimaryGen);
+    final long primaryGenStart = lastPrimaryGen;
+    Set<String> fileNames = files.keySet();
+    message("now pre-copy warm merge files=" + fileNames + " primaryGen=" + newPrimaryGen);
+
+    for(String fileName : fileNames) {
+      assert pendingMergeFiles.contains(fileName) == false: "file \"" + fileName + "\" is already being warmed!";
+      assert lastNRTFiles.contains(fileName) == false: "file \"" + fileName + "\" is already NRT visible!";
+    }
+
+    job = newCopyJob("warm merge on " + name() + " filesNames=" + fileNames,
+                     files, null, false,
+                     new CopyJob.OnceDone() {
+
+                       @Override
+                       public void run(CopyJob job) throws IOException {
+                         try {
+                           mergeCopyJobs.remove(job);
+                           message("done warming merge " + fileNames + " failed?=" + job.getFailed());
+                           // NOTE(review): inside this anonymous class "this" is the OnceDone instance, not the enclosing node, so
+                           // this block does not exclude the node's own synchronized methods.  Left unchanged here because locking
+                           // the node from the jobs thread has lock-ordering implications that need to be confirmed first.
+                           synchronized(this) {
+                             if (job.getFailed() == false) {
+                               if (lastPrimaryGen != primaryGenStart) {
+                                 message("merge pre copy finished but primary has changed; cancelling job files=" + fileNames);
+                                 job.cancel("primary changed during merge copy", null);
+                               } else {
+                                 boolean abort = false;
+                                 for (String fileName : fileNames) {
+                                   if (lastNRTFiles.contains(fileName)) {
+                                     message("abort merge finish: file " + fileName + " is referenced by last NRT point");
+                                     abort = true;
+                                   }
+                                   if (lastCommitFiles.contains(fileName)) {
+                                     message("abort merge finish: file " + fileName + " is referenced by last commit point");
+                                     abort = true;
+                                   }
+                                 }
+                                 if (abort) {
+                                   // Even though in newNRTPoint we have similar logic, which cancels any merge copy jobs if an NRT point
+                                   // shows up referencing the files we are warming (because primary got impatient and gave up on us), we also
+                                   // need it here in case replica is way far behind and fails to even receive the merge pre-copy request
+                                   // until after the newNRTPoint referenced those files:
+                                   job.cancel("merged segment was separately copied via NRT point", null);
+                                 } else {
+                                   job.finish();
+                                   message("merge pre copy finished files=" + fileNames);
+                                   for(String fileName : fileNames) {
+                                     assert pendingMergeFiles.contains(fileName) == false : "file \"" + fileName + "\" is already in pendingMergeFiles";
+                                     message("add file " + fileName + " to pendingMergeFiles");
+                                     pendingMergeFiles.add(fileName);
+                                   }
+                                 }
+                               }
+                             } else {
+                               message("merge copy finished with failure");
+                             }
+                           }
+                         } finally {
+                           // Signals that this replica has finished.  Done in a finally block so that an exception thrown above
+                           // (e.g. from job.finish()) cannot leave whoever is polling this flag waiting forever:
+                           finished.set(true);
+                         }
+                       }
+                     });
+
+    job.start();
+
+    // When warming a merge we better not already have any of these files copied!
+    assert job.getFileNamesToCopy().size() == files.size();
+
+    mergeCopyJobs.add(job);
+    launch(job);
+
+    return job;
+  }
+
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext) throws IOException {
+    return dir.createTempOutput(prefix, suffix, IOContext.DEFAULT);
+  }
+
+  /** Checks every incoming file against what we have locally, using {@code fileIsIdentical}, and returns the subset of the
+   *  incoming entries that need copying.
+   *
+   *  @param files incoming files, keyed by file name
+   *  @return the entries of {@code files} whose local copy is not identical, in the iteration order of {@code files}
+   *  @throws IOException if checking a local file fails */
+  public List<Map.Entry<String,FileMetaData>> getFilesToCopy(Map<String,FileMetaData> files) throws IOException {
+
+    List<Map.Entry<String,FileMetaData>> toCopy = new ArrayList<>();
+    for (Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+      if (fileIsIdentical(ent.getKey(), ent.getValue()) == false) {
+        toCopy.add(ent);
+      }
+    }
+
+    return toCopy;
+  }
+
+  /** Decides whether the local file named {@code fileName} can be reused as-is instead of being copied from the primary, based on
+   *  the {@link FileMetaData} "summarizing" the primary's copy.  Returns false when the deleter still has the file pending
+   *  deletion, when {@code readLocalFileMetaData} returns null for it, or when its header or footer bytes differ from those in
+   *  {@code srcMetaData}; otherwise returns true. */
+  private boolean fileIsIdentical(String fileName, FileMetaData srcMetaData) throws IOException {
+
+    if (deleter.isPending(fileName)) {
+      // We had wanted to delete this file but a virus checker prevented us, and now we need to overwrite it.  Its state is
+      // unknown: even with matching header, footer and length it may never have been fsync'd by the previous node instance
+      // on this directory and could in theory be corrupt internally, so it is always copied again:
+      if (Node.VERBOSE_FILES) {
+        message("file " + fileName + ": will copy [we had wanted to delete this file on init, but failed]");
+      }
+      return false;
+    }
+
+    FileMetaData destMetaData = readLocalFileMetaData(fileName);
+    if (destMetaData == null) {
+      // The local file could not be read (it's corrupt, truncated, does not exist, etc.):
+      return false;
+    }
+
+    boolean sameHeader = Arrays.equals(destMetaData.header, srcMetaData.header);
+    boolean sameHeaderAndFooter = sameHeader && Arrays.equals(destMetaData.footer, srcMetaData.footer);
+    if (sameHeaderAndFooter) {
+      return true;
+    }
+
+    // Segment name was reused!  This is rare but possible and otherwise devastating:
+    if (Node.VERBOSE_FILES) {
+      message("file " + fileName + ": will copy [header/footer is different]");
+    }
+    return false;
+  }
+
+  private ConcurrentMap<String,Boolean> copying = new ConcurrentHashMap<>();
+
+  // Used only to catch bugs, ensuring a given file name is only ever being copied bye one job:
+  public void startCopyFile(String name) {
+    if (copying.putIfAbsent(name, Boolean.TRUE) != null) {
+      throw new IllegalStateException("file " + name + " is being copied in two places!");
+    }
+  }
+
+  public void finishCopyFile(String name) {
+    if (copying.remove(name) == null) {
+      throw new IllegalStateException("file " + name + " was not actually being copied?");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java
new file mode 100644
index 0000000..72ed921
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java
@@ -0,0 +1,129 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** A SearcherManager that refreshes via an externally provided (NRT) SegmentInfos, either from
+ *  {@link org.apache.lucene.index.IndexWriter} or via nrt replication to another index. */
+class SegmentInfosSearcherManager extends ReferenceManager<IndexSearcher> {
+  /** The infos the next refresh will open; replaced by {@link #setCurrentInfos}. */
+  private volatile SegmentInfos currentInfos;
+  private final Directory dir;
+  private final Node node;
+  /** Incremented for every reader a closed-listener is registered on, decremented each time such a listener fires. */
+  private final AtomicInteger openReaderCount = new AtomicInteger();
+  private final SearcherFactory searcherFactory;
+
+  /** Opens the initial searcher over {@code infosIn}.
+   *
+   *  @param dir directory the readers are opened on
+   *  @param node node used for logging via {@code message}
+   *  @param infosIn the initial infos
+   *  @param searcherFactory factory used to create searchers; when null a default {@link SearcherFactory} is used
+   *  @throws IOException if opening the initial reader fails */
+  public SegmentInfosSearcherManager(Directory dir, Node node, SegmentInfos infosIn, SearcherFactory searcherFactory) throws IOException {
+    this.dir = dir;
+    this.node = node;
+    if (searcherFactory == null) {
+      searcherFactory = new SearcherFactory();
+    }
+    this.searcherFactory = searcherFactory;
+    currentInfos = infosIn;
+    node.message("SegmentInfosSearcherManager.init: use incoming infos=" + infosIn.toString());
+    current = SearcherManager.getSearcher(searcherFactory, StandardDirectoryReader.open(dir, currentInfos, null), null);
+    addReaderClosedListener(current.getIndexReader());
+  }
+
+  @Override
+  protected int getRefCount(IndexSearcher s) {
+    return s.getIndexReader().getRefCount();
+  }
+
+  @Override
+  protected boolean tryIncRef(IndexSearcher s) {
+    return s.getIndexReader().tryIncRef();
+  }
+
+  @Override
+  protected void decRef(IndexSearcher s) throws IOException {
+    s.getIndexReader().decRef();
+  }
+
+  /** Returns the infos most recently passed to the constructor or to {@link #setCurrentInfos}. */
+  public SegmentInfos getCurrentInfos() {
+    return currentInfos;
+  }
+
+  /** Switch to new segments, refreshing if necessary.  Note that it's the caller job to ensure there's a held refCount for the
+   *  incoming infos, so all files exist. */
+  public void setCurrentInfos(SegmentInfos infos) throws IOException {
+    if (currentInfos != null) {
+      // So that if we commit, we will go to the next
+      // (unwritten so far) generation:
+      infos.updateGeneration(currentInfos);
+      node.message("mgr.setCurrentInfos: carry over infos gen=" + infos.getSegmentsFileName());
+    }
+    currentInfos = infos;
+    maybeRefresh();
+  }
+
+  /** Opens a reader over the current infos, reusing the leaf readers of {@code old} where possible, and wraps it in a new searcher.
+   *
+   *  @param old the searcher being replaced; may be null, in which case nothing is shared
+   *  @return the new searcher
+   *  @throws IOException if opening the reader or creating the searcher fails */
+  @Override
+  protected IndexSearcher refreshIfNeeded(IndexSearcher old) throws IOException {
+    // Read the volatile field once, so the reader we open and the version we log always refer to the same infos even if
+    // setCurrentInfos runs concurrently:
+    final SegmentInfos infos = currentInfos;
+
+    // Handle a null old searcher consistently for both the shared leaves and the previous reader:
+    final DirectoryReader previous;
+    final List<LeafReader> subs;
+    if (old == null) {
+      previous = null;
+      subs = null;
+    } else {
+      previous = (DirectoryReader) old.getIndexReader();
+      subs = new ArrayList<>();
+      for(LeafReaderContext ctx : previous.leaves()) {
+        subs.add(ctx.reader());
+      }
+    }
+
+    // Open a new reader, sharing any common segment readers with the old one:
+    DirectoryReader r = StandardDirectoryReader.open(dir, infos, subs);
+    addReaderClosedListener(r);
+    node.message("refreshed to version=" + infos.getVersion() + " r=" + r);
+    return SearcherManager.getSearcher(searcherFactory, r, previous);
+  }
+
+  /** Counts {@code r} as open and registers a listener that calls {@link #onReaderClosed} when it is closed. */
+  private void addReaderClosedListener(IndexReader r) {
+    openReaderCount.incrementAndGet();
+    r.addReaderClosedListener(new IndexReader.ReaderClosedListener() {
+        @Override
+        public void onClose(IndexReader reader) {
+          onReaderClosed();
+        }
+      });
+  }
+
+  /** Tracks how many readers are still open, so that when we are closed,
+   *  we can additionally wait until all in-flight searchers are
+   *  closed. */
+  synchronized void onReaderClosed() {
+    if (openReaderCount.decrementAndGet() == 0) {
+      notifyAll();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java
new file mode 100644
index 0000000..7db7bc1
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java
@@ -0,0 +1,63 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.OutputStreamDataOutput;
+
+/** Simple point-to-point TCP connection */
+class Connection implements Closeable {
+  public final DataInput in;
+  public final DataOutput out;
+  public final InputStream sockIn;
+  public final BufferedOutputStream bos;
+  public final Socket s;
+  public final int destTCPPort;
+  public long lastKeepAliveNS = System.nanoTime();
+
+  public Connection(int tcpPort) throws IOException {
+    this.destTCPPort = tcpPort;
+    this.s = new Socket(InetAddress.getLoopbackAddress(), tcpPort);
+    this.sockIn = s.getInputStream();
+    this.in = new InputStreamDataInput(sockIn);
+    this.bos = new BufferedOutputStream(s.getOutputStream());
+    this.out = new OutputStreamDataOutput(bos);
+    if (Node.VERBOSE_CONNECTIONS) {
+      System.out.println("make new client Connection socket=" + this.s + " destPort=" + tcpPort);
+    }
+  }
+
+  public void flush() throws IOException {
+    bos.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    s.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
new file mode 100644
index 0000000..369414f
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
@@ -0,0 +1,152 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.lucene.store.AlreadyClosedException;
+
+/** Runs CopyJob(s) in background thread; each ReplicaNode has an instance of this
+ *  running.  At a given time there could be one NRT copy job running, and multiple
+ *  pre-warm merged segments jobs. */
+class Jobs extends Thread implements Closeable {
+
+  /** Jobs waiting for their next visit; only accessed while holding this object's monitor. */
+  private final PriorityQueue<CopyJob> queue = new PriorityQueue<>();
+
+  private final Node node;
+
+  public Jobs(Node node) {
+    this.node = node;
+  }
+
+  /** Set by {@link #close}; once true no new jobs are accepted and the run loop exits. */
+  private boolean finish;
+
+  /** Returns null if we are closing, else, returns the top job or waits for one to arrive if the queue is empty. */
+  private synchronized SimpleCopyJob getNextJob() {
+    while (true) {
+      if (finish) {
+        return null;
+      } else if (queue.isEmpty()) {
+        try {
+          wait();
+        } catch (InterruptedException ie) {
+          // Keep the interrupt visible to whoever handles the exception:
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(ie);
+        }
+      } else {
+        return (SimpleCopyJob) queue.poll();
+      }
+    }
+  }
+
+  /** Repeatedly takes the top job and visits it.  A job whose visit returns false is put back on the queue; a job whose visit
+   *  returns true, or throws, has its OnceDone callback invoked (after being cancelled, in the exception case).  When closing,
+   *  all jobs still queued are cancelled and their callbacks invoked. */
+  @Override
+  public void run() {
+    while (true) {
+      SimpleCopyJob topJob = getNextJob();
+      if (topJob == null) {
+        assert finish;
+        break;
+      }
+
+      this.setName("jobs o" + topJob.ord);
+
+      boolean result;
+      try {
+        result = topJob.visit();
+      } catch (Throwable t) {
+        if ((t instanceof AlreadyClosedException) == false) {
+          node.message("exception during job.visit job=" + topJob + "; now cancel");
+          t.printStackTrace(System.out);
+        } else {
+          node.message("AlreadyClosedException during job.visit job=" + topJob + "; now cancel");
+        }
+        topJob.cancel("unexpected exception in visit", t);
+        notifyDone(topJob);
+        continue;
+      }
+
+      if (result == false) {
+        // Job isn't done yet; put it back:
+        synchronized (this) {
+          queue.offer(topJob);
+        }
+      } else {
+        // Job finished, now notify caller:
+        notifyDone(topJob);
+      }
+    }
+
+    node.message("top: jobs now exit run thread");
+
+    synchronized(this) {
+      // Gracefully cancel any jobs we didn't finish:
+      while (queue.isEmpty() == false) {
+        SimpleCopyJob job = (SimpleCopyJob) queue.poll();
+        node.message("top: Jobs: now cancel job=" + job);
+        job.cancel("jobs closing", null);
+        notifyDone(job);
+      }
+    }
+  }
+
+  /** Invokes the job's OnceDone callback, logging and otherwise ignoring anything it throws so the caller keeps going. */
+  private void notifyDone(SimpleCopyJob job) {
+    try {
+      job.onceDone.run(job);
+    } catch (Throwable t) {
+      node.message("ignore exception calling OnceDone: " + t);
+    }
+  }
+
+  /** Queues a job and wakes the run loop.
+   *
+   *  @throws AlreadyClosedException if {@link #close} was already called */
+  public synchronized void launch(CopyJob job) {
+    if (finish == false) {
+      queue.offer(job);
+      // notifyAll rather than notify: this object is also a Thread, and Thread.join is documented to wait on the Thread
+      // instance itself, so the run loop is not necessarily the only waiter on this monitor:
+      notifyAll();
+    } else {
+      throw new AlreadyClosedException("closed");
+    }
+  }
+
+  /** Cancels any existing jobs that are copying the same file names as this one */
+  public synchronized void cancelConflictingJobs(CopyJob newJob) {
+    for (CopyJob job : queue) {
+      if (job.conflicts(newJob)) {
+        node.message("top: now cancel existing conflicting job=" + job + " due to newJob=" + newJob);
+        job.cancel("conflicts with new job", null);
+      }
+    }
+  }
+
+  /** Asks the run loop to stop and waits for this thread to exit. */
+  @Override
+  public synchronized void close() {
+    finish = true;
+    notifyAll();
+    try {
+      join();
+    } catch (InterruptedException ie) {
+      // Keep the interrupt visible to whoever handles the exception:
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ie);
+    }
+  }
+}