You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/06 17:24:08 UTC
[14/15] lucene-solr git commit: Merge branch 'master' into nrt_replicas
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/search/SearcherManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions2.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnJRECrash.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnVMError.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/index/TestStressDeletes.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/SearcherTaxonomyManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --cc lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
index 133992f,0000000..a7adbe2
mode 100644,000000..100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@@ -1,763 -1,0 +1,768 @@@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.BufferedChecksumIndexInput;
+import org.apache.lucene.store.ByteArrayIndexInput;
+import org.apache.lucene.store.Directory;
++import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.util.IOUtils;
+
+/** Replica node that pulls index changes from the primary node by copying newly flushed or merged index files. */
+
+abstract class ReplicaNode extends Node {
+
+ ReplicaFileDeleter deleter;
+
+ /** IncRef'd files in the current commit point: */
+ private final Collection<String> lastCommitFiles = new HashSet<>();
+
+ /** IncRef'd files in the current NRT point: */
+ protected final Collection<String> lastNRTFiles = new HashSet<>();
+
+ /** Currently running merge pre-copy jobs */
+ protected final Set<CopyJob> mergeCopyJobs = Collections.synchronizedSet(new HashSet<>());
+
+ /** Non-null when we are currently copying files from a new NRT point: */
+ protected CopyJob curNRTCopy;
+
+ /** We hold this to ensure an external IndexWriter cannot also open on our directory: */
+ private final Lock writeFileLock;
+
+ /** Merged segment files that we pre-copied, but have not yet made visible in a new NRT point. */
+ final Set<String> pendingMergeFiles = Collections.synchronizedSet(new HashSet<String>());
+
+ /** Primary gen last time we successfully replicated: */
+ protected long lastPrimaryGen;
+
+ public ReplicaNode(int id, Directory dir, SearcherFactory searcherFactory) throws IOException {
+ super(id, dir, searcherFactory);
+
++ if (dir instanceof FSDirectory && ((FSDirectory) dir).checkPendingDeletions()) {
++ throw new IllegalArgumentException("Directory " + dir + " still has pending deleted files; cannot initialize IndexWriter");
++ }
++
+ boolean success = false;
+
+ try {
+ message("top: init replica dir=" + dir);
+
+ // Obtain a write lock on this index since we "act like" an IndexWriter, to prevent any other IndexWriter or ReplicaNode from using it:
+ writeFileLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME);
+
+ // nocommit must check for no pending deletes here, like IW does
+
+ state = "init";
+ deleter = new ReplicaFileDeleter(this, dir);
+ } catch (Throwable t) {
+ message("exc on init:");
+ t.printStackTrace(System.out);
+ throw t;
+ } finally {
+ if (success == false) {
+ IOUtils.closeWhileHandlingException(this);
+ }
+ }
+ }
+
+ /** Start up this replica, which possibly requires heavy copying of files from the primary node, if we were down for a long time */
+ protected synchronized void start(long curPrimaryGen) throws IOException {
+
+ if (state.equals("init") == false) {
+ throw new IllegalStateException("already started");
+ }
+
+ message("top: now start");
+ try {
+
+ // Figure out what state our local index is in now:
+ String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir);
+
+ // Also look for any pending_segments_N, in case we crashed mid-commit. We must "inflate" our infos gen to at least this, since
+ // otherwise we may wind up re-using the pending_segments_N file name on commit, and then our deleter can get angry because it still
+ // wants to delete this file:
+ long maxPendingGen = -1;
+ for(String fileName : dir.listAll()) {
+ if (fileName.startsWith(IndexFileNames.PENDING_SEGMENTS)) {
+ long gen = Long.parseLong(fileName.substring(IndexFileNames.PENDING_SEGMENTS.length()+1), Character.MAX_RADIX);
+ if (gen > maxPendingGen) {
+ maxPendingGen = gen;
+ }
+ }
+ }
+
+ SegmentInfos infos;
+ if (segmentsFileName == null) {
+ // No index here yet:
+ infos = new SegmentInfos();
+ message("top: init: no segments in index");
+ } else {
+ message("top: init: read existing segments commit " + segmentsFileName);
+ infos = SegmentInfos.readCommit(dir, segmentsFileName);
+ message("top: init: segments: " + infos.toString() + " version=" + infos.getVersion());
+ Collection<String> indexFiles = infos.files(false);
+
+ lastCommitFiles.add(segmentsFileName);
+ lastCommitFiles.addAll(indexFiles);
+
+ // Always protect the last commit:
+ deleter.incRef(lastCommitFiles);
+
+ lastNRTFiles.addAll(indexFiles);
+ deleter.incRef(lastNRTFiles);
+ message("top: commitFiles=" + lastCommitFiles);
+ message("top: nrtFiles=" + lastNRTFiles);
+ }
+
+ message("top: delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
+ deleter.deleteUnknownFiles(segmentsFileName);
+ message("top: done delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
+
+ String s = infos.getUserData().get(PRIMARY_GEN_KEY);
+ long myPrimaryGen;
+ if (s == null) {
+ assert infos.size() == 0;
+ myPrimaryGen = -1;
+ } else {
+ myPrimaryGen = Long.parseLong(s);
+ }
+ message("top: myPrimaryGen=" + myPrimaryGen);
+
+ boolean doCommit;
+
+ if (infos.size() > 0 && myPrimaryGen != -1 && myPrimaryGen != curPrimaryGen) {
+
+ assert myPrimaryGen < curPrimaryGen;
+
+ // Primary changed while we were down. In this case, we must sync from primary before opening a reader, because it's possible current
+ // files we have will need to be overwritten with different ones (if index rolled back and "forked"), and we can't overwrite open
+ // files on Windows:
+
+ final long initSyncStartNS = System.nanoTime();
+
+ message("top: init: primary changed while we were down myPrimaryGen=" + myPrimaryGen +
+ " vs curPrimaryGen=" + curPrimaryGen +
+ "; sync now before mgr init");
+
+ // Try until we succeed in copying over the latest NRT point:
+ CopyJob job = null;
+
+ // We may need to overwrite files referenced by our latest commit, either right now on initial sync, or on a later sync. To make
+ // sure the index is never even in an "apparently" corrupt state (where an old segments_N references invalid files) we forcefully
+ // remove the commit now, and refuse to start the replica if this delete fails:
+ message("top: now delete starting commit point " + segmentsFileName);
+
+ // If this throws exc (e.g. due to virus checker), we cannot start this replica:
+ assert deleter.getRefCount(segmentsFileName) == 1;
+ deleter.decRef(Collections.singleton(segmentsFileName));
+ if (deleter.isPending(segmentsFileName)) {
+ // If e.g. virus checker blocks us from deleting, we absolutely cannot start this node else we can cause corruption:
+ throw new RuntimeException("replica cannot start: existing segments file=" + segmentsFileName + " must be removed in order to start, but the file delete failed");
+ }
+ // So we don't later try to decRef it (illegally) again:
+ boolean didRemove = lastCommitFiles.remove(segmentsFileName);
+ assert didRemove;
+
+ while (true) {
+ job = newCopyJob("sync on startup replica=" + name() + " myVersion=" + infos.getVersion(),
+ null,
+ null,
+ true,
+ null);
+ job.start();
+
+ message("top: init: sync sis.version=" + job.getCopyState().version);
+
+ Collection<String> fileNamesToCopy = job.getFileNamesToCopy();
+
+ // Force this copy job to finish while we wait, now. Note that this can be very time consuming!
+ // NOTE: newNRTPoint detects we are still in init (mgr is null) and does not cancel our copy if a flush happens
+ try {
+ job.runBlocking();
+ job.finish();
+
+ // Success!
+ break;
+ } catch (IOException ioe) {
+ job.cancel("startup failed", ioe);
+ if (ioe.getMessage().contains("checksum mismatch after file copy")) {
+ // OK-ish
+ message("top: failed to copy: " + ioe + "; retrying");
+ } else {
+ throw ioe;
+ }
+ }
+ }
+
+ lastPrimaryGen = job.getCopyState().primaryGen;
+ byte[] infosBytes = job.getCopyState().infosBytes;
+
+ SegmentInfos syncInfos = SegmentInfos.readCommit(dir,
+ new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", job.getCopyState().infosBytes)),
+ job.getCopyState().gen);
+
+ // Must always commit to a larger generation than what's currently in the index:
+ syncInfos.updateGeneration(infos);
+ infos = syncInfos;
+
+ assert infos.getVersion() == job.getCopyState().version;
+ message(" version=" + infos.getVersion() + " segments=" + infos.toString());
+ message("top: init: incRef nrtFiles=" + job.getFileNames());
+ deleter.incRef(job.getFileNames());
+ message("top: init: decRef lastNRTFiles=" + lastNRTFiles);
+ deleter.decRef(lastNRTFiles);
+
+ lastNRTFiles.clear();
+ lastNRTFiles.addAll(job.getFileNames());
+
+ message("top: init: set lastNRTFiles=" + lastNRTFiles);
+ lastFileMetaData = job.getCopyState().files;
+ message(String.format(Locale.ROOT, "top: %d: start: done sync: took %.3fs for %s, opened NRT reader version=%d",
+ id,
+ (System.nanoTime()-initSyncStartNS)/1000000000.0,
+ bytesToString(job.getTotalBytesCopied()),
+ job.getCopyState().version));
+
+ doCommit = true;
+ } else {
+ doCommit = false;
+ lastPrimaryGen = curPrimaryGen;
+ message("top: same primary as before");
+ }
+
+ if (infos.getGeneration() < maxPendingGen) {
+ message("top: move infos generation from " + infos.getGeneration() + " to " + maxPendingGen);
+ infos.setNextWriteGeneration(maxPendingGen);
+ }
+
+ // Notify primary we started, to give it a chance to send any warming merges our way to reduce NRT latency of first sync:
+ sendNewReplica();
+
+ // Finally, we are open for business, since our index now "agrees" with the primary:
+ mgr = new SegmentInfosSearcherManager(dir, this, infos, searcherFactory);
+
+ IndexSearcher searcher = mgr.acquire();
+ try {
+ // TODO: this is test specific:
+ int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
+ message("top: marker count=" + hitCount + " version=" + ((DirectoryReader) searcher.getIndexReader()).getVersion());
+ } finally {
+ mgr.release(searcher);
+ }
+
+ // Must commit after init mgr:
+ if (doCommit) {
+ // Very important to commit what we just sync'd over, because we removed the pre-existing commit point above if we had to
+ // overwrite any files it referenced:
+ commit();
+ }
+
+ message("top: done start");
+ state = "idle";
+ } catch (Throwable t) {
+ if (t.getMessage().startsWith("replica cannot start") == false) {
+ message("exc on start:");
+ t.printStackTrace(System.out);
+ } else {
+ dir.close();
+ }
+ IOUtils.reThrow(t);
+ }
+ }
+
+ final Object commitLock = new Object();
+
+ @Override
+ public void commit() throws IOException {
+
+ synchronized(commitLock) {
+
+ SegmentInfos infos;
+ Collection<String> indexFiles;
+
+ synchronized (this) {
+ infos = ((SegmentInfosSearcherManager) mgr).getCurrentInfos();
+ indexFiles = infos.files(false);
+ deleter.incRef(indexFiles);
+ }
+
+ message("top: commit primaryGen=" + lastPrimaryGen + " infos=" + infos.toString() + " files=" + indexFiles);
+
+ // fsync all index files we are now referencing
+ dir.sync(indexFiles);
+
+ Map<String,String> commitData = new HashMap<>();
+ commitData.put(PRIMARY_GEN_KEY, Long.toString(lastPrimaryGen));
+ commitData.put(VERSION_KEY, Long.toString(getCurrentSearchingVersion()));
+ infos.setUserData(commitData, false);
+
+ // write and fsync a new segments_N
+ infos.commit(dir);
+
+ // Notify current infos (which may have changed while we were doing dir.sync above) what generation we are up to; this way future
+ // commits are guaranteed to go to the next (unwritten) generations:
+ if (mgr != null) {
+ ((SegmentInfosSearcherManager) mgr).getCurrentInfos().updateGeneration(infos);
+ }
+ String segmentsFileName = infos.getSegmentsFileName();
+ message("top: commit wrote segments file " + segmentsFileName + " version=" + infos.getVersion() + " sis=" + infos.toString() + " commitData=" + commitData);
+ deleter.incRef(Collections.singletonList(segmentsFileName));
+ message("top: commit decRef lastCommitFiles=" + lastCommitFiles);
+ deleter.decRef(lastCommitFiles);
+ lastCommitFiles.clear();
+ lastCommitFiles.addAll(indexFiles);
+ lastCommitFiles.add(segmentsFileName);
+ message("top: commit version=" + infos.getVersion() + " files now " + lastCommitFiles);
+ }
+ }
+
+ void finishNRTCopy(CopyJob job, long startNS) throws IOException {
+ CopyState copyState = job.getCopyState();
+ message("top: finishNRTCopy: version=" + copyState.version + (job.getFailed() ? " FAILED" : "") + " job=" + job);
+
+ // NOTE: if primary crashed while we were still copying then the job will hit an exc trying to read bytes for the files from the primary node,
+ // and the job will be marked as failed here:
+
+ synchronized (this) {
+
+ if ("syncing".equals(state)) {
+ state = "idle";
+ }
+
+ if (curNRTCopy == job) {
+ message("top: now clear curNRTCopy; job=" + job);
+ curNRTCopy = null;
+ } else {
+ assert job.getFailed();
+ message("top: skip clear curNRTCopy: we were cancelled; job=" + job);
+ }
+
+ if (job.getFailed()) {
+ return;
+ }
+
+ // Does final file renames:
+ job.finish();
+
+ // Turn byte[] back to SegmentInfos:
+ byte[] infosBytes = copyState.infosBytes;
+ SegmentInfos infos = SegmentInfos.readCommit(dir,
+ new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", copyState.infosBytes)),
+ copyState.gen);
+ assert infos.getVersion() == copyState.version;
+
+ message(" version=" + infos.getVersion() + " segments=" + infos.toString());
+
+ // Cutover to new searcher:
+ if (mgr != null) {
+ ((SegmentInfosSearcherManager) mgr).setCurrentInfos(infos);
+ }
+
+ // Must first incRef new NRT files, then decRef old ones, to make sure we don't remove an NRT file that's in common to both:
+ Collection<String> newFiles = copyState.files.keySet();
+ message("top: incRef newNRTFiles=" + newFiles);
+ deleter.incRef(newFiles);
+
+ // If any of our new files were previously copied merges, we clear them now, so we don't try to later delete a non-existent file:
+ pendingMergeFiles.removeAll(newFiles);
+ message("top: after remove from pending merges pendingMergeFiles=" + pendingMergeFiles);
+
+ message("top: decRef lastNRTFiles=" + lastNRTFiles);
+ deleter.decRef(lastNRTFiles);
+ lastNRTFiles.clear();
+ lastNRTFiles.addAll(newFiles);
+ message("top: set lastNRTFiles=" + lastNRTFiles);
+
+ // At this point we can remove any completed merge segment files that we still do not reference. This can happen when a merge
+ // finishes, copies its files out to us, but is then merged away (or dropped due to 100% deletions) before we ever cutover to it
+ // in an NRT point:
+ if (copyState.completedMergeFiles.isEmpty() == false) {
+ message("now remove-if-not-ref'd completed merge files: " + copyState.completedMergeFiles);
+ for(String fileName : copyState.completedMergeFiles) {
+ if (pendingMergeFiles.contains(fileName)) {
+ pendingMergeFiles.remove(fileName);
+ deleter.deleteIfNoRef(fileName);
+ }
+ }
+ }
+
+ lastFileMetaData = copyState.files;
+
+ // It's a good time to delete pending files, since we just refreshed and some previously open files are now closed:
+ deleter.deletePending();
+ }
+
+ int markerCount;
+ IndexSearcher s = mgr.acquire();
+ try {
+ markerCount = s.count(new TermQuery(new Term("marker", "marker")));
+ } finally {
+ mgr.release(s);
+ }
+
+ message(String.format(Locale.ROOT, "top: done sync: took %.3fs for %s, opened NRT reader version=%d markerCount=%d",
+ (System.nanoTime()-startNS)/1000000000.0,
+ bytesToString(job.getTotalBytesCopied()),
+ copyState.version,
+ markerCount));
+ }
+
+ /** Start a background copying job, to copy the specified files from the current primary node. If files is null then the latest copy
+ * state should be copied. If prevJob is not null, then the new copy job is replacing it and should 1) cancel the previous one, and
+ * 2) optionally salvage e.g. partially copied and, shared with the new copy job, files. */
+ protected abstract CopyJob newCopyJob(String reason, Map<String,FileMetaData> files, Map<String,FileMetaData> prevFiles,
+ boolean highPriority, CopyJob.OnceDone onceDone) throws IOException;
+
+ /** Runs this job async'd */
+ protected abstract void launch(CopyJob job);
+
+ /** Tell primary we (replica) just started, so primary can tell us to warm any already warming merges. This lets us keep low nrt refresh
+ * time for the first nrt sync after we started. */
+ protected abstract void sendNewReplica() throws IOException;
+
+ /** Call this to notify this replica node that a new NRT infos is available on the primary.
+ * We kick off a job (runs in the background) to copy files across, and open a new reader once that's done. */
+ public synchronized CopyJob newNRTPoint(long version) throws IOException {
+
+ if (isClosed()) {
+ throw new AlreadyClosedException("this replica is closed: state=" + state);
+ }
+
+ // Caller should not "publish" us until we have finished .start():
+ assert mgr != null;
+
+ if ("idle".equals(state)) {
+ state = "syncing";
+ }
+
+ long curVersion = getCurrentSearchingVersion();
+
+ message("top: start sync sis.version=" + version);
+
+ if (version == curVersion) {
+ // Caller releases the CopyState:
+ message("top: new NRT point has same version as current; skipping");
+ return null;
+ }
+
+ if (version < curVersion) {
+ // This can happen, if two syncs happen close together, and due to thread scheduling, the incoming older version runs after the newer version
+ message("top: new NRT point (version=" + version + ") is older than current (version=" + version + "); skipping");
+ return null;
+ }
+
+ final long startNS = System.nanoTime();
+
+ message("top: newNRTPoint");
+ CopyJob job = null;
+ try {
+ job = newCopyJob("NRT point sync version=" + version,
+ null,
+ lastFileMetaData,
+ true,
+ new CopyJob.OnceDone() {
+ @Override
+ public void run(CopyJob job) {
+ try {
+ finishNRTCopy(job, startNS);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ });
+ } catch (NodeCommunicationException nce) {
+ // E.g. primary could crash/close when we are asking it for the copy state:
+ message("top: ignoring communication exception creating CopyJob: " + nce);
+ //nce.printStackTrace(System.out);
+ if (state.equals("syncing")) {
+ state = "idle";
+ }
+ return null;
+ }
+
+ Collection<String> newNRTFiles = job.getFileNames();
+ long newPrimaryGen = job.getCopyState().primaryGen;
+ maybeNewPrimary(newPrimaryGen);
+
+ message("top: newNRTPoint: job files=" + newNRTFiles);
+
+ if (curNRTCopy != null) {
+ job.transferAndCancel(curNRTCopy);
+ assert curNRTCopy.getFailed();
+ }
+
+ curNRTCopy = job;
+
+ for(String fileName : curNRTCopy.getFileNamesToCopy()) {
+ assert lastCommitFiles.contains(fileName) == false: "fileName=" + fileName + " is in lastCommitFiles and is being copied?";
+ synchronized (mergeCopyJobs) {
+ for (CopyJob mergeJob : mergeCopyJobs) {
+ if (mergeJob.getFileNames().contains(fileName)) {
+ // TODO: we could maybe transferAndCancel here? except CopyJob can't transferAndCancel more than one currently
+ message("top: now cancel merge copy job=" + mergeJob + ": file " + fileName + " is now being copied via NRT point");
+ mergeJob.cancel("newNRTPoint is copying over the same file", null);
+ }
+ }
+ }
+ }
+
+ try {
+ job.start();
+ } catch (NodeCommunicationException nce) {
+ // E.g. primary could crash/close when we are asking it for the copy state:
+ message("top: ignoring exception starting CopyJob: " + nce);
+ nce.printStackTrace(System.out);
+ if (state.equals("syncing")) {
+ state = "idle";
+ }
+ return null;
+ }
+
+ // Runs in the background jobs thread, maybe slowly/throttled, and calls finishSync once it's done:
+ launch(curNRTCopy);
+ return curNRTCopy;
+ }
+
+ public synchronized boolean isCopying() {
+ return curNRTCopy != null;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return "closed".equals(state) || "closing".equals(state) || "crashing".equals(state) || "crashed".equals(state);
+ }
+
+ @Override
+ public void close() throws IOException {
+ message("top: now close");
+
+ synchronized (this) {
+ state = "closing";
+ if (curNRTCopy != null) {
+ curNRTCopy.cancel("closing", null);
+ }
+ }
+
+ synchronized (this) {
+ message("top: close mgr");
+ mgr.close();
+
+ message("top: decRef lastNRTFiles=" + lastNRTFiles);
+ deleter.decRef(lastNRTFiles);
+ lastNRTFiles.clear();
+
+ // NOTE: do not decRef these!
+ lastCommitFiles.clear();
+
+ message("top: delete if no ref pendingMergeFiles=" + pendingMergeFiles);
+ for(String fileName : pendingMergeFiles) {
+ deleter.deleteIfNoRef(fileName);
+ }
+ pendingMergeFiles.clear();
+
+ message("top: close dir");
+ IOUtils.close(writeFileLock, dir);
+ }
+ message("top: done close");
+ state = "closed";
+ }
+
+ /** Called when the primary changed */
+ protected synchronized void maybeNewPrimary(long newPrimaryGen) {
+ if (newPrimaryGen != lastPrimaryGen) {
+ message("top: now change lastPrimaryGen from " + lastPrimaryGen + " to " + newPrimaryGen + " pendingMergeFiles=" + pendingMergeFiles);
+ assert newPrimaryGen > lastPrimaryGen: "newPrimaryGen=" + newPrimaryGen + " vs lastPrimaryGen=" + lastPrimaryGen;
+ lastPrimaryGen = newPrimaryGen;
+ pendingMergeFiles.clear();
+ } else {
+ message("top: keep current lastPrimaryGen=" + lastPrimaryGen);
+ }
+ }
+
+ protected synchronized CopyJob launchPreCopyMerge(AtomicBoolean finished, long newPrimaryGen, Map<String,FileMetaData> files) throws IOException {
+
+ CopyJob job;
+
+ maybeNewPrimary(newPrimaryGen);
+ final long primaryGenStart = lastPrimaryGen;
+ Set<String> fileNames = files.keySet();
+ message("now pre-copy warm merge files=" + fileNames + " primaryGen=" + newPrimaryGen);
+
+ for(String fileName : fileNames) {
+ assert pendingMergeFiles.contains(fileName) == false: "file \"" + fileName + "\" is already being warmed!";
+ assert lastNRTFiles.contains(fileName) == false: "file \"" + fileName + "\" is already NRT visible!";
+ }
+
+ job = newCopyJob("warm merge on " + name() + " filesNames=" + fileNames,
+ files, null, false,
+ new CopyJob.OnceDone() {
+
+ @Override
+ public void run(CopyJob job) throws IOException {
+ // Signals that this replica has finished
+ mergeCopyJobs.remove(job);
+ message("done warming merge " + fileNames + " failed?=" + job.getFailed());
+ synchronized(this) {
+ if (job.getFailed() == false) {
+ if (lastPrimaryGen != primaryGenStart) {
+ message("merge pre copy finished but primary has changed; cancelling job files=" + fileNames);
+ job.cancel("primary changed during merge copy", null);
+ } else {
+ boolean abort = false;
+ for (String fileName : fileNames) {
+ if (lastNRTFiles.contains(fileName)) {
+ message("abort merge finish: file " + fileName + " is referenced by last NRT point");
+ abort = true;
+ }
+ if (lastCommitFiles.contains(fileName)) {
+ message("abort merge finish: file " + fileName + " is referenced by last commit point");
+ abort = true;
+ }
+ }
+ if (abort) {
+ // Even though in newNRTPoint we have similar logic, which cancels any merge copy jobs if an NRT point
+ // shows up referencing the files we are warming (because primary got impatient and gave up on us), we also
+ // need it here in case replica is way far behind and fails to even receive the merge pre-copy request
+ // until after the newNRTPoint referenced those files:
+ job.cancel("merged segment was separately copied via NRT point", null);
+ } else {
+ job.finish();
+ message("merge pre copy finished files=" + fileNames);
+ for(String fileName : fileNames) {
+ assert pendingMergeFiles.contains(fileName) == false : "file \"" + fileName + "\" is already in pendingMergeFiles";
+ message("add file " + fileName + " to pendingMergeFiles");
+ pendingMergeFiles.add(fileName);
+ }
+ }
+ }
+ } else {
+ message("merge copy finished with failure");
+ }
+ }
+ finished.set(true);
+ }
+ });
+
+ job.start();
+
+ // When warming a merge we better not already have any of these files copied!
+ assert job.getFileNamesToCopy().size() == files.size();
+
+ mergeCopyJobs.add(job);
+ launch(job);
+
+ return job;
+ }
+
+ public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext) throws IOException {
+ return dir.createTempOutput(prefix, suffix, IOContext.DEFAULT);
+ }
+
+ /** Compares incoming per-file identity (id, checksum, header, footer) versus what we have locally and returns the subset of the incoming
+ * files that need copying */
+ public List<Map.Entry<String,FileMetaData>> getFilesToCopy(Map<String,FileMetaData> files) throws IOException {
+
+ boolean doCopyCommitFiles = false;
+ List<Map.Entry<String,FileMetaData>> toCopy = new ArrayList<>();
+ for (Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+ String fileName = ent.getKey();
+ FileMetaData fileMetaData = ent.getValue();
+ if (fileIsIdentical(fileName, fileMetaData) == false) {
+ toCopy.add(ent);
+ }
+ }
+
+ return toCopy;
+ }
+
+ /** Carefully determine if the file on the primary, identified by its {@code String fileName} along with the {@link FileMetaData}
+ * "summarizing" its contents, is precisely the same file that we have locally. If the file does not exist locally, or if its its header
+ * (inclues the segment id), length, footer (including checksum) differ, then this returns false, else true. */
+ private boolean fileIsIdentical(String fileName, FileMetaData srcMetaData) throws IOException {
+
+ if (deleter.isPending(fileName)) {
+ // This was a file we had wanted to delete yet a virus checker prevented us, and now we need to overwrite it.
+ // Such files are in an unknown state, and even if their header and footer and length all
+ // match, since they may not have been fsync'd by the previous node instance on this directory,
+ // they could in theory have corruption internally. So we always force ourselves to copy them here:
+ if (Node.VERBOSE_FILES) {
+ message("file " + fileName + ": will copy [we had wanted to delete this file on init, but failed]");
+ }
+ return false;
+ }
+
+ FileMetaData destMetaData = readLocalFileMetaData(fileName);
+ if (destMetaData == null) {
+ // Something went wrong in reading the file (it's corrupt, truncated, does not exist, etc.):
+ return false;
+ }
+
+ if (Arrays.equals(destMetaData.header, srcMetaData.header) == false ||
+ Arrays.equals(destMetaData.footer, srcMetaData.footer) == false) {
+ // Segment name was reused! This is rare but possible and otherwise devastating:
+ if (Node.VERBOSE_FILES) {
+ message("file " + fileName + ": will copy [header/footer is different]");
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private ConcurrentMap<String,Boolean> copying = new ConcurrentHashMap<>();
+
+ // Used only to catch bugs, ensuring a given file name is only ever being copied bye one job:
+ public void startCopyFile(String name) {
+ if (copying.putIfAbsent(name, Boolean.TRUE) != null) {
+ throw new IllegalStateException("file " + name + " is being copied in two places!");
+ }
+ }
+
+ public void finishCopyFile(String name) {
+ if (copying.remove(name) == null) {
+ throw new IllegalStateException("file " + name + " was not actually being copied?");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bd6804bc/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
----------------------------------------------------------------------
diff --cc lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 15e9c8c,0000000..cd98b48
mode 100644,000000..100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@@ -1,876 -1,0 +1,876 @@@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
+import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+import com.carrotsearch.randomizedtesting.SeedUtils;
+
+// nocommit make some explicit failure tests
+
+// MockRandom's .sd file has no index header/footer:
+@SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"})
+@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
+public class TestNRTReplication extends LuceneTestCase {
+
+  /** cwd where we start each child (server) node */
+  private Path childTempDir;
+
+  /** Source of random documents; opened in setUp and closed in tearDown */
+  LineFileDocs docs;
+
+  /** Mixed into each child's random seed so every spawned node gets a different seed */
+  final AtomicLong nodeStartCounter = new AtomicLong();
+
+  /** Primary generation handed to the next primary node we start */
+  private long nextPrimaryGen;
+
+  /** Primary generation of the most recently started primary; replicas are launched with this value */
+  private long lastPrimaryGen;
+
+  /** Launches a child "server" (separate JVM), which is either primary or replica node.
+   *
+   *  @param primaryTCPPort TCP port of the primary to replicate from, or -1 to launch this node as the primary
+   *  @param id node id, passed to the child as {@code tests.nrtreplication.nodeid}
+   *  @param indexPath index directory for the child node
+   *  @param forcePrimaryVersion passed to the child only when launching a primary
+   *  @param willCrash if true, a non-zero exit status of the child is tolerated; also forced to true when the child prints
+   *    "will crash after"
+   *  @return a handle to the child, returned once it has printed "NODE STARTED"
+   *  @throws RuntimeException if the child's output ends before it reports "NODE STARTED" */
+  private NodeProcess startNode(int primaryTCPPort, final int id, Path indexPath, long forcePrimaryVersion, boolean willCrash) throws IOException {
+    List<String> cmd = new ArrayList<>();
+
+    cmd.add(System.getProperty("java.home")
+            + System.getProperty("file.separator")
+            + "bin"
+            + System.getProperty("file.separator")
+            + "java");
+    cmd.add("-Xmx512m");
+
+    long myPrimaryGen;
+    if (primaryTCPPort != -1) {
+      // I am a replica
+      cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + primaryTCPPort);
+      myPrimaryGen = lastPrimaryGen;
+    } else {
+      myPrimaryGen = nextPrimaryGen++;
+      lastPrimaryGen = myPrimaryGen;
+    }
+    cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen);
+    cmd.add("-Dtests.nrtreplication.closeorcrash=false");
+
+    cmd.add("-Dtests.nrtreplication.node=true");
+    cmd.add("-Dtests.nrtreplication.nodeid=" + id);
+    cmd.add("-Dtests.nrtreplication.startNS=" + Node.globalStartNS);
+    cmd.add("-Dtests.nrtreplication.indexpath=" + indexPath);
+    cmd.add("-Dtests.nrtreplication.checkonclose=true");
+
+    if (primaryTCPPort == -1) {
+      // We are the primary node
+      cmd.add("-Dtests.nrtreplication.isPrimary=true");
+      cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + forcePrimaryVersion);
+    }
+
+    // Mixin our own counter because this is called from a fresh thread which means the seed otherwise isn't changing each time we spawn a
+    // new node:
+    long seed = random().nextLong() * nodeStartCounter.incrementAndGet();
+
+    cmd.add("-Dtests.seed=" + SeedUtils.formatSeed(seed));
+    cmd.add("-ea");
+    cmd.add("-cp");
+    cmd.add(System.getProperty("java.class.path"));
+    cmd.add("org.junit.runner.JUnitCore");
+    cmd.add(getClass().getName().replace(getClass().getSimpleName(), "SimpleServer"));
+
+    message("child process command: " + cmd);
+    ProcessBuilder pb = new ProcessBuilder(cmd);
+    pb.redirectErrorStream(true);
+
+    // Important, so that the scary looking hs_err_<pid>.log appear under our test temp dir:
+    pb.directory(childTempDir.toFile());
+
+    Process p = pb.start();
+
+    BufferedReader r;
+    try {
+      r = new BufferedReader(new InputStreamReader(p.getInputStream(), IOUtils.UTF_8));
+    } catch (UnsupportedEncodingException uee) {
+      throw new RuntimeException(uee);
+    }
+
+    int tcpPort = -1;
+    long initCommitVersion = -1;
+    long initInfosVersion = -1;
+    Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*");
+
+    // Echo the child's startup output and scrape the values it reports, until it says it has started:
+    while (true) {
+      String l = r.readLine();
+      if (l == null) {
+        message("top: node=" + id + " failed to start");
+        try {
+          p.waitFor();
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        }
+        message("exit value=" + p.exitValue());
+        message("top: now fail test replica R" + id + " failed to start");
+        throw new RuntimeException("replica R" + id + " failed to start");
+      }
+
+      if (logTimeStart.matcher(l).matches()) {
+        // Already a well-formed log output:
+        System.out.println(l);
+      } else {
+        message(l);
+      }
+
+      if (l.startsWith("PORT: ")) {
+        tcpPort = Integer.parseInt(l.substring(6).trim());
+      } else if (l.startsWith("COMMIT VERSION: ")) {
+        // Versions are longs: parsing them as ints would throw NumberFormatException once they exceed Integer.MAX_VALUE
+        initCommitVersion = Long.parseLong(l.substring(16).trim());
+      } else if (l.startsWith("INFOS VERSION: ")) {
+        initInfosVersion = Long.parseLong(l.substring(15).trim());
+      } else if (l.contains("will crash after")) {
+        willCrash = true;
+      } else if (l.startsWith("NODE STARTED")) {
+        break;
+      }
+    }
+
+    final boolean finalWillCrash = willCrash;
+
+    // Baby sits the child process, pulling its stdout and printing to our stdout:
+    AtomicBoolean nodeClosing = new AtomicBoolean();
+    Thread pumper = ThreadPumper.start(
+                                       new Runnable() {
+                                         @Override
+                                         public void run() {
+                                           message("now wait for process " + p);
+                                           try {
+                                             p.waitFor();
+                                           } catch (Throwable t) {
+                                             throw new RuntimeException(t);
+                                           }
+
+                                           message("done wait for process " + p);
+                                           int exitValue = p.exitValue();
+                                           message("exit value=" + exitValue + " willCrash=" + finalWillCrash);
+                                           if (exitValue != 0 && finalWillCrash == false) {
+                                             // should fail test
+                                             throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue);
+                                           }
+                                         }
+                                       }, r, System.out, null, nodeClosing);
+    pumper.setName("pump" + id);
+
+    message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion);
+    return new NodeProcess(p, id, tcpPort, pumper, primaryTCPPort == -1, initCommitVersion, initInfosVersion, nodeClosing);
+  }
+
+  /** Resets the log clock used by message(), creates the working directory for child JVMs, and opens the per-test docs source. */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    Node.globalStartNS = System.nanoTime();
+    this.childTempDir = createTempDir("child");
+    this.docs = new LineFileDocs(random());
+  }
+
+  /** Closes the per-test docs source, then lets the base class tear down (even if closing fails). */
+  @Override
+  public void tearDown() throws Exception {
+    try {
+      // Release our own resources before the base class tears down; IOUtils.close ignores a null docs (e.g. setUp failed early):
+      IOUtils.close(docs);
+    } finally {
+      docs = null;
+      super.tearDown();
+    }
+  }
+
+  public void testReplicateDeleteAllDocuments() throws Exception {
+
+    Path primaryPath = createTempDir("primary");
+    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
+
+    Path replicaPath = createTempDir("replica");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
+
+    // Tell primary current replicas:
+    sendReplicasToPrimary(primary, replica);
+
+    // Uses the per-test docs field (opened in setUp, closed in tearDown) instead of leaking a fresh LineFileDocs; the connection is
+    // closed even if an assertion fails:
+    try (Connection primaryC = new Connection(primary.tcpPort)) {
+      primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+
+      // Index 10 docs into primary:
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(primaryC, doc, false);
+      }
+
+      // Nothing in replica index yet
+      assertVersionAndHits(replica, 0, 0);
+
+      // Refresh primary, which also pushes to replica:
+      long primaryVersion1 = primary.flush(0);
+      assertTrue(primaryVersion1 > 0);
+
+      // Wait for replica to show the change
+      waitForVersionAndHits(replica, primaryVersion1, 10);
+
+      // Delete all docs from primary
+      if (random().nextBoolean()) {
+        // Inefficiently (NOTE(review): assumes the first 10 docs from docs got ids "0".."9" -- confirm against LineFileDocs):
+        for(int id=0;id<10;id++) {
+          primary.deleteDocument(primaryC, Integer.toString(id));
+        }
+      } else {
+        // Efficiently:
+        primary.deleteAllDocuments(primaryC);
+      }
+
+      // Replica still shows 10 docs:
+      assertVersionAndHits(replica, primaryVersion1, 10);
+
+      // Refresh primary, which also pushes to replica:
+      long primaryVersion2 = primary.flush(0);
+      assertTrue(primaryVersion2 > primaryVersion1);
+
+      // Wait for replica to show the change
+      waitForVersionAndHits(replica, primaryVersion2, 0);
+
+      // Index 10 docs again:
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(primaryC, doc, false);
+      }
+
+      // Refresh primary, which also pushes to replica:
+      long primaryVersion3 = primary.flush(0);
+      assertTrue(primaryVersion3 > primaryVersion2);
+
+      // Wait for replica to show the change
+      waitForVersionAndHits(replica, primaryVersion3, 10);
+    }
+
+    replica.close();
+    primary.close();
+  }
+
+  public void testReplicateForceMerge() throws Exception {
+
+    Path primaryPath = createTempDir("primary");
+    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
+
+    Path replicaPath = createTempDir("replica");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Uses the per-test docs field (opened in setUp, closed in tearDown) instead of leaking a fresh LineFileDocs; the connection is
+    // closed even if an assertion fails:
+    try (Connection primaryC = new Connection(primary.tcpPort)) {
+      primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+
+      // Index 10 docs into primary:
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(primaryC, doc, false);
+      }
+
+      // Refresh primary, which also pushes to replica:
+      long primaryVersion1 = primary.flush(0);
+      assertTrue(primaryVersion1 > 0);
+
+      // Index 10 more docs into primary:
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(primaryC, doc, false);
+      }
+
+      // Refresh primary, which also pushes to replica:
+      long primaryVersion2 = primary.flush(0);
+      assertTrue(primaryVersion2 > primaryVersion1);
+
+      primary.forceMerge(primaryC);
+
+      // Refresh primary, which also pushes to replica:
+      long primaryVersion3 = primary.flush(0);
+      assertTrue(primaryVersion3 > primaryVersion2);
+
+      // Wait for replica to show the change
+      waitForVersionAndHits(replica, primaryVersion3, 20);
+    }
+
+    replica.close();
+    primary.close();
+  }
+
+  // Start up, index 10 docs, replicate, but crash and restart the replica without committing it:
+  public void testReplicaCrashNoCommit() throws Exception {
+
+    Path primaryPath = createTempDir("primary");
+    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
+
+    Path replicaPath = createTempDir("replica");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 10 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 10);
+
+    // Crash replica:
+    replica.crash();
+
+    // Restart replica:
+    replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
+
+    // On startup the replica searches the last commit (empty here):
+    assertVersionAndHits(replica, 0, 0);
+
+    // Ask replica to sync:
+    replica.newNRTPoint(primaryVersion1, primary.tcpPort);
+    waitForVersionAndHits(replica, primaryVersion1, 10);
+
+    replica.close();
+    primary.close();
+  }
+
+  // Start up, index 10 docs, replicate, commit, crash and restart the replica
+  public void testReplicaCrashWithCommit() throws Exception {
+
+    Path primaryPath = createTempDir("primary");
+    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
+
+    Path replicaPath = createTempDir("replica");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 10 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 10);
+
+    // Commit and crash replica:
+    replica.commit();
+    replica.crash();
+
+    // Restart replica:
+    replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
+
+    // On startup the replica searches the last commit:
+    assertVersionAndHits(replica, primaryVersion1, 10);
+
+    replica.close();
+    primary.close();
+  }
+
+  // Start up, index 10 docs, replicate, commit, crash, index more docs, replicate, then restart the replica
+  public void testIndexingWhileReplicaIsDown() throws Exception {
+
+    Path primaryPath = createTempDir("primary");
+    NodeProcess primary = startNode(-1, 0, primaryPath, -1, false);
+
+    Path replicaPath = createTempDir("replica");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 10 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 10);
+
+    // Commit and crash replica:
+    replica.commit();
+    replica.crash();
+
+    sendReplicasToPrimary(primary);
+
+    // Index 10 more docs, while replica is down
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // And flush:
+    long primaryVersion2 = primary.flush(0);
+    assertTrue(primaryVersion2 > primaryVersion1);
+
+    // Now restart replica:
+    replica = startNode(primary.tcpPort, 1, replicaPath, -1, false);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // On startup the replica still searches its last commit:
+    assertVersionAndHits(replica, primaryVersion1, 10);
+
+    // Now ask replica to sync:
+    replica.newNRTPoint(primaryVersion2, primary.tcpPort);
+
+    waitForVersionAndHits(replica, primaryVersion2, 20);
+
+    replica.close();
+    primary.close();
+  }
+
+  // Crash primary and promote a replica
+  public void testCrashPrimary1() throws Exception {
+
+    Path path1 = createTempDir("1");
+    NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+    Path path2 = createTempDir("2");
+    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 10 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 10);
+
+    // Crash primary:
+    primary.crash();
+
+    // Promote replica:
+    replica.commit();
+    replica.close();
+
+    primary = startNode(-1, 1, path2, -1, false);
+
+    // Should still see 10 docs:
+    assertVersionAndHits(primary, primaryVersion1, 10);
+
+    primary.close();
+  }
+
+  // Crash primary and then restart it
+  public void testCrashPrimary2() throws Exception {
+
+    Path path1 = createTempDir("1");
+    NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+    Path path2 = createTempDir("2");
+    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 10 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 10);
+
+    primary.commit();
+
+    // Index 10 docs, but crash before replicating or committing:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Crash primary:
+    primary.crash();
+
+    // Restart it:
+    primary = startNode(-1, 0, path1, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 10 more docs
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    long primaryVersion2 = primary.flush(0);
+    assertTrue(primaryVersion2 > primaryVersion1);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion2, 20);
+
+    primary.close();
+    replica.close();
+  }
+
+  // Crash the primary and restart it while a replica node is down, then bring the replica node back up and make sure it properly
+  // "unforks" itself
+  public void testCrashPrimary3() throws Exception {
+
+    final Path primaryDir = createTempDir("1");
+    NodeProcess primary = startNode(-1, 0, primaryDir, -1, true);
+
+    final Path replicaDir = createTempDir("2");
+    NodeProcess replica = startNode(primary.tcpPort, 1, replicaDir, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // First batch: 10 docs into the original primary
+    try (Connection conn = new Connection(primary.tcpPort)) {
+      conn.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for (int docCount = 0; docCount < 10; docCount++) {
+        primary.addOrUpdateDocument(conn, docs.nextDoc(), false);
+      }
+    }
+
+    // Flushing the primary also pushes the new point to the replica:
+    final long firstVersion = primary.flush(0);
+    assertTrue(firstVersion > 0);
+
+    // Block until the replica has caught up:
+    waitForVersionAndHits(replica, firstVersion, 10);
+
+    replica.commit();
+
+    replica.close();
+    primary.crash();
+
+    // The replica is now "in the future": its commit holds 10 docs, but the primary crashed before committing, so it has 0 docs
+
+    // Bring the primary back up from its own directory:
+    primary = startNode(-1, 0, primaryDir, -1, true);
+
+    // Second batch: 20 docs into the restarted primary
+    try (Connection conn = new Connection(primary.tcpPort)) {
+      conn.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for (int docCount = 0; docCount < 20; docCount++) {
+        primary.addOrUpdateDocument(conn, docs.nextDoc(), false);
+      }
+    }
+
+    // No replicas have been registered with the restarted primary, so this flush syncs to nobody:
+    final long secondVersion = primary.flush(0);
+
+    // On init the restarted replica should detect that it is on a "lost branch", since its 10 committed docs came from a different
+    // primary node:
+    replica = startNode(primary.tcpPort, 1, replicaDir, -1, true);
+
+    assertVersionAndHits(replica, secondVersion, 20);
+
+    primary.close();
+    replica.close();
+  }
+
+  public void testCrashPrimaryWhileCopying() throws Exception {
+
+    Path path1 = createTempDir("1");
+    NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+    Path path2 = createTempDir("2");
+    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 100 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<100;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Refresh primary, which also pushes (async) to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    Thread.sleep(TestUtil.nextInt(random(), 1, 30));
+
+    // Crash primary, likely/hopefully while replica is still copying
+    primary.crash();
+
+    // Could see either 100 docs (replica finished before crash) or 0 docs:
+    try (Connection c = new Connection(replica.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+      c.flush();
+      long version = c.in.readVLong();
+      int hitCount = c.in.readVInt();
+      if (version == 0) {
+        assertEquals(0, hitCount);
+      } else {
+        assertEquals(primaryVersion1, version);
+        assertEquals(100, hitCount);
+      }
+    }
+
+    primary.close();
+    replica.close();
+  }
+
+  public void testCrashReplica() throws Exception {
+
+    Path path1 = createTempDir("1");
+    NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+    Path path2 = createTempDir("2");
+    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Index 10 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 10);
+
+    // Crash replica
+    replica.crash();
+
+    sendReplicasToPrimary(primary);
+
+    // Lots of new flushes while replica is down:
+    long primaryVersion2 = 0;
+    for(int iter=0;iter<10;iter++) {
+      // Index 10 docs into primary:
+      try (Connection c = new Connection(primary.tcpPort)) {
+        c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+        for(int i=0;i<10;i++) {
+          Document doc = docs.nextDoc();
+          primary.addOrUpdateDocument(c, doc, false);
+        }
+      }
+      primaryVersion2 = primary.flush(0);
+    }
+
+    // Start up replica again:
+    replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    sendReplicasToPrimary(primary, replica);
+
+    // Now ask replica to sync:
+    replica.newNRTPoint(primaryVersion2, primary.tcpPort);
+
+    // The primary has all 110 docs:
+    assertVersionAndHits(primary, primaryVersion2, 110);
+
+    // Make sure the replica (not just the primary) catches up to all docs that were indexed while it was down; the sync is async so
+    // poll for it:
+    waitForVersionAndHits(replica, primaryVersion2, 110);
+
+    replica.close();
+    primary.close();
+  }
+
+  public void testFullClusterCrash() throws Exception {
+
+    Path path1 = createTempDir("1");
+    NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+    Path path2 = createTempDir("2");
+    NodeProcess replica1 = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    Path path3 = createTempDir("3");
+    NodeProcess replica2 = startNode(primary.tcpPort, 2, path3, -1, true);
+
+    sendReplicasToPrimary(primary, replica1, replica2);
+
+    // Index 50 docs into primary, from the per-test docs field (opened in setUp, closed in tearDown) rather than a leaked local:
+    long primaryVersion1 = 0;
+    for (int iter=0;iter<5;iter++) {
+      try (Connection c = new Connection(primary.tcpPort)) {
+        c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+        for(int i=0;i<10;i++) {
+          Document doc = docs.nextDoc();
+          primary.addOrUpdateDocument(c, doc, false);
+        }
+      }
+
+      // Refresh primary, which also pushes to replicas:
+      primaryVersion1 = primary.flush(0);
+      assertTrue(primaryVersion1 > 0);
+    }
+
+    // Wait for replicas to sync up:
+    waitForVersionAndHits(replica1, primaryVersion1, 50);
+    waitForVersionAndHits(replica2, primaryVersion1, 50);
+
+    primary.commit();
+    replica1.commit();
+    replica2.commit();
+
+    // Index 10 more docs, but don't sync to replicas:
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      for(int i=0;i<10;i++) {
+        Document doc = docs.nextDoc();
+        primary.addOrUpdateDocument(c, doc, false);
+      }
+    }
+
+    // Full cluster crash
+    primary.crash();
+    replica1.crash();
+    replica2.crash();
+
+    // Full cluster restart
+    primary = startNode(-1, 0, path1, -1, true);
+    replica1 = startNode(primary.tcpPort, 1, path2, -1, true);
+    replica2 = startNode(primary.tcpPort, 2, path3, -1, true);
+
+    // Only 50 because we didn't commit primary before the crash:
+
+    // It's -1 because it's unpredictable how IW changes segments version on init:
+    assertVersionAndHits(primary, -1, 50);
- assertVersionAndHits(replica1, primaryVersion1, 50);
- assertVersionAndHits(replica2, primaryVersion1, 50);
++ assertVersionAndHits(replica1, primary.initInfosVersion, 50);
++ assertVersionAndHits(replica2, primary.initInfosVersion, 50);
+
+    primary.close();
+    replica1.close();
+    replica2.close();
+  }
+
+  /** Tells the primary its complete, current set of replicas (possibly none), sending each replica's id and TCP port, then reads
+   *  back a single byte (presumably the primary's acknowledgement). */
+  private void sendReplicasToPrimary(NodeProcess primary, NodeProcess... replicas) throws IOException {
+    try (Connection conn = new Connection(primary.tcpPort)) {
+      conn.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
+      conn.out.writeVInt(replicas.length);
+      for (NodeProcess node : replicas) {
+        conn.out.writeVInt(node.id);
+        conn.out.writeVInt(node.tcpPort);
+      }
+      conn.flush();
+      conn.in.readByte();
+    }
+  }
+
+  /** Verifies, with a single search request and no retrying, that this node is currently searching the specified version with the
+   *  specified total hit count.  Pass -1 as {@code expectedVersion} to skip the version check and only verify the hit count. */
+  private void assertVersionAndHits(NodeProcess node, long expectedVersion, int expectedHitCount) throws Exception {
+    try (Connection c = new Connection(node.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+      c.flush();
+      long version = c.in.readVLong();
+      int hitCount = c.in.readVInt();
+      if (expectedVersion != -1) {
- assertEquals("hitCount=" + hitCount, expectedVersion, version);
++ assertEquals("wrong searcher version, with hitCount=" + hitCount, expectedVersion, version);
+      }
+      assertEquals("wrong hitCount, with searcher version=" + version, expectedHitCount, hitCount);
+    }
+  }
+
+  /** Polls this node (every 10 msec) until it is searching {@code expectedVersion}, then verifies the total hit count.  Fails
+   *  immediately if the node ever reports a version newer than {@code expectedVersion}.  There is no timeout here. */
+  private void waitForVersionAndHits(NodeProcess node, long expectedVersion, int expectedHitCount) throws Exception {
+    try (Connection c = new Connection(node.tcpPort)) {
+      while (true) {
+        c.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+        c.flush();
+        long version = c.in.readVLong();
+        int hitCount = c.in.readVInt();
+
+        if (version == expectedVersion) {
+          assertEquals("node=" + node.id + " wrong hitCount at version=" + version, expectedHitCount, hitCount);
+          break;
+        }
+
+        // The node must never move past the version we are waiting for:
+        assertTrue("node=" + node.id + " is searching version=" + version + " which is past expectedVersion=" + expectedVersion,
+                   version < expectedVersion);
+        Thread.sleep(10);
+      }
+    }
+  }
+
+ static void message(String message) {
+ long now = System.nanoTime();
+ System.out.println(String.format(Locale.ROOT,
+ "%5.3fs : parent [%11s] %s",
+ (now-Node.globalStartNS)/1000000000.,
+ Thread.currentThread().getName(),
+ message));
+ }
+}