Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/06 17:24:08 UTC

[14/15] lucene-solr git commit: Merge branch 'master' into nrt_replicas

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/search/SearcherManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnJRECrash.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnVMError.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestStressDeletes.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/SearcherTaxonomyManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --cc lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
index 133992f,0000000..a7adbe2
mode 100644,000000..100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@@ -1,763 -1,0 +1,768 @@@
 +package org.apache.lucene.replicator.nrt;
 +
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Locale;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.lucene.index.DirectoryReader;
 +import org.apache.lucene.index.IndexFileNames;
 +import org.apache.lucene.index.IndexWriter;
 +import org.apache.lucene.index.SegmentInfos;
 +import org.apache.lucene.index.Term;
 +import org.apache.lucene.search.IndexSearcher;
 +import org.apache.lucene.search.SearcherFactory;
 +import org.apache.lucene.search.TermQuery;
 +import org.apache.lucene.store.AlreadyClosedException;
 +import org.apache.lucene.store.BufferedChecksumIndexInput;
 +import org.apache.lucene.store.ByteArrayIndexInput;
 +import org.apache.lucene.store.Directory;
++import org.apache.lucene.store.FSDirectory;
 +import org.apache.lucene.store.IOContext;
 +import org.apache.lucene.store.IndexOutput;
 +import org.apache.lucene.store.Lock;
 +import org.apache.lucene.util.IOUtils;
 +
 +/** Replica node that pulls index changes from the primary node by copying newly flushed or merged index files */
 +
 +abstract class ReplicaNode extends Node {
 +
 +  ReplicaFileDeleter deleter;
 +
 +  /** IncRef'd files in the current commit point: */
 +  private final Collection<String> lastCommitFiles = new HashSet<>();
 +
 +  /** IncRef'd files in the current NRT point: */
 +  protected final Collection<String> lastNRTFiles = new HashSet<>();
 +
 +  /** Currently running merge pre-copy jobs */
 +  protected final Set<CopyJob> mergeCopyJobs = Collections.synchronizedSet(new HashSet<>());
 +
 +  /** Non-null when we are currently copying files from a new NRT point: */
 +  protected CopyJob curNRTCopy;
 +
 +  /** We hold this to ensure an external IndexWriter cannot also open on our directory: */
 +  private final Lock writeFileLock;
 +
 +  /** Merged segment files that we pre-copied, but have not yet made visible in a new NRT point. */
 +  final Set<String> pendingMergeFiles = Collections.synchronizedSet(new HashSet<String>());
 +
 +  /** Primary gen last time we successfully replicated: */
 +  protected long lastPrimaryGen;
 +
 +  public ReplicaNode(int id, Directory dir, SearcherFactory searcherFactory) throws IOException {
 +    super(id, dir, searcherFactory);
 +
++    if (dir instanceof FSDirectory && ((FSDirectory) dir).checkPendingDeletions()) {
++      throw new IllegalArgumentException("Directory " + dir + " still has pending deleted files; cannot initialize ReplicaNode");
++    }
++
 +    boolean success = false;
 +
 +    try {
 +      message("top: init replica dir=" + dir);
 +
 +      // Obtain a write lock on this index since we "act like" an IndexWriter, to prevent any other IndexWriter or ReplicaNode from using it:
 +      writeFileLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME);
 +      
 +      // nocommit must check for no pending deletes here, like IW does
 +
 +      state = "init";
 +      deleter = new ReplicaFileDeleter(this, dir);
 +      success = true; // init succeeded; the finally clause below must not close us
 +    } catch (Throwable t) {
 +      message("exc on init:");
 +      t.printStackTrace(System.out);
 +      throw t;
 +    } finally {
 +      if (success == false) {
 +        IOUtils.closeWhileHandlingException(this);
 +      }
 +    }
 +  }
 +
 +  /** Start up this replica, which possibly requires heavy copying of files from the primary node, if we were down for a long time */
 +  protected synchronized void start(long curPrimaryGen) throws IOException {
 +
 +    if (state.equals("init") == false) {
 +      throw new IllegalStateException("already started");
 +    }
 +
 +    message("top: now start");
 +    try {
 +
 +      // Figure out what state our local index is in now:
 +      String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir);
 +
 +      // Also look for any pending_segments_N, in case we crashed mid-commit.  We must "inflate" our infos gen to at least this, since
 +      // otherwise we may wind up re-using the pending_segments_N file name on commit, and then our deleter can get angry because it still
 +      // wants to delete this file:
 +      long maxPendingGen = -1;
 +      for(String fileName : dir.listAll()) {
 +        if (fileName.startsWith(IndexFileNames.PENDING_SEGMENTS)) {
 +          long gen = Long.parseLong(fileName.substring(IndexFileNames.PENDING_SEGMENTS.length()+1), Character.MAX_RADIX);
 +          if (gen > maxPendingGen) {
 +            maxPendingGen = gen;
 +          }
 +        }
 +      }
 +
 +      SegmentInfos infos;
 +      if (segmentsFileName == null) {
 +        // No index here yet:
 +        infos = new SegmentInfos();
 +        message("top: init: no segments in index");
 +      } else {
 +        message("top: init: read existing segments commit " + segmentsFileName);
 +        infos = SegmentInfos.readCommit(dir, segmentsFileName);
 +        message("top: init: segments: " + infos.toString() + " version=" + infos.getVersion());
 +        Collection<String> indexFiles = infos.files(false);
 +
 +        lastCommitFiles.add(segmentsFileName);
 +        lastCommitFiles.addAll(indexFiles);
 +
 +        // Always protect the last commit:
 +        deleter.incRef(lastCommitFiles);
 +
 +        lastNRTFiles.addAll(indexFiles);
 +        deleter.incRef(lastNRTFiles);
 +        message("top: commitFiles=" + lastCommitFiles);
 +        message("top: nrtFiles=" + lastNRTFiles);
 +      }
 +
 +      message("top: delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
 +      deleter.deleteUnknownFiles(segmentsFileName);
 +      message("top: done delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
 +
 +      String s = infos.getUserData().get(PRIMARY_GEN_KEY);
 +      long myPrimaryGen;
 +      if (s == null) {
 +        assert infos.size() == 0;
 +        myPrimaryGen = -1;
 +      } else {
 +        myPrimaryGen = Long.parseLong(s);
 +      }
 +      message("top: myPrimaryGen=" + myPrimaryGen);
 +
 +      boolean doCommit;
 +
 +      if (infos.size() > 0 && myPrimaryGen != -1 && myPrimaryGen != curPrimaryGen) {
 +
 +        assert myPrimaryGen < curPrimaryGen;
 +
 +        // Primary changed while we were down.  In this case, we must sync from primary before opening a reader, because it's possible current
 +        // files we have will need to be overwritten with different ones (if index rolled back and "forked"), and we can't overwrite open
 +        // files on Windows:
 +
 +        final long initSyncStartNS = System.nanoTime();
 +
 +        message("top: init: primary changed while we were down myPrimaryGen=" + myPrimaryGen +
 +                " vs curPrimaryGen=" + curPrimaryGen +
 +                "; sync now before mgr init");
 +
 +        // Try until we succeed in copying over the latest NRT point:
 +        CopyJob job = null;
 +
 +        // We may need to overwrite files referenced by our latest commit, either right now on initial sync, or on a later sync.  To make
 +        // sure the index is never even in an "apparently" corrupt state (where an old segments_N references invalid files) we forcefully
 +        // remove the commit now, and refuse to start the replica if this delete fails:
 +        message("top: now delete starting commit point " + segmentsFileName);
 +
 +        // If this throws exc (e.g. due to virus checker), we cannot start this replica:
 +        assert deleter.getRefCount(segmentsFileName) == 1;
 +        deleter.decRef(Collections.singleton(segmentsFileName));
 +        if (deleter.isPending(segmentsFileName)) {
 +          // If e.g. virus checker blocks us from deleting, we absolutely cannot start this node else we can cause corruption:
 +          throw new RuntimeException("replica cannot start: existing segments file=" + segmentsFileName + " must be removed in order to start, but the file delete failed");
 +        }
 +        // So we don't later try to decRef it (illegally) again:
 +        boolean didRemove = lastCommitFiles.remove(segmentsFileName);
 +        assert didRemove;
 +
 +        while (true) {
 +          job = newCopyJob("sync on startup replica=" + name() + " myVersion=" + infos.getVersion(),
 +                           null,
 +                           null,
 +                           true,
 +                           null);
 +          job.start();
 +
 +          message("top: init: sync sis.version=" + job.getCopyState().version);
 +
 +          Collection<String> fileNamesToCopy = job.getFileNamesToCopy();
 +
 +          // Force this copy job to finish while we wait, now.  Note that this can be very time consuming!
 +          // NOTE: newNRTPoint detects we are still in init (mgr is null) and does not cancel our copy if a flush happens
 +          try {
 +            job.runBlocking();
 +            job.finish();
 +
 +            // Success!
 +            break;
 +          } catch (IOException ioe) {
 +            job.cancel("startup failed", ioe);
 +            if (ioe.getMessage().contains("checksum mismatch after file copy")) {
 +              // OK-ish
 +              message("top: failed to copy: " + ioe + "; retrying");
 +            } else {
 +              throw ioe;
 +            }
 +          }
 +        }
 +
 +        lastPrimaryGen = job.getCopyState().primaryGen;
 +        byte[] infosBytes = job.getCopyState().infosBytes;
 +
 +        SegmentInfos syncInfos = SegmentInfos.readCommit(dir,
 +                                                         new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", job.getCopyState().infosBytes)),
 +                                                         job.getCopyState().gen);
 +
 +        // Must always commit to a larger generation than what's currently in the index:
 +        syncInfos.updateGeneration(infos);
 +        infos = syncInfos;
 +
 +        assert infos.getVersion() == job.getCopyState().version;
 +        message("  version=" + infos.getVersion() + " segments=" + infos.toString());
 +        message("top: init: incRef nrtFiles=" + job.getFileNames());
 +        deleter.incRef(job.getFileNames());
 +        message("top: init: decRef lastNRTFiles=" + lastNRTFiles);
 +        deleter.decRef(lastNRTFiles);
 +
 +        lastNRTFiles.clear();
 +        lastNRTFiles.addAll(job.getFileNames());
 +
 +        message("top: init: set lastNRTFiles=" + lastNRTFiles);
 +        lastFileMetaData = job.getCopyState().files;
 +        message(String.format(Locale.ROOT, "top: %d: start: done sync: took %.3fs for %s, opened NRT reader version=%d",
 +                              id,
 +                              (System.nanoTime()-initSyncStartNS)/1000000000.0,
 +                              bytesToString(job.getTotalBytesCopied()),
 +                              job.getCopyState().version));
 +
 +        doCommit = true;
 +      } else {
 +        doCommit = false;
 +        lastPrimaryGen = curPrimaryGen;
 +        message("top: same primary as before");
 +      }
 +
 +      if (infos.getGeneration() < maxPendingGen) {
 +        message("top: move infos generation from " + infos.getGeneration() + " to " + maxPendingGen);
 +        infos.setNextWriteGeneration(maxPendingGen);
 +      }
 +
 +      // Notify primary we started, to give it a chance to send any warming merges our way to reduce NRT latency of first sync:
 +      sendNewReplica();
 +
 +      // Finally, we are open for business, since our index now "agrees" with the primary:
 +      mgr = new SegmentInfosSearcherManager(dir, this, infos, searcherFactory);
 +
 +      IndexSearcher searcher = mgr.acquire();
 +      try {
 +        // TODO: this is test specific:
 +        int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
 +        message("top: marker count=" + hitCount + " version=" + ((DirectoryReader) searcher.getIndexReader()).getVersion());
 +      } finally {
 +        mgr.release(searcher);
 +      }
 +
 +      // Must commit after init mgr:
 +      if (doCommit) {
 +        // Very important to commit what we just sync'd over, because we removed the pre-existing commit point above if we had to
 +        // overwrite any files it referenced:
 +        commit();
 +      }
 +
 +      message("top: done start");
 +      state = "idle";
 +    } catch (Throwable t) {
 +      if (t.getMessage().startsWith("replica cannot start") == false) {
 +        message("exc on start:");
 +        t.printStackTrace(System.out);
 +      } else {
 +        dir.close();
 +      }
 +      IOUtils.reThrow(t);
 +    }
 +  }
 +  
 +  final Object commitLock = new Object();
 +
 +  @Override
 +  public void commit() throws IOException {
 +
 +    synchronized(commitLock) {
 +
 +      SegmentInfos infos;
 +      Collection<String> indexFiles;
 +
 +      synchronized (this) {
 +        infos = ((SegmentInfosSearcherManager) mgr).getCurrentInfos();
 +        indexFiles = infos.files(false);
 +        deleter.incRef(indexFiles);
 +      }
 +
 +      message("top: commit primaryGen=" + lastPrimaryGen + " infos=" + infos.toString() + " files=" + indexFiles);
 +
 +      // fsync all index files we are now referencing
 +      dir.sync(indexFiles);
 +
 +      Map<String,String> commitData = new HashMap<>();
 +      commitData.put(PRIMARY_GEN_KEY, Long.toString(lastPrimaryGen));
 +      commitData.put(VERSION_KEY, Long.toString(getCurrentSearchingVersion()));
 +      infos.setUserData(commitData, false);
 +
 +      // write and fsync a new segments_N
 +      infos.commit(dir);
 +
 +      // Notify current infos (which may have changed while we were doing dir.sync above) what generation we are up to; this way future
 +      // commits are guaranteed to go to the next (unwritten) generations:
 +      if (mgr != null) {
 +        ((SegmentInfosSearcherManager) mgr).getCurrentInfos().updateGeneration(infos);
 +      }
 +      String segmentsFileName = infos.getSegmentsFileName();
 +      message("top: commit wrote segments file " + segmentsFileName + " version=" + infos.getVersion() + " sis=" + infos.toString() + " commitData=" + commitData);
 +      deleter.incRef(Collections.singletonList(segmentsFileName));
 +      message("top: commit decRef lastCommitFiles=" + lastCommitFiles);
 +      deleter.decRef(lastCommitFiles);
 +      lastCommitFiles.clear();
 +      lastCommitFiles.addAll(indexFiles);
 +      lastCommitFiles.add(segmentsFileName);
 +      message("top: commit version=" + infos.getVersion() + " files now " + lastCommitFiles);
 +    }
 +  }
 +
 +  void finishNRTCopy(CopyJob job, long startNS) throws IOException {
 +    CopyState copyState = job.getCopyState();
 +    message("top: finishNRTCopy: version=" + copyState.version + (job.getFailed() ? " FAILED" : "") + " job=" + job);
 +
 +    // NOTE: if primary crashed while we were still copying then the job will hit an exc trying to read bytes for the files from the primary node,
 +    // and the job will be marked as failed here:
 +
 +    synchronized (this) {
 +
 +      if ("syncing".equals(state)) {
 +        state = "idle";
 +      }
 +
 +      if (curNRTCopy == job) {
 +        message("top: now clear curNRTCopy; job=" + job);
 +        curNRTCopy = null;
 +      } else {
 +        assert job.getFailed();
 +        message("top: skip clear curNRTCopy: we were cancelled; job=" + job);
 +      }
 +
 +      if (job.getFailed()) {
 +        return;
 +      }
 +
 +      // Does final file renames:
 +      job.finish();
 +
 +      // Turn byte[] back to SegmentInfos:
 +      byte[] infosBytes = copyState.infosBytes;
 +      SegmentInfos infos = SegmentInfos.readCommit(dir,
 +                                                   new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", copyState.infosBytes)),
 +                                                   copyState.gen);
 +      assert infos.getVersion() == copyState.version;
 +
 +      message("  version=" + infos.getVersion() + " segments=" + infos.toString());
 +
 +      // Cutover to new searcher:
 +      if (mgr != null) {
 +        ((SegmentInfosSearcherManager) mgr).setCurrentInfos(infos);
 +      }
 +
 +      // Must first incRef new NRT files, then decRef old ones, to make sure we don't remove an NRT file that's in common to both:
 +      Collection<String> newFiles = copyState.files.keySet();
 +      message("top: incRef newNRTFiles=" + newFiles);
 +      deleter.incRef(newFiles);
 +
 +      // If any of our new files were previously copied merges, we clear them now, so we don't try to later delete a non-existent file:
 +      pendingMergeFiles.removeAll(newFiles);
 +      message("top: after remove from pending merges pendingMergeFiles=" + pendingMergeFiles);
 +
 +      message("top: decRef lastNRTFiles=" + lastNRTFiles);
 +      deleter.decRef(lastNRTFiles);
 +      lastNRTFiles.clear();
 +      lastNRTFiles.addAll(newFiles);
 +      message("top: set lastNRTFiles=" + lastNRTFiles);
 +
 +      // At this point we can remove any completed merge segment files that we still do not reference.  This can happen when a merge
 +      // finishes, copies its files out to us, but is then merged away (or dropped due to 100% deletions) before we ever cutover to it
 +      // in an NRT point:
 +      if (copyState.completedMergeFiles.isEmpty() == false) {
 +        message("now remove-if-not-ref'd completed merge files: " + copyState.completedMergeFiles);
 +        for(String fileName : copyState.completedMergeFiles) {
 +          if (pendingMergeFiles.contains(fileName)) {
 +            pendingMergeFiles.remove(fileName);
 +            deleter.deleteIfNoRef(fileName);
 +          }
 +        }
 +      }
 +
 +      lastFileMetaData = copyState.files;
 +
 +      // It's a good time to delete pending files, since we just refreshed and some previously open files are now closed:
 +      deleter.deletePending();
 +    }
 +
 +    int markerCount;
 +    IndexSearcher s = mgr.acquire();
 +    try {
 +      markerCount = s.count(new TermQuery(new Term("marker", "marker")));
 +    } finally {
 +      mgr.release(s);
 +    }
 +
 +    message(String.format(Locale.ROOT, "top: done sync: took %.3fs for %s, opened NRT reader version=%d markerCount=%d",
 +                          (System.nanoTime()-startNS)/1000000000.0,
 +                          bytesToString(job.getTotalBytesCopied()),
 +                          copyState.version,
 +                          markerCount));
 +  }
 +
 +  /** Start a background copying job, to copy the specified files from the current primary node.  If files is null then the latest copy
 +   *  state should be copied.  If prevJob is not null, then the new copy job is replacing it and should 1) cancel the previous one, and
 +   *  2) optionally salvage e.g. partially copied files that are shared with the new copy job. */
 +  protected abstract CopyJob newCopyJob(String reason, Map<String,FileMetaData> files, Map<String,FileMetaData> prevFiles,
 +                                        boolean highPriority, CopyJob.OnceDone onceDone) throws IOException;
 +
 +  /** Runs this job async'd */
 +  protected abstract void launch(CopyJob job);
 +
 +  /** Tell primary we (replica) just started, so primary can tell us to warm any already warming merges.  This lets us keep a low NRT refresh
 +   *  time for the first NRT sync after we start. */
 +  protected abstract void sendNewReplica() throws IOException;
 +
 +  /** Call this to notify this replica node that a new NRT infos is available on the primary.
 +   *  We kick off a job (runs in the background) to copy files across, and open a new reader once that's done. */
 +  public synchronized CopyJob newNRTPoint(long version) throws IOException {
 +
 +    if (isClosed()) {
 +      throw new AlreadyClosedException("this replica is closed: state=" + state);
 +    }
 +
 +    // Caller should not "publish" us until we have finished .start():
 +    assert mgr != null;
 +
 +    if ("idle".equals(state)) {
 +      state = "syncing";
 +    }
 +
 +    long curVersion = getCurrentSearchingVersion();
 +
 +    message("top: start sync sis.version=" + version);
 +
 +    if (version == curVersion) {
 +      // Caller releases the CopyState:
 +      message("top: new NRT point has same version as current; skipping");
 +      return null;
 +    }
 +
 +    if (version < curVersion) {
 +      // This can happen, if two syncs happen close together, and due to thread scheduling, the incoming older version runs after the newer version
 +      message("top: new NRT point (version=" + version + ") is older than current (version=" + version + "); skipping");
 +      return null;
 +    }
 +
 +    final long startNS = System.nanoTime();
 +
 +    message("top: newNRTPoint");
 +    CopyJob job = null;
 +    try {
 +      job = newCopyJob("NRT point sync version=" + version,
 +                       null,
 +                       lastFileMetaData,
 +                       true,
 +                       new CopyJob.OnceDone() {
 +                         @Override
 +                         public void run(CopyJob job) {
 +                           try {
 +                             finishNRTCopy(job, startNS);
 +                           } catch (IOException ioe) {
 +                             throw new RuntimeException(ioe);
 +                           }
 +                         }
 +                       });
 +    } catch (NodeCommunicationException nce) {
 +      // E.g. primary could crash/close when we are asking it for the copy state:
 +      message("top: ignoring communication exception creating CopyJob: " + nce);
 +      //nce.printStackTrace(System.out);
 +      if (state.equals("syncing")) {
 +        state = "idle";
 +      }
 +      return null;
 +    }
 +
 +    Collection<String> newNRTFiles = job.getFileNames();
 +    long newPrimaryGen = job.getCopyState().primaryGen;
 +    maybeNewPrimary(newPrimaryGen);
 +
 +    message("top: newNRTPoint: job files=" + newNRTFiles);
 +
 +    if (curNRTCopy != null) {
 +      job.transferAndCancel(curNRTCopy);
 +      assert curNRTCopy.getFailed();
 +    }
 +
 +    curNRTCopy = job;
 +
 +    for(String fileName : curNRTCopy.getFileNamesToCopy()) {
 +      assert lastCommitFiles.contains(fileName) == false: "fileName=" + fileName + " is in lastCommitFiles and is being copied?";
 +      synchronized (mergeCopyJobs) {
 +        for (CopyJob mergeJob : mergeCopyJobs) {
 +          if (mergeJob.getFileNames().contains(fileName)) {
 +            // TODO: we could maybe transferAndCancel here?  except CopyJob can't transferAndCancel more than one currently
 +            message("top: now cancel merge copy job=" + mergeJob + ": file " + fileName + " is now being copied via NRT point");
 +            mergeJob.cancel("newNRTPoint is copying over the same file", null);
 +          }
 +        }
 +      }
 +    }
 +
 +    try {
 +      job.start();
 +    } catch (NodeCommunicationException nce) {
 +      // E.g. primary could crash/close when we are asking it for the copy state:
 +      message("top: ignoring exception starting CopyJob: " + nce);
 +      nce.printStackTrace(System.out);
 +      if (state.equals("syncing")) {
 +        state = "idle";
 +      }
 +      return null;
 +    }
 +
 +    // Runs in the background jobs thread, maybe slowly/throttled, and calls finishSync once it's done:
 +    launch(curNRTCopy);
 +    return curNRTCopy;
 +  }
 +
 +  public synchronized boolean isCopying() {
 +    return curNRTCopy != null;
 +  }
 +
 +  @Override
 +  public boolean isClosed() {
 +    return "closed".equals(state) || "closing".equals(state) || "crashing".equals(state) || "crashed".equals(state);
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
 +    message("top: now close");
 +
 +    synchronized (this) {
 +      state = "closing";
 +      if (curNRTCopy != null) {
 +        curNRTCopy.cancel("closing", null);
 +      }
 +    }
 +
 +    synchronized (this) {
 +      message("top: close mgr");
 +      mgr.close();
 +
 +      message("top: decRef lastNRTFiles=" + lastNRTFiles);
 +      deleter.decRef(lastNRTFiles);
 +      lastNRTFiles.clear();
 +
 +      // NOTE: do not decRef these!
 +      lastCommitFiles.clear();
 +
 +      message("top: delete if no ref pendingMergeFiles=" + pendingMergeFiles);
 +      for(String fileName : pendingMergeFiles) {
 +        deleter.deleteIfNoRef(fileName);
 +      }
 +      pendingMergeFiles.clear();
 +    
 +      message("top: close dir");
 +      IOUtils.close(writeFileLock, dir);
 +    }
 +    message("top: done close");
 +    state = "closed";
 +  }
 +
 +  /** Called when the primary changed */
 +  protected synchronized void maybeNewPrimary(long newPrimaryGen) {
 +    if (newPrimaryGen != lastPrimaryGen) {
 +      message("top: now change lastPrimaryGen from " + lastPrimaryGen + " to " + newPrimaryGen + " pendingMergeFiles=" + pendingMergeFiles);
 +      assert newPrimaryGen > lastPrimaryGen: "newPrimaryGen=" + newPrimaryGen + " vs lastPrimaryGen=" + lastPrimaryGen;
 +      lastPrimaryGen = newPrimaryGen;
 +      pendingMergeFiles.clear();
 +    } else {
 +      message("top: keep current lastPrimaryGen=" + lastPrimaryGen);
 +    }
 +  }
 +
 +  protected synchronized CopyJob launchPreCopyMerge(AtomicBoolean finished, long newPrimaryGen, Map<String,FileMetaData> files) throws IOException {
 +
 +    CopyJob job;
 +
 +    maybeNewPrimary(newPrimaryGen);
 +    final long primaryGenStart = lastPrimaryGen;
 +    Set<String> fileNames = files.keySet();
 +    message("now pre-copy warm merge files=" + fileNames + " primaryGen=" + newPrimaryGen);
 +
 +    for(String fileName : fileNames) {
 +      assert pendingMergeFiles.contains(fileName) == false: "file \"" + fileName + "\" is already being warmed!";
 +      assert lastNRTFiles.contains(fileName) == false: "file \"" + fileName + "\" is already NRT visible!";
 +    }
 +
 +    job = newCopyJob("warm merge on " + name() + " filesNames=" + fileNames,
 +                     files, null, false,
 +                     new CopyJob.OnceDone() {
 +
 +                       @Override
 +                       public void run(CopyJob job) throws IOException {
 +                         // Signals that this replica has finished
 +                         mergeCopyJobs.remove(job);
 +                         message("done warming merge " + fileNames + " failed?=" + job.getFailed());
 +                         // Synchronize on the enclosing node, not this anonymous OnceDone, since we touch its file sets:
 +                         synchronized(ReplicaNode.this) {
 +                           if (job.getFailed() == false) {
 +                             if (lastPrimaryGen != primaryGenStart) {
 +                               message("merge pre copy finished but primary has changed; cancelling job files=" + fileNames);
 +                               job.cancel("primary changed during merge copy", null);
 +                             } else {
 +                               boolean abort = false;
 +                               for (String fileName : fileNames) {
 +                                 if (lastNRTFiles.contains(fileName)) {
 +                                   message("abort merge finish: file " + fileName + " is referenced by last NRT point");
 +                                   abort = true;
 +                                 }
 +                                 if (lastCommitFiles.contains(fileName)) {
 +                                   message("abort merge finish: file " + fileName + " is referenced by last commit point");
 +                                   abort = true;
 +                                 }
 +                               }
 +                               if (abort) {
 +                                 // Even though in newNRTPoint we have similar logic, which cancels any merge copy jobs if an NRT point
 +                                 // shows up referencing the files we are warming (because primary got impatient and gave up on us), we also
 +                                 // need it here in case replica is way far behind and fails to even receive the merge pre-copy request
 +                                 // until after the newNRTPoint referenced those files:
 +                                 job.cancel("merged segment was separately copied via NRT point", null);
 +                               } else {
 +                                 job.finish();
 +                                 message("merge pre copy finished files=" + fileNames);
 +                                 for(String fileName : fileNames) {
 +                                   assert pendingMergeFiles.contains(fileName) == false : "file \"" + fileName + "\" is already in pendingMergeFiles";
 +                                   message("add file " + fileName + " to pendingMergeFiles");
 +                                   pendingMergeFiles.add(fileName);
 +                                 }
 +                               }
 +                             }
 +                           } else {
 +                             message("merge copy finished with failure");
 +                           }
 +                         }
 +                         finished.set(true);
 +                       }
 +                     });
 +
 +    job.start();
 +
 +    // When warming a merge we better not already have any of these files copied!
 +    assert job.getFileNamesToCopy().size() == files.size();
 +
 +    mergeCopyJobs.add(job);
 +    launch(job);
 +
 +    return job;
 +  }
 +
 +  public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext) throws IOException {
 +    return dir.createTempOutput(prefix, suffix, IOContext.DEFAULT);
 +  }
 +
 +  /** Compares incoming per-file identity (id, checksum, header, footer) versus what we have locally and returns the subset of the incoming
 +   *  files that need copying */
 +  public List<Map.Entry<String,FileMetaData>> getFilesToCopy(Map<String,FileMetaData> files) throws IOException {
 +
 +    List<Map.Entry<String,FileMetaData>> toCopy = new ArrayList<>();
 +    for (Map.Entry<String,FileMetaData> ent : files.entrySet()) {
 +      String fileName = ent.getKey();
 +      FileMetaData fileMetaData = ent.getValue();
 +      if (fileIsIdentical(fileName, fileMetaData) == false) {
 +        toCopy.add(ent);
 +      }
 +    }
 +
 +    return toCopy;
 +  }
 +
 +  /** Carefully determine if the file on the primary, identified by its {@code String fileName} along with the {@link FileMetaData}
 +   * "summarizing" its contents, is precisely the same file that we have locally.  If the file does not exist locally, or if its its header
 +   * (inclues the segment id), length, footer (including checksum) differ, then this returns false, else true. */
 +  private boolean fileIsIdentical(String fileName, FileMetaData srcMetaData) throws IOException {
 +
 +    if (deleter.isPending(fileName)) {
 +      // This was a file we had wanted to delete yet a virus checker prevented us, and now we need to overwrite it.
 +      // Such files are in an unknown state, and even if their header and footer and length all
 +      // match, since they may not have been fsync'd by the previous node instance on this directory,
 +      // they could in theory have corruption internally.  So we always force ourselves to copy them here:
 +      if (Node.VERBOSE_FILES) {
 +        message("file " + fileName + ": will copy [we had wanted to delete this file on init, but failed]");
 +      }
 +      return false;
 +    }
 +
 +    FileMetaData destMetaData = readLocalFileMetaData(fileName);
 +    if (destMetaData == null) {
 +      // Something went wrong in reading the file (it's corrupt, truncated, does not exist, etc.):
 +      return false;
 +    }
 +
 +    if (Arrays.equals(destMetaData.header, srcMetaData.header) == false ||
 +        Arrays.equals(destMetaData.footer, srcMetaData.footer) == false) {
 +      // Segment name was reused!  This is rare but possible and otherwise devastating:
 +      if (Node.VERBOSE_FILES) {
 +        message("file " + fileName + ": will copy [header/footer is different]");
 +      }
 +      return false;
 +    } else {
 +      return true;
 +    }
 +  }
 +
 +  private ConcurrentMap<String,Boolean> copying = new ConcurrentHashMap<>();
 +
 +  // Used only to catch bugs, ensuring a given file name is only ever being copied by one job:
 +  public void startCopyFile(String name) {
 +    if (copying.putIfAbsent(name, Boolean.TRUE) != null) {
 +      throw new IllegalStateException("file " + name + " is being copied in two places!");
 +    }
 +  }
 +
 +  public void finishCopyFile(String name) {
 +    if (copying.remove(name) == null) {
 +      throw new IllegalStateException("file " + name + " was not actually being copied?");
 +    }
 +  }
 +}
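
The subtle invariant in finishNRTCopy above is the ordering of the reference-count swap: the new NRT point's files are incRef'd before the old point's files are decRef'd, so a file shared by both points never drops to zero references and gets deleted out from under the reader we are about to open. Below is a minimal, self-contained sketch of that handoff using a toy reference counter; RefCountHandoffDemo, incRef and decRef are illustrative names only, not Lucene's ReplicaFileDeleter API.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy ref-counting deleter sketching the incRef-then-decRef cutover that
 *  ReplicaNode.finishNRTCopy performs; simplified, not the real ReplicaFileDeleter. */
public class RefCountHandoffDemo {

  static final Map<String,Integer> refCounts = new HashMap<>();
  static final Set<String> deleted = new HashSet<>();

  static void incRef(Set<String> files) {
    for (String f : files) {
      refCounts.merge(f, 1, Integer::sum);
    }
  }

  static void decRef(Set<String> files) {
    for (String f : files) {
      int count = refCounts.merge(f, -1, Integer::sum);
      if (count == 0) {
        refCounts.remove(f);
        deleted.add(f); // stand-in for deleting the file from the directory
      }
    }
  }

  public static void main(String[] args) {
    Set<String> oldNRT = Set.of("_0.cfs", "_1.cfs");
    Set<String> newNRT = Set.of("_1.cfs", "_2.cfs"); // _1.cfs is shared

    incRef(oldNRT); // files referenced by the current NRT point

    // Cutover: incRef the new point FIRST, then decRef the old one; the shared
    // file _1.cfs never reaches zero refs, so only _0.cfs is deleted:
    incRef(newNRT);
    decRef(oldNRT);

    System.out.println("deleted=" + deleted);     // [_0.cfs]
    System.out.println("refCounts=" + refCounts); // {_1.cfs=1, _2.cfs=1} (order may vary)
  }
}

The same incRef-before-decRef ordering shows up in start() when the initial sync replaces lastNRTFiles.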

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
----------------------------------------------------------------------
diff --cc lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 15e9c8c,0000000..cd98b48
mode 100644,000000..100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@@ -1,876 -1,0 +1,876 @@@
 +package org.apache.lucene.replicator.nrt;
 +
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStreamReader;
 +import java.io.UnsupportedEncodingException;
 +import java.nio.file.Path;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Locale;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.regex.Pattern;
 +
 +import org.apache.lucene.document.Document;
 +import org.apache.lucene.util.IOUtils;
 +import org.apache.lucene.util.LineFileDocs;
 +import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
 +import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
 +import org.apache.lucene.util.LuceneTestCase;
 +import org.apache.lucene.util.TestUtil;
 +
 +import com.carrotsearch.randomizedtesting.SeedUtils;
 +
 +// nocommit make some explicit failure tests
 +
 +// MockRandom's .sd file has no index header/footer:
 +@SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"})
 +@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
 +public class TestNRTReplication extends LuceneTestCase {
 +
 +  /** cwd where we start each child (server) node */
 +  private Path childTempDir;
 +
 +  final AtomicLong nodeStartCounter = new AtomicLong();
 +  private long nextPrimaryGen;
 +  private long lastPrimaryGen;
 +  LineFileDocs docs;
 +
 +  /** Launches a child "server" (separate JVM), which is either primary or replica node */
 +  private NodeProcess startNode(int primaryTCPPort, final int id, Path indexPath, long forcePrimaryVersion, boolean willCrash) throws IOException {
 +    List<String> cmd = new ArrayList<>();
 +
 +    cmd.add(System.getProperty("java.home") 
 +        + System.getProperty("file.separator")
 +        + "bin"
 +        + System.getProperty("file.separator")
 +        + "java");
 +    cmd.add("-Xmx512m");
 +
 +    long myPrimaryGen;
 +    if (primaryTCPPort != -1) {
 +      // I am a replica
 +      cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + primaryTCPPort);
 +      myPrimaryGen = lastPrimaryGen;
 +    } else {
 +      myPrimaryGen = nextPrimaryGen++;
 +      lastPrimaryGen = myPrimaryGen;
 +    }
 +    cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen);
 +    cmd.add("-Dtests.nrtreplication.closeorcrash=false");
 +
 +    cmd.add("-Dtests.nrtreplication.node=true");
 +    cmd.add("-Dtests.nrtreplication.nodeid=" + id);
 +    cmd.add("-Dtests.nrtreplication.startNS=" + Node.globalStartNS);
 +    cmd.add("-Dtests.nrtreplication.indexpath=" + indexPath);
 +    cmd.add("-Dtests.nrtreplication.checkonclose=true");
 +
 +    if (primaryTCPPort == -1) {
 +      // We are the primary node
 +      cmd.add("-Dtests.nrtreplication.isPrimary=true");
 +      cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + forcePrimaryVersion);
 +    }
 +
 +    // Mix in our own counter because this is called from a fresh thread, which means the seed otherwise isn't changing each time we spawn a
 +    // new node:
 +    long seed = random().nextLong() * nodeStartCounter.incrementAndGet();
 +
 +    cmd.add("-Dtests.seed=" + SeedUtils.formatSeed(seed));
 +    cmd.add("-ea");
 +    cmd.add("-cp");
 +    cmd.add(System.getProperty("java.class.path"));
 +    cmd.add("org.junit.runner.JUnitCore");
 +    cmd.add(getClass().getName().replace(getClass().getSimpleName(), "SimpleServer"));
 +
 +    message("child process command: " + cmd);
 +    ProcessBuilder pb = new ProcessBuilder(cmd);
 +    pb.redirectErrorStream(true);
 +
 +    // Important, so that the scary-looking hs_err_<pid>.log appears under our test temp dir:
 +    pb.directory(childTempDir.toFile());
 +
 +    Process p = pb.start();
 +
 +    BufferedReader r;
 +    try {
 +      r = new BufferedReader(new InputStreamReader(p.getInputStream(), IOUtils.UTF_8));
 +    } catch (UnsupportedEncodingException uee) {
 +      throw new RuntimeException(uee);
 +    }
 +
 +    int tcpPort = -1;
 +    long initCommitVersion = -1;
 +    long initInfosVersion = -1;
 +    Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*");
 +    boolean sawExistingSegmentsFile = false;
 +
 +    while (true) {
 +      String l = r.readLine();
 +      if (l == null) {
 +        message("top: node=" + id + " failed to start");
 +        try {
 +          p.waitFor();
 +        } catch (InterruptedException ie) {
 +          throw new RuntimeException(ie);
 +        }
 +        message("exit value=" + p.exitValue());
 +        message("top: now fail test replica R" + id + " failed to start");
 +        throw new RuntimeException("replica R" + id + " failed to start");
 +      }
 +
 +      if (logTimeStart.matcher(l).matches()) {
 +        // Already a well-formed log output:
 +        System.out.println(l);
 +      } else {
 +        message(l);
 +      }
 +
 +      if (l.startsWith("PORT: ")) {
 +        tcpPort = Integer.parseInt(l.substring(6).trim());
 +      } else if (l.startsWith("COMMIT VERSION: ")) {
 +        initCommitVersion = Long.parseLong(l.substring(16).trim());
 +      } else if (l.startsWith("INFOS VERSION: ")) {
 +        initInfosVersion = Long.parseLong(l.substring(15).trim());
 +      } else if (l.contains("will crash after")) {
 +        willCrash = true;
 +      } else if (l.startsWith("NODE STARTED")) {
 +        break;
 +      } else if (l.contains("replica cannot start: existing segments file=")) {
 +        sawExistingSegmentsFile = true;
 +      }
 +    }
 +
 +    final boolean finalWillCrash = willCrash;
 +
 +    // Babysits the child process, pulling its stdout and printing to our stdout:
 +    AtomicBoolean nodeClosing = new AtomicBoolean();
 +    Thread pumper = ThreadPumper.start(
 +                                       new Runnable() {
 +                                         @Override
 +                                         public void run() {
 +                                           message("now wait for process " + p);
 +                                           try {
 +                                             p.waitFor();
 +                                           } catch (Throwable t) {
 +                                             throw new RuntimeException(t);
 +                                           }
 +
 +                                           message("done wait for process " + p);
 +                                           int exitValue = p.exitValue();
 +                                           message("exit value=" + exitValue + " willCrash=" + finalWillCrash);
 +                                           if (exitValue != 0 && finalWillCrash == false) {
 +                                             // should fail test
 +                                             throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue);
 +                                           }
 +                                         }
 +                                       }, r, System.out, null, nodeClosing);
 +    pumper.setName("pump" + id);
 +
 +    message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion);
 +    return new NodeProcess(p, id, tcpPort, pumper, primaryTCPPort == -1, initCommitVersion, initInfosVersion, nodeClosing);
 +  }
 +
 +  @Override
 +  public void setUp() throws Exception {
 +    super.setUp();
 +    Node.globalStartNS = System.nanoTime();
 +    childTempDir = createTempDir("child");
 +    docs = new LineFileDocs(random());
 +  }
 +
 +  @Override
 +  public void tearDown() throws Exception {
 +    super.tearDown();
 +    docs.close();
 +  }
 +
 +  public void testReplicateDeleteAllDocuments() throws Exception {
 +
 +    Path primaryPath = createTempDir("primary");
 +    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
 +
 +    Path replicaPath = createTempDir("replica");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
 +
 +    // Tell primary current replicas:
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    Connection primaryC = new Connection(primary.tcpPort);
 +    primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +    for(int i=0;i<10;i++) {
 +      Document doc = docs.nextDoc();
 +      primary.addOrUpdateDocument(primaryC, doc, false);
 +    }
 +
 +    // Nothing in replica index yet
 +    assertVersionAndHits(replica, 0, 0);
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to show the change
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    // Delete all docs from primary
 +    if (random().nextBoolean()) {
 +      // Inefficiently:
 +      for(int id=0;id<10;id++) {
 +        primary.deleteDocument(primaryC, Integer.toString(id));
 +      }
 +    } else {
 +      // Efficiently:
 +      primary.deleteAllDocuments(primaryC);
 +    }
 +
 +    // Replica still shows 10 docs:
 +    assertVersionAndHits(replica, primaryVersion1, 10);
 +    
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion2 = primary.flush(0);
 +    assertTrue(primaryVersion2 > primaryVersion1);
 +
 +    // Wait for replica to show the change
 +    waitForVersionAndHits(replica, primaryVersion2, 0);
 +
 +    // Index 10 docs again:
 +    for(int i=0;i<10;i++) {
 +      Document doc = docs.nextDoc();
 +      primary.addOrUpdateDocument(primaryC, doc, false);
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion3 = primary.flush(0);
 +    assertTrue(primaryVersion3 > primaryVersion2);
 +
 +    // Wait for replica to show the change
 +    waitForVersionAndHits(replica, primaryVersion3, 10);
 +
 +    primaryC.close();
 +
 +    replica.close();
 +    primary.close();
 +  }
 +
 +  public void testReplicateForceMerge() throws Exception {
 +
 +    Path primaryPath = createTempDir("primary");
 +    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
 +
 +    Path replicaPath = createTempDir("replica");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    Connection primaryC = new Connection(primary.tcpPort);
 +    primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +    for(int i=0;i<10;i++) {
 +      Document doc = docs.nextDoc();
 +      primary.addOrUpdateDocument(primaryC, doc, false);
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Index 10 more docs into primary:
 +    for(int i=0;i<10;i++) {
 +      Document doc = docs.nextDoc();
 +      primary.addOrUpdateDocument(primaryC, doc, false);
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion2 = primary.flush(0);
 +    assertTrue(primaryVersion2 > primaryVersion1);
 +
 +    primary.forceMerge(primaryC);
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion3 = primary.flush(0);
 +    assertTrue(primaryVersion3 > primaryVersion2);
 +
 +    // Wait for replica to show the change
 +    waitForVersionAndHits(replica, primaryVersion3, 20);
 +
 +    primaryC.close();
 +
 +    replica.close();
 +    primary.close();
 +  }
 +
 +  // Start up, index 10 docs, replicate, but crash and restart the replica without committing it:
 +  public void testReplicaCrashNoCommit() throws Exception {
 +
 +    Path primaryPath = createTempDir("primary");
 +    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
 +
 +    Path replicaPath = createTempDir("replica");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    // Crash replica:
 +    replica.crash();
 +
 +    // Restart replica:
 +    replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
 +
 +    // On startup the replica searches the last commit (empty here):
 +    assertVersionAndHits(replica, 0, 0);
 +
 +    // Ask replica to sync:
 +    replica.newNRTPoint(primaryVersion1, primary.tcpPort);
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    replica.close();
 +    primary.close();
 +  }
 +
 +  // Start up, index 10 docs, replicate, commit, crash and restart the replica
 +  public void testReplicaCrashWithCommit() throws Exception {
 +
 +    Path primaryPath = createTempDir("primary");
 +    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
 +
 +    Path replicaPath = createTempDir("replica");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    // Commit and crash replica:
 +    replica.commit();
 +    replica.crash();
 +
 +    // Restart replica:
 +    replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
 +
 +    // On startup the replica searches the last commit:
 +    assertVersionAndHits(replica, primaryVersion1, 10);
 +
 +    replica.close();
 +    primary.close();
 +  }
 +
 +  // Start up, index 10 docs, replicate, commit, crash, index more docs, replicate, then restart the replica
 +  public void testIndexingWhileReplicaIsDown() throws Exception {
 +
 +    Path primaryPath = createTempDir("primary");
 +    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
 +
 +    Path replicaPath = createTempDir("replica");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    // Commit and crash replica:
 +    replica.commit();
 +    replica.crash();
 +
 +    sendReplicasToPrimary(primary);
 +
 +    // Index 10 more docs, while replica is down
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // And flush:
 +    long primaryVersion2 = primary.flush(0);
 +    assertTrue(primaryVersion2 > primaryVersion1);
 +
 +    // Now restart replica:
 +    replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // On startup the replica still searches its last commit:
 +    assertVersionAndHits(replica, primaryVersion1, 10);
 +
 +    // Now ask replica to sync:
 +    replica.newNRTPoint(primaryVersion2, primary.tcpPort);
 +
 +    waitForVersionAndHits(replica, primaryVersion2, 20);
 +
 +    replica.close();
 +    primary.close();
 +  }
 + 
 +  // Crash primary and promote a replica
 +  public void testCrashPrimary1() throws Exception {
 +
 +    Path path1 = createTempDir("1");
 +    NodeProcess primary = startNode(-1, 0, path1, -1, true);
 +
 +    Path path2 = createTempDir("2");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    // Crash primary:
 +    primary.crash();
 +
 +    // Promote replica:
 +    replica.commit();
 +    replica.close();
 +    
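 +    // Restart the old replica as the new primary (primaryTcpPort=-1), reusing its index directory: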
 +    primary = startNode(-1, 1, path2, -1, false);
 +
 +    // Should still see 10 docs:
 +    assertVersionAndHits(primary, primaryVersion1, 10);
 +
 +    primary.close();
 +  }
 +
 +  // Crash primary and then restart it
 +  public void testCrashPrimary2() throws Exception {
 +
 +    Path path1 = createTempDir("1");
 +    NodeProcess primary = startNode(-1, 0, path1, -1, true);
 +
 +    Path path2 = createTempDir("2");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
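 +    // Commit so the first 10 docs survive the crash below: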
 +    primary.commit();
 +
 +    // Index 10 docs, but crash before replicating or committing:
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Crash primary:
 +    primary.crash();
 +
 +    // Restart it:
 +    primary = startNode(-1, 0, path1, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 more docs
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    long primaryVersion2 = primary.flush(0);
 +    assertTrue(primaryVersion2 > primaryVersion1);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion2, 20);
 +
 +    primary.close();
 +    replica.close();
 +  }
 +
 +  // Crash the primary and then restart it while a replica node is down; then bring the replica back up and make sure it properly "unforks" itself
 +  public void testCrashPrimary3() throws Exception {
 +
 +    Path path1 = createTempDir("1");
 +    NodeProcess primary = startNode(-1, 0, path1, -1, true);
 +
 +    Path path2 = createTempDir("2");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    replica.commit();
 +
 +    replica.close();
 +    primary.crash();
 +
 +    // At this point replica is "in the future": it has 10 docs committed, but the primary crashed before committing so it has 0 docs
 +
 +    // Restart primary:
 +    primary = startNode(-1, 0, path1, -1, true);
 +
 +    // Index 20 docs into primary:
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<20;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Flush primary, but there are no replicas to sync to:
 +    long primaryVersion2 = primary.flush(0);
 +
 +    // Now restart replica, which on init should detect it is on a "lost branch" because the 10 docs it committed were never committed
 +    // by the restarted primary:
 +    replica = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    assertVersionAndHits(replica, primaryVersion2, 20);
 +
 +    primary.close();
 +    replica.close();
 +  }
 +
 +  public void testCrashPrimaryWhileCopying() throws Exception {
 +
 +    Path path1 = createTempDir("1");
 +    NodeProcess primary = startNode(-1, 0, path1, -1, true);
 +
 +    Path path2 = createTempDir("2");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 100 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<100;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes (async) to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
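 +    // Sleep a short random time so the crash below races the replica's file copy: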
 +    Thread.sleep(TestUtil.nextInt(random(), 1, 30));
 +
 +    // Crash primary, hopefully while the replica is still copying:
 +    primary.crash();
 +
 +    // Could see either 100 docs (replica finished before crash) or 0 docs:
 +    try (Connection c = new Connection(replica.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
 +      c.flush();
 +      long version = c.in.readVLong();
 +      int hitCount = c.in.readVInt();
 +      if (version == 0) {
 +        assertEquals(0, hitCount);
 +      } else {
 +        assertEquals(primaryVersion1, version);
 +        assertEquals(100, hitCount);
 +      }
 +    }
 +
 +    primary.close();
 +    replica.close();
 +  }
 +
 +  public void testCrashReplica() throws Exception {
 +
 +    Path path1 = createTempDir("1");
 +    NodeProcess primary = startNode(-1, 0, path1, -1, true);
 +
 +    Path path2 = createTempDir("2");
 +    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Index 10 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Refresh primary, which also pushes to replica:
 +    long primaryVersion1 = primary.flush(0);
 +    assertTrue(primaryVersion1 > 0);
 +
 +    // Wait for replica to sync up:
 +    waitForVersionAndHits(replica, primaryVersion1, 10);
 +
 +    // Crash replica
 +    replica.crash();
 +
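 +    // Tell the primary it currently has no replicas: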
 +    sendReplicasToPrimary(primary);
 +
 +    // Lots of new flushes while replica is down:
 +    long primaryVersion2 = 0;
 +    for(int iter=0;iter<10;iter++) {
 +      // Index 10 docs into primary:
 +      try (Connection c = new Connection(primary.tcpPort)) {
 +        c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +        for(int i=0;i<10;i++) {
 +          Document doc = docs.nextDoc();
 +          primary.addOrUpdateDocument(c, doc, false);
 +        }
 +      }
 +      primaryVersion2 = primary.flush(0);
 +    }
 +
 +    // Start up replica again:
 +    replica = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica);
 +
 +    // Now ask replica to sync:
 +    replica.newNRTPoint(primaryVersion2, primary.tcpPort);
 +
 +    // Make sure the replica sees all docs that were indexed while it was down:
 +    waitForVersionAndHits(replica, primaryVersion2, 110);
 +
 +    replica.close();
 +    primary.close();
 +  }
 +
 +  public void testFullClusterCrash() throws Exception {
 +
 +    Path path1 = createTempDir("1");
 +    NodeProcess primary = startNode(-1, 0, path1, -1, true);
 +
 +    Path path2 = createTempDir("2");
 +    NodeProcess replica1 = startNode(primary.tcpPort, 1, path2, -1, true);
 +
 +    Path path3 = createTempDir("3");
 +    NodeProcess replica2 = startNode(primary.tcpPort, 2, path3, -1, true);
 +
 +    sendReplicasToPrimary(primary, replica1, replica2);
 +
 +    // Index 50 docs into primary:
 +    LineFileDocs docs = new LineFileDocs(random());
 +    long primaryVersion1 = 0;
 +    for (int iter=0;iter<5;iter++) {
 +      try (Connection c = new Connection(primary.tcpPort)) {
 +        c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +        for(int i=0;i<10;i++) {
 +          Document doc = docs.nextDoc();
 +          primary.addOrUpdateDocument(c, doc, false);
 +        }
 +      }
 +
 +      // Refresh primary, which also pushes to replicas:
 +      primaryVersion1 = primary.flush(0);
 +      assertTrue(primaryVersion1 > 0);
 +    }
 +
 +    // Wait for replicas to sync up:
 +    waitForVersionAndHits(replica1, primaryVersion1, 50);
 +    waitForVersionAndHits(replica2, primaryVersion1, 50);
 +
 +    primary.commit();
 +    replica1.commit();
 +    replica2.commit();
 +
 +    // Index 10 more docs, but don't sync to replicas:
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
 +      for(int i=0;i<10;i++) {
 +        Document doc = docs.nextDoc();
 +        primary.addOrUpdateDocument(c, doc, false);
 +      }
 +    }
 +
 +    // Full cluster crash
 +    primary.crash();
 +    replica1.crash();
 +    replica2.crash();
 +
 +    // Full cluster restart
 +    primary = startNode(-1, 0, path1, -1, true);
 +    replica1 = startNode(primary.tcpPort, 1, path2, -1, true);
 +    replica2 = startNode(primary.tcpPort, 2, path3, -1, true);
 +
 +    // Only 50 docs are visible: the last 10 were indexed after the commit, so the crash lost them:
 +
 +    // Expected version is -1 because it's unpredictable how IndexWriter changes the segments version on init:
 +    assertVersionAndHits(primary, -1, 50);
-     assertVersionAndHits(replica1, primaryVersion1, 50);
-     assertVersionAndHits(replica2, primaryVersion1, 50);
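++    // Replicas must converge to whatever infos version the restarted primary exposed on init: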
++    assertVersionAndHits(replica1, primary.initInfosVersion, 50);
++    assertVersionAndHits(replica2, primary.initInfosVersion, 50);
 +
 +    primary.close();
 +    replica1.close();
 +    replica2.close();
 +  }
 +
 +  /** Tells the primary which replicas are currently alive. */
 +  private void sendReplicasToPrimary(NodeProcess primary, NodeProcess... replicas) throws IOException {
 +    try (Connection c = new Connection(primary.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
 +      c.out.writeVInt(replicas.length);
 +      for(int id=0;id<replicas.length;id++) {
 +        NodeProcess replica = replicas[id];
 +        c.out.writeVInt(replica.id);
 +        c.out.writeVInt(replica.tcpPort);
 +      }
 +      c.flush();
 +      c.in.readByte(); // wait for the one-byte ack
 +    }
 +  }
 +
 +  /** Verifies this node is currently searching the specified version with the specified total hit count; pass expectedVersion=-1 to skip
 +   *  the version check. */
 +  private void assertVersionAndHits(NodeProcess node, long expectedVersion, int expectedHitCount) throws Exception {
 +    try (Connection c = new Connection(node.tcpPort)) {
 +      c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
 +      c.flush();
 +      long version = c.in.readVLong();
 +      int hitCount = c.in.readVInt();
 +      if (expectedVersion != -1) {
-         assertEquals("hitCount=" + hitCount, expectedVersion, version);
++        assertEquals("wrong searcher version, with hitCount=" + hitCount, expectedVersion, version);
 +      }
 +      assertEquals(expectedHitCount, hitCount);
 +    }
 +  }
 +
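 +  /** Polls the node until it is searching exactly the expected version, then verifies the hit count. */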
 +  private void waitForVersionAndHits(NodeProcess node, long expectedVersion, int expectedHitCount) throws Exception {
 +    try (Connection c = new Connection(node.tcpPort)) {
 +      while (true) {
 +        c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
 +        c.flush();
 +        long version = c.in.readVLong();
 +        int hitCount = c.in.readVInt();
 +
 +        if (version == expectedVersion) {
 +          assertEquals(expectedHitCount, hitCount);
 +          break;
 +        }
 +
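 +        // If the node jumped past the expected version something went wrong; fail now instead of looping forever: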
 +        assertTrue(version < expectedVersion);
 +        Thread.sleep(10);
 +      }
 +    }
 +  }
 +
 +  static void message(String message) {
 +    long now = System.nanoTime();
 +    System.out.println(String.format(Locale.ROOT,
 +                                     "%5.3fs       :     parent [%11s] %s",
 +                                     (now-Node.globalStartNS)/1000000000.,
 +                                     Thread.currentThread().getName(),
 +                                     message));
 +  }
 +}