You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:37 UTC
[03/31] lucene-solr git commit: current patch
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java
new file mode 100644
index 0000000..742b19f
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/Node.java
@@ -0,0 +1,213 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.util.StringHelper;
+
+/** Common base class for {@link PrimaryNode} and {@link ReplicaNode}. */
+
+abstract class Node implements Closeable {
+
+  // NOTE(review): mutable static debug flags — presumably toggled by tests; confirm before production use
+  static boolean VERBOSE_FILES = true;
+  static boolean VERBOSE_CONNECTIONS = false;
+
+  // Keys we store into IndexWriter's commit user data:
+
+  /** Key to store the primary gen in the commit data, which increments every time we promote a new primary, so replicas can detect when the
+   *  primary they were talking to is changed */
+  public static String PRIMARY_GEN_KEY = "__primaryGen";
+
+  /** Key to store the version in the commit data, which increments every time we open a new NRT reader */
+  public static String VERSION_KEY = "__version";
+
+  /** Compact ordinal for this node */
+  protected final int id;
+
+  /** Directory holding this node's local copy of the index */
+  protected final Directory dir;
+
+  /** Factory used to wrap newly opened NRT readers into searchers */
+  protected final SearcherFactory searcherFactory;
+
+  // Tracks NRT readers, opened from IW (primary) or opened from replicated SegmentInfos pulled across the wire (replica):
+  protected ReferenceManager<IndexSearcher> mgr;
+
+  /** Startup time of original test, carefully propagated to all nodes to produce consistent "seconds since start time" in messages */
+  public static long globalStartNS;
+
+  /** When this node was started */
+  public static final long localStartNS = System.nanoTime();
+
+  // public static final long globalStartNS;
+
+  // For debugging: lifecycle state ("idle", "closing", "closed", ...); volatile so any thread sees updates
+  volatile String state = "idle";
+
+  /** File metadata for last sync that succeeded; we use this as a cache */
+  protected volatile Map<String,FileMetaData> lastFileMetaData;
+
+  public Node(int id, Directory dir, SearcherFactory searcherFactory) {
+    this.id = id;
+    this.dir = dir;
+    this.searcherFactory = searcherFactory;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(id=" + id + ")";
+  }
+
+  /** Commits the node's current state durably; implemented by primary/replica subclasses. */
+  public abstract void commit() throws IOException;
+
+  /** Logs a message prefixed with elapsed global/local seconds and the current thread name. */
+  public static void nodeMessage(String message) {
+    long now = System.nanoTime();
+    System.out.println(String.format(Locale.ROOT,
+                                     "%5.3fs %5.1fs:     [%11s] %s",
+                                     (now-globalStartNS)/1000000000.,
+                                     (now-localStartNS)/1000000000.,
+                                     Thread.currentThread().getName(),
+                                     message));
+
+  }
+
+  /** Logs a message like {@link #nodeMessage(String)}, additionally tagged with the given node id. */
+  public static void nodeMessage(int id, String message) {
+    long now = System.nanoTime();
+    System.out.println(String.format(Locale.ROOT,
+                                     "%5.3fs %5.1fs:         N%d [%11s] %s",
+                                     (now-globalStartNS)/1000000000.,
+                                     (now-localStartNS)/1000000000.,
+                                     id,
+                                     Thread.currentThread().getName(),
+                                     message));
+
+  }
+
+  /** Logs a message tagged with this node's current state and name. */
+  protected void message(String message) {
+    long now = System.nanoTime();
+    System.out.println(String.format(Locale.ROOT,
+                                     "%5.3fs %5.1fs: %7s %2s [%11s] %s",
+                                     (now-globalStartNS)/1000000000.,
+                                     (now-localStartNS)/1000000000.,
+                                     state, name(),
+                                     Thread.currentThread().getName(), message));
+  }
+
+  /** Returns a short name for this node: 'P' (primary) or 'R' (replica) followed by the node id. */
+  public String name() {
+    char mode = this instanceof PrimaryNode ? 'P' : 'R';
+    return mode + Integer.toString(id);
+  }
+
+  public abstract boolean isClosed();
+
+  /** Returns the version of the NRT reader currently being searched. */
+  public long getCurrentSearchingVersion() throws IOException {
+    IndexSearcher searcher = mgr.acquire();
+    try {
+      return ((DirectoryReader) searcher.getIndexReader()).getVersion();
+    } finally {
+      mgr.release(searcher);
+    }
+  }
+
+  /** Formats a byte count into a human-readable string (b, KB, MB or GB). */
+  public static String bytesToString(long bytes) {
+    if (bytes < 1024) {
+      return bytes + " b";
+    } else if (bytes < 1024 * 1024) {
+      return String.format(Locale.ROOT, "%.1f KB", bytes/1024.);
+    } else if (bytes < 1024 * 1024 * 1024) {
+      return String.format(Locale.ROOT, "%.1f MB", bytes/1024./1024.);
+    } else {
+      return String.format(Locale.ROOT, "%.1f GB", bytes/1024./1024./1024.);
+    }
+  }
+
+  /** Opens the specified file, reads its identifying information, including file length, full index header (includes the unique segment
+   *  ID) and the full footer (includes checksum), and returns the resulting {@link FileMetaData}.
+   *
+   *  <p>This returns null, logging a message, if there are any problems (the file does not exist, is corrupt, truncated, etc.).</p> */
+  public FileMetaData readLocalFileMetaData(String fileName) throws IOException {
+
+    Map<String,FileMetaData> cache = lastFileMetaData;
+    FileMetaData result;
+    if (cache != null) {
+      // We may already have this file cached from the last NRT point:
+      result = cache.get(fileName);
+    } else {
+      result = null;
+    }
+
+    if (result == null) {
+      // Pull from the filesystem
+      long checksum;
+      long length;
+      byte[] header;
+      byte[] footer;
+      try (IndexInput in = dir.openInput(fileName, IOContext.DEFAULT)) {
+        try {
+          length = in.length();
+          header = CodecUtil.readIndexHeader(in);
+          footer = CodecUtil.readFooter(in);
+          checksum = CodecUtil.retrieveChecksum(in);
+        } catch (EOFException | CorruptIndexException cie) {
+          // File exists but is busted: we must copy it.  This happens when node had crashed, corrupting an un-fsync'd file.  On init we try
+          // to delete such unreferenced files, but virus checker can block that, leaving this bad file.
+          if (VERBOSE_FILES) {
+            message("file " + fileName + ": will copy [existing file is corrupt]");
+          }
+          return null;
+        }
+        if (VERBOSE_FILES) {
+          message("file " + fileName + " has length=" + bytesToString(length));
+        }
+      } catch (FileNotFoundException | NoSuchFileException e) {
+        if (VERBOSE_FILES) {
+          message("file " + fileName + ": will copy [file does not exist]");
+        }
+        return null;
+      }
+
+      // NOTE: checksum is redundant w/ footer, but we break it out separately because when the bits cross the wire we need direct access to
+      // checksum when copying to catch bit flips:
+      result = new FileMetaData(header, footer, length, checksum);
+    }
+
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java
new file mode 100644
index 0000000..67a9d0a
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/NodeCommunicationException.java
@@ -0,0 +1,26 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Thrown when communication with another node (e.g. copying files for an NRT point) fails; always wraps the underlying cause. */
+public class NodeCommunicationException extends RuntimeException {
+  public NodeCommunicationException(String when, Throwable cause) {
+    super(when);
+    // Cause is required: callers rely on it to see the real network/IO failure
+    assert cause != null;
+    initCause(cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
new file mode 100644
index 0000000..1918ede
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
@@ -0,0 +1,80 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** A merged segment warmer that pre-copies the merged segment out to
+ * replicas before primary cuts over to the merged segment. This
+ * ensures that NRT reopen time on replicas is only in proportion to
+ * flushed segment sizes, not merged segments. */
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.replicator.nrt.CopyJob.OnceDone;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+// TODO: or ... replica node can do merging locally? tricky to keep things in sync, when one node merges more slowly than others...
+
+class PreCopyMergedSegmentWarmer extends IndexReaderWarmer {
+
+  private final PrimaryNode primary;
+
+  public PreCopyMergedSegmentWarmer(PrimaryNode primary) {
+    this.primary = primary;
+  }
+
+  /** Called by IndexWriter after a merge finishes but before it cuts over to the merged segment:
+   *  gathers metadata for the merged segment's files and pre-copies them to all running replicas. */
+  @Override
+  public void warm(LeafReader reader) throws IOException {
+    long startNS = System.nanoTime();
+    final SegmentCommitInfo info = ((SegmentReader) reader).getSegmentInfo();
+    //System.out.println("TEST: warm merged segment files " + info);
+    Map<String,FileMetaData> filesMetaData = new HashMap<>();
+    for(String fileName : info.files()) {
+      FileMetaData metaData = primary.readLocalFileMetaData(fileName);
+      // The just-merged files must exist locally and must not repeat within one segment:
+      assert metaData != null;
+      assert filesMetaData.containsKey(fileName) == false;
+      filesMetaData.put(fileName, metaData);
+    }
+
+    // nocommit if one replica is very slow then it dos's all other replicas?
+
+    // Blocks until all current replicas have copied the merged segment's files:
+    primary.preCopyMergedSegmentFiles(info, filesMetaData);
+    primary.message(String.format(Locale.ROOT, "top: done warm merge " + info + ": took %.3f sec, %.1f MB", (System.nanoTime()-startNS)/1000000000., info.sizeInBytes()/1024/1024.));
+    primary.finishedMergedFiles.addAll(filesMetaData.keySet());
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
new file mode 100644
index 0000000..183f16f
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -0,0 +1,316 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LogMergePolicy;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RAMFile;
+import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.PrintStreamInfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/*
+ * This just asks IndexWriter to open new NRT reader, in order to publish a new NRT point. This could be improved, if we separated out 1)
+ * nrt flush (and incRef the SIS) from 2) opening a new reader, but this is tricky with IW's concurrency, and it would also be hard-ish to share
+ * IW's reader pool with our searcher manager.  So we do the simpler solution now, but that adds some unnecessary latency to NRT refresh on
+ * replicas since step 2) could otherwise be done concurrently with replicas copying files over.
+ */
+
+/** Node that holds an IndexWriter, indexing documents into its local index. */
+public abstract class PrimaryNode extends Node {
+
+  // Current NRT segment infos, incRef'd with IndexWriter.deleter:
+  private SegmentInfos curInfos;
+
+  final IndexWriter writer;
+
+  // IncRef'd state of the last published NRT point; when a replica comes asking, we give it this as the current NRT point:
+  private CopyState copyState;
+
+  /** Generation of this primary; recorded into commit user data so replicas detect primary changes */
+  protected final long primaryGen;
+
+  /** Contains merged segments that have been copied to all running replicas (as of when that merge started warming). */
+  final Set<String> finishedMergedFiles = Collections.synchronizedSet(new HashSet<String>());
+
+  // How many replicas are currently copying an NRT point from us; close() waits for this to reach 0
+  private final AtomicInteger copyingCount = new AtomicInteger();
+
+  public PrimaryNode(IndexWriter writer, int id, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
+    super(id, writer.getDirectory(), searcherFactory);
+    message("top: now init primary");
+    this.writer = writer;
+    this.primaryGen = primaryGen;
+
+    try {
+      // So that when primary node's IndexWriter finishes a merge, but before it cuts over to the merged segment,
+      // it copies it out to the replicas.  This ensures the whole system's NRT latency remains low even when a
+      // large merge completes:
+      writer.getConfig().setMergedSegmentWarmer(new PreCopyMergedSegmentWarmer(this));
+
+      message("IWC:\n" + writer.getConfig());
+      message("dir:\n" + writer.getDirectory());
+      message("commitData: " + writer.getCommitData());
+
+      // Record our primaryGen in the userData, and set initial version to 0:
+      Map<String,String> commitData = new HashMap<>(writer.getCommitData());
+      commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
+      if (commitData.get(VERSION_KEY) == null) {
+        commitData.put(VERSION_KEY, "0");
+        message("add initial commitData version=0");
+      } else {
+        message("keep current commitData version=" + commitData.get(VERSION_KEY));
+      }
+      writer.setCommitData(commitData, false);
+
+      // We forcefully advance the SIS version to an unused future version.  This is necessary if the previous primary crashed and we are
+      // starting up on an "older" index, else versions can be illegally reused but show different results:
+      if (forcePrimaryVersion != -1) {
+        message("now forcePrimaryVersion to version=" + forcePrimaryVersion);
+        writer.advanceSegmentInfosVersion(forcePrimaryVersion);
+      }
+
+      mgr = new SearcherManager(writer, true, true, searcherFactory);
+      setCurrentInfos(Collections.<String>emptySet());
+      message("init: infos version=" + curInfos.getVersion());
+
+      // NOTE(review): "marker" term query looks like test-infrastructure leakage into main code — confirm
+      IndexSearcher s = mgr.acquire();
+      try {
+        message("init: marker hit count: " + s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits);
+      } finally {
+        mgr.release(s);
+      }
+
+    } catch (Throwable t) {
+      message("init: exception");
+      t.printStackTrace(System.out);
+      throw new RuntimeException(t);
+    }
+  }
+
+  // TODO: in the future, we should separate "flush" (returns an incRef'd SegmentInfos) from "refresh" (open new NRT reader from
+  // IndexWriter) so that the latter can be done concurrently while copying files out to replicas, minimizing the refresh time from the
+  // replicas.  But fixing this is tricky because e.g. IndexWriter may complete a big merge just after returning the incRef'd SegmentInfos
+  // and before we can open a new reader causing us to close the just-merged readers only to then open them again from the (now stale)
+  // SegmentInfos.  To fix this "properly" I think IW.inc/decRefDeleter must also increase the ReaderPool entry
+
+  /** Flushes all indexing operations to disk and opens a new near-real-time reader — a new
+   *  NRT point — to make the changes visible to searching.  Returns true if there were changes. */
+  public boolean flushAndRefresh() throws IOException {
+    message("top: now flushAndRefresh");
+    // Snapshot the warmed-merge files before refreshing, so we only publish merges fully copied to replicas:
+    Set<String> completedMergeFiles;
+    synchronized(finishedMergedFiles) {
+      completedMergeFiles = Collections.unmodifiableSet(new HashSet<>(finishedMergedFiles));
+    }
+    mgr.maybeRefreshBlocking();
+    boolean result = setCurrentInfos(completedMergeFiles);
+    if (result) {
+      message("top: opened NRT reader version=" + curInfos.getVersion());
+      finishedMergedFiles.removeAll(completedMergeFiles);
+      message("flushAndRefresh:  version=" + curInfos.getVersion() + " completedMergeFiles=" + completedMergeFiles + " finishedMergedFiles=" + finishedMergedFiles);
+    } else {
+      message("top: no changes in flushAndRefresh; still version=" + curInfos.getVersion());
+    }
+    return result;
+  }
+
+  /** Returns the version of the last-published NRT point.
+   *  NOTE(review): unsynchronized read of non-volatile {@code copyState} — presumably benign here, but verify the memory-visibility contract */
+  public long getCopyStateVersion() {
+    return copyState.version;
+  }
+
+  /** Returns the version recorded in the current commit user data; the ctor always installs an initial version. */
+  public synchronized long getLastCommitVersion() {
+    String s = curInfos.getUserData().get(VERSION_KEY);
+    // In ctor we always install an initial version:
+    assert s != null;
+    return Long.parseLong(s);
+  }
+
+  @Override
+  public void commit() throws IOException {
+    Map<String,String> commitData = new HashMap<>();
+    commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
+    // TODO (opto): it's a bit wasteful that we put "last refresh" version here, not the actual version we are committing, because it means
+    // on xlog replay we are replaying more ops than necessary.
+    commitData.put(VERSION_KEY, Long.toString(copyState.version));
+    message("top: commit commitData=" + commitData);
+    // nocommit this is now an NRT-visible change!  make test where nothing is indexing and confirm we don't do silly commit + refresh loop forever!
+    writer.setCommitData(commitData, false);
+    writer.commit();
+  }
+
+  /** IncRef the current CopyState and return it */
+  public synchronized CopyState getCopyState() throws IOException {
+    ensureOpen(false);
+    //message("top: getCopyState replicaID=" + replicaID + " replicaNodeID=" + replicaNodeID + " version=" + curInfos.getVersion() + " infos=" + curInfos.toString());
+    assert curInfos == copyState.infos;
+    // Hold a ref on the infos' files so IndexWriter does not delete them while the replica copies:
+    writer.incRefDeleter(copyState.infos);
+    int count = copyingCount.incrementAndGet();
+    assert count > 0;
+    return copyState;
+  }
+
+  /** Called once replica is done (or failed) copying an NRT point */
+  public void releaseCopyState(CopyState copyState) throws IOException {
+    //message("top: releaseCopyState version=" + copyState.version);
+    assert copyState.infos != null;
+    writer.decRefDeleter(copyState.infos);
+    int count = copyingCount.decrementAndGet();
+    assert count >= 0;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return isClosed(false);
+  }
+
+  /** Returns true if this node is closed, or (when {@code allowClosing} is false) also while it is in the middle of closing. */
+  boolean isClosed(boolean allowClosing) {
+    return "closed".equals(state) || (allowClosing == false && "closing".equals(state));
+  }
+
+  /** Throws {@link AlreadyClosedException} if this node is (or, optionally, is becoming) closed. */
+  private void ensureOpen(boolean allowClosing) {
+    if (isClosed(allowClosing)) {
+      throw new AlreadyClosedException(state);
+    }
+  }
+
+  /** Steals incoming infos refCount; returns true if there were changes. */
+  private synchronized boolean setCurrentInfos(Set<String> completedMergeFiles) throws IOException {
+
+    IndexSearcher searcher = null;
+    SegmentInfos infos;
+    try {
+      searcher = mgr.acquire();
+      infos = ((StandardDirectoryReader) searcher.getIndexReader()).getSegmentInfos();
+    } finally {
+      if (searcher != null) {
+        mgr.release(searcher);
+      }
+    }
+    if (curInfos != null && infos.getVersion() == curInfos.getVersion()) {
+      // no change
+      message("top: skip switch to infos: version=" + infos.getVersion() + " is unchanged: " + infos.toString());
+      return false;
+    }
+
+    // Swap in the new infos, transferring the deleter ref from old to new:
+    SegmentInfos oldInfos = curInfos;
+    writer.incRefDeleter(infos);
+    curInfos = infos;
+    if (oldInfos != null) {
+      writer.decRefDeleter(oldInfos);
+    }
+
+    message("top: switch to infos=" + infos.toString() + " version=" + infos.getVersion());
+
+    // Serialize the SegmentInfos:
+    RAMOutputStream out = new RAMOutputStream(new RAMFile(), true);
+    infos.write(dir, out);
+    byte[] infosBytes = new byte[(int) out.getFilePointer()];
+    out.writeTo(infosBytes, 0);
+
+    Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>();
+    for(SegmentCommitInfo info : infos) {
+      for(String fileName : info.files()) {
+        FileMetaData metaData = readLocalFileMetaData(fileName);
+        // NOTE: we hold a refCount on this infos, so this file better exist:
+        assert metaData != null;
+        assert filesMetaData.containsKey(fileName) == false;
+        filesMetaData.put(fileName, metaData);
+      }
+    }
+
+    lastFileMetaData = Collections.unmodifiableMap(filesMetaData);
+
+    message("top: set copyState primaryGen=" + primaryGen + " version=" + infos.getVersion() + " files=" + filesMetaData.keySet());
+    copyState = new CopyState(lastFileMetaData,
+                              infos.getVersion(), infos.getGeneration(), infosBytes, completedMergeFiles,
+                              primaryGen, curInfos);
+    return true;
+  }
+
+  /** Polls (10 msec wait) until no replica is still copying an NRT point from us. */
+  private synchronized void waitForAllRemotesToClose() throws IOException {
+
+    // Wait for replicas to finish or crash:
+    while (true) {
+      int count = copyingCount.get();
+      if (count == 0) {
+        return;
+      }
+      message("pendingCopies: " + count);
+
+      try {
+        wait(10);
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+    }
+  }
+
+  /** Closes this primary: waits for in-flight replica copies, releases the current infos, then rolls back the writer. */
+  @Override
+  public void close() throws IOException {
+    state = "closing";
+    message("top: close primary");
+
+    synchronized (this) {
+      waitForAllRemotesToClose();
+      if (curInfos != null) {
+        writer.decRefDeleter(curInfos);
+        curInfos = null;
+      }
+    }
+
+    mgr.close();
+
+    // Rollback (not commit): any uncommitted changes are intentionally discarded on close
+    writer.rollback();
+    dir.close();
+
+    state = "closed";
+  }
+
+  /** Called when a merge has finished, but before IW switches to the merged segment */
+  protected abstract void preCopyMergedSegmentFiles(SegmentCommitInfo info, Map<String,FileMetaData> files) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
new file mode 100644
index 0000000..005f938
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
@@ -0,0 +1,218 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+// TODO: can we factor/share with IFD: this is doing exactly the same thing, but on the replica side
+
+// TODO: once LUCENE-6835 is in, this class becomes a lot simpler?
+
+class ReplicaFileDeleter {
+  // Per-file reference counts; a file absent from this map has refCount 0
+  private final Map<String,Integer> refCounts = new HashMap<String,Integer>();
+  // Files whose refCount dropped to 0 (or that are unknown) and are awaiting deletion
+  private final Set<String> pending = new HashSet<String>();
+  private final Directory dir;
+  private final Node node;
+
+  public ReplicaFileDeleter(Node node, Directory dir) throws IOException {
+    this.dir = dir;
+    this.node = node;
+  }
+
+  /** Used only by asserts: returns true if the file exists
+   *  (can be opened), false if it cannot be opened, and
+   *  (unlike Java's File.exists) throws IOException if
+   *  there's some unexpected error. */
+  static boolean slowFileExists(Directory dir, String fileName) throws IOException {
+    try {
+      dir.openInput(fileName, IOContext.DEFAULT).close();
+      return true;
+    } catch (NoSuchFileException | FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  /** Increments the reference count of each file; the files must exist and must not be pending delete. */
+  public synchronized void incRef(Collection<String> fileNames) throws IOException {
+    for(String fileName : fileNames) {
+
+      if (pending.contains(fileName)) {
+        throw new IllegalStateException("cannot incRef file \"" + fileName + "\": it is pending delete");
+      }
+
+      assert slowFileExists(dir, fileName): "file " + fileName + " does not exist!";
+
+      Integer curCount = refCounts.get(fileName);
+      if (curCount == null) {
+        refCounts.put(fileName, 1);
+      } else {
+        refCounts.put(fileName, curCount.intValue() + 1);
+      }
+    }
+  }
+
+  /** Decrements the reference count of each file; files reaching 0 are queued and then deleted via {@link #deletePending}. */
+  public synchronized void decRef(Collection<String> fileNames) {
+    // We don't delete the files immediately when their RC drops to 0; instead, we add to the pending set, and then call deletePending in
+    // the end:
+    for(String fileName : fileNames) {
+      Integer curCount = refCounts.get(fileName);
+      assert curCount != null: "fileName=" + fileName;
+      assert curCount.intValue() > 0;
+      if (curCount.intValue() == 1) {
+        refCounts.remove(fileName);
+        pending.add(fileName);
+      } else {
+        refCounts.put(fileName, curCount.intValue() - 1);
+      }
+    }
+
+    deletePending();
+
+    // TODO: this local IR could incRef files here, like we do now with IW ... then we can assert this again:
+
+    // we can't assert this, e.g a search can be running when we switch to a new NRT point, holding a previous IndexReader still open for
+    // a bit:
+    /*
+    // We should never attempt deletion of a still-open file:
+    Set<String> delOpen = ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
+    if (delOpen.isEmpty() == false) {
+      node.message("fail: we tried to delete these still-open files: " + delOpen);
+      throw new AssertionError("we tried to delete these still-open files: " + delOpen);
+    }
+    */
+  }
+
+  /** Tries to delete one file; on IOException (e.g. virus checker holding the file) it is re-queued in pending.  Returns true on success. */
+  private synchronized boolean delete(String fileName) {
+    try {
+      if (Node.VERBOSE_FILES) {
+        node.message("file " + fileName + ": now delete");
+      }
+      dir.deleteFile(fileName);
+      pending.remove(fileName);
+      return true;
+    } catch (FileNotFoundException|NoSuchFileException missing) {
+      // This should never happen: we should only be asked to track files that do exist
+      node.message("file " + fileName + ": delete failed: " + missing);
+      throw new IllegalStateException("file " + fileName + ": we attempted delete but the file does not exist?", missing);
+    } catch (IOException ioe) {
+      if (Node.VERBOSE_FILES) {
+        node.message("file " + fileName + ": delete failed: " + ioe + "; will retry later");
+      }
+      pending.add(fileName);
+      return false;
+    }
+  }
+
+  public synchronized Integer getRefCount(String fileName) {
+    return refCounts.get(fileName);
+  }
+
+  public synchronized boolean isPending(String fileName) {
+    return pending.contains(fileName);
+  }
+
+  /** Attempts to delete all pending files, segments_N files first so a stale commit never references deleted files. */
+  public synchronized void deletePending() {
+    if (Node.VERBOSE_FILES) {
+      node.message("now deletePending: " + pending.size() + " files to try: " + pending);
+    }
+
+    // Clone the set because it will change as we iterate:
+    List<String> toDelete = new ArrayList<>(pending);
+
+    // First pass: delete any segments_N files.  We do these first to be certain stale commit points are removed
+    // before we remove any files they reference.  If any delete of segments_N fails, we leave all other files
+    // undeleted so index is never in a corrupt state:
+    for (String fileName : toDelete) {
+      Integer rc = refCounts.get(fileName);
+      if (rc != null && rc > 0) {
+        // Should never happen!  This means we are about to pending-delete a referenced index file
+        throw new IllegalStateException("file \"" + fileName + "\" is in pending delete set but has non-zero refCount=" + rc);
+      } else if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
+        if (delete(fileName) == false) {
+          if (Node.VERBOSE_FILES) {
+            node.message("failed to remove commit point \"" + fileName + "\"; skipping deletion of all other pending files");
+          }
+          return;
+        }
+      }
+    }
+
+    // Only delete other files if we were able to remove the segments_N files; this way we never
+    // leave a corrupt commit in the index even in the presence of virus checkers:
+    for(String fileName : toDelete) {
+      if (fileName.startsWith(IndexFileNames.SEGMENTS) == false) {
+        delete(fileName);
+      }
+    }
+
+    // NOTE(review): final pass retries anything delete() re-queued above; failed deletes re-enter
+    // pending inside delete(), so they are picked up by the next deletePending call — confirm intended
+    Set<String> copy = new HashSet<String>(pending);
+    pending.clear();
+    for(String fileName : copy) {
+      delete(fileName);
+    }
+  }
+
+  /** Necessary in case we had tried to delete this fileName before, it failed, but then it was later overwritten (because primary changed
+   *  and new primary didn't know this segment name had been previously attempted) and now has > 0 refCount */
+  public synchronized void clearPending(Collection<String> fileNames) {
+    for(String fileName : fileNames) {
+      if (pending.remove(fileName)) {
+        node.message("file " + fileName + ": deleter.clearPending now clear from pending");
+      }
+    }
+  }
+
+  /** Deletes the file immediately if nothing references it. */
+  public synchronized void deleteIfNoRef(String fileName) {
+    if (refCounts.containsKey(fileName) == false) {
+      deleteNewFile(fileName);
+    }
+  }
+
+  /** Deletes a newly written (never incRef'd) file immediately. */
+  public synchronized void deleteNewFile(String fileName) {
+    delete(fileName);
+  }
+
+  /** Returns a snapshot copy of the pending-delete set. */
+  public synchronized Set<String> getPending() {
+    return new HashSet<String>(pending);
+  }
+
+  /** Queues for deletion every file in the directory that is unreferenced, is not the lock file, and is not the given commit. */
+  public synchronized void deleteUnknownFiles(String segmentsFileName) throws IOException {
+    for(String fileName : dir.listAll()) {
+      if (refCounts.containsKey(fileName) == false &&
+          fileName.equals("write.lock") == false &&
+          fileName.equals(segmentsFileName) == false) {
+        node.message("will delete unknown file \"" + fileName + "\"");
+        pending.add(fileName);
+      }
+    }
+
+    deletePending();
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
new file mode 100644
index 0000000..af142d5
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@ -0,0 +1,772 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.BufferedChecksumIndexInput;
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.ByteArrayIndexInput;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** Replica node, that pulls index changes from the primary node by copying newly flushed or merged index files */
+
+abstract class ReplicaNode extends Node {
+
+ /** Ref-counting deleter for our index files; tracks deletes that previously failed (its "pending" set) so they can be retried. */
+ ReplicaFileDeleter deleter;
+
+ /** IncRef'd files in the current commit point: */
+ private final Collection<String> lastCommitFiles = new HashSet<>();
+
+ /** IncRef'd files in the current NRT point: */
+ protected final Collection<String> lastNRTFiles = new HashSet<>();
+
+ /** Currently running merge pre-copy jobs */
+ protected final Set<CopyJob> mergeCopyJobs = Collections.synchronizedSet(new HashSet<>());
+
+ /** Non-null when we are currently copying files from a new NRT point: */
+ protected CopyJob curNRTCopy;
+
+ /** We hold this to ensure an external IndexWriter cannot also open on our directory: */
+ private final Lock writeFileLock;
+
+ /** Merged segment files that we pre-copied, but have not yet made visible in a new NRT point. */
+ final Set<String> pendingMergeFiles = Collections.synchronizedSet(new HashSet<String>());
+
+ /** Primary gen last time we successfully replicated: */
+ protected long lastPrimaryGen;
+
+ /** Sole constructor: takes the write lock on {@code dir} and initializes the deleter, closing this node on any failure. */
+ public ReplicaNode(int id, Directory dir, SearcherFactory searcherFactory) throws IOException {
+ super(id, dir, searcherFactory);
+
+ boolean success = false;
+
+ try {
+ message("top: init replica dir=" + dir);
+
+ // Obtain a write lock on this index since we "act like" an IndexWriter, to prevent any other IndexWriter or ReplicaNode from using it:
+ writeFileLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME);
+
+ state = "init";
+ deleter = new ReplicaFileDeleter(this, dir);
+
+ // BUG FIX: success was never set to true, so the finally clause below closed this node even after a clean init:
+ success = true;
+ } catch (Throwable t) {
+ message("exc on init:");
+ t.printStackTrace(System.out);
+ throw t;
+ } finally {
+ if (success == false) {
+ IOUtils.closeWhileHandlingException(this);
+ }
+ }
+ }
+
+ /** Start up this replica, which possibly requires heavy copying of files from the primary node, if we were down for a long time.
+ * Must be called exactly once, while state is still "init"; on success, the searcher manager is live and state becomes "idle". */
+ protected synchronized void start(long curPrimaryGen) throws IOException {
+
+ if (state.equals("init") == false) {
+ throw new IllegalStateException("already started");
+ }
+
+ message("top: now start");
+ try {
+
+ // Figure out what state our local index is in now:
+ String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir);
+
+ // Also look for any pending_segments_N, in case we crashed mid-commit. We must "inflate" our infos gen to at least this, since
+ // otherwise we may wind up re-using the pending_segments_N file name on commit, and then our deleter can get angry because it still
+ // wants to delete this file:
+ long maxPendingGen = -1;
+ for(String fileName : dir.listAll()) {
+ if (fileName.startsWith(IndexFileNames.PENDING_SEGMENTS)) {
+ long gen = Long.parseLong(fileName.substring(IndexFileNames.PENDING_SEGMENTS.length()+1), Character.MAX_RADIX);
+ if (gen > maxPendingGen) {
+ maxPendingGen = gen;
+ }
+ }
+ }
+
+ SegmentInfos infos;
+ if (segmentsFileName == null) {
+ // No index here yet:
+ infos = new SegmentInfos();
+ message("top: init: no segments in index");
+ } else {
+ message("top: init: read existing segments commit " + segmentsFileName);
+ infos = SegmentInfos.readCommit(dir, segmentsFileName);
+ message("top: init: segments: " + infos.toString() + " version=" + infos.getVersion());
+ Collection<String> indexFiles = infos.files(false);
+
+ lastCommitFiles.add(segmentsFileName);
+ lastCommitFiles.addAll(indexFiles);
+
+ // Always protect the last commit:
+ deleter.incRef(lastCommitFiles);
+
+ lastNRTFiles.addAll(indexFiles);
+ deleter.incRef(lastNRTFiles);
+ message("top: commitFiles=" + lastCommitFiles);
+ message("top: nrtFiles=" + lastNRTFiles);
+ }
+
+ message("top: delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
+ deleter.deleteUnknownFiles(segmentsFileName);
+ message("top: done delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
+
+ // nocommit make test where index has all docs deleted (all segments dropped, so 0 segments) and is then replicated
+
+ String s = infos.getUserData().get(PRIMARY_GEN_KEY);
+ long myPrimaryGen;
+ if (s == null) {
+ assert infos.size() == 0;
+ myPrimaryGen = -1;
+ } else {
+ myPrimaryGen = Long.parseLong(s);
+ }
+ message("top: myPrimaryGen=" + myPrimaryGen);
+
+ boolean doCommit;
+
+ if (infos.size() > 0 && myPrimaryGen != -1 && myPrimaryGen != curPrimaryGen) {
+
+ assert myPrimaryGen < curPrimaryGen;
+
+ // Primary changed while we were down. In this case, we must sync from primary before opening a reader, because it's possible current
+ // files we have will need to be overwritten with different ones (if index rolled back and "forked"), and we can't overwrite open
+ // files on Windows:
+
+ final long initSyncStartNS = System.nanoTime();
+
+ message("top: init: primary changed while we were down myPrimaryGen=" + myPrimaryGen +
+ " vs curPrimaryGen=" + curPrimaryGen +
+ "; sync now before mgr init");
+
+ // Try until we succeed in copying over the latest NRT point:
+ CopyJob job = null;
+
+ // We may need to overwrite files referenced by our latest commit, either right now on initial sync, or on a later sync. To make
+ // sure the index is never even in an "apparently" corrupt state (where an old segments_N references invalid files) we forcefully
+ // remove the commit now, and refuse to start the replica if this delete fails:
+ message("top: now delete starting commit point " + segmentsFileName);
+
+ // If this throws exc (e.g. due to virus checker), we cannot start this replica:
+ assert deleter.getRefCount(segmentsFileName) == 1;
+ deleter.decRef(Collections.singleton(segmentsFileName));
+ if (deleter.isPending(segmentsFileName)) {
+ // If e.g. virus checker blocks us from deleting, we absolutely cannot start this node else we can cause corruption:
+ throw new RuntimeException("replica cannot start: existing segments file=" + segmentsFileName + " must be removed in order to start, but the file delete failed");
+ }
+ // So we don't later try to decRef it (illegally) again:
+ boolean didRemove = lastCommitFiles.remove(segmentsFileName);
+ assert didRemove;
+
+ while (true) {
+ job = newCopyJob("sync on startup replica=" + name() + " myVersion=" + infos.getVersion(),
+ null,
+ null,
+ true,
+ null);
+ job.start();
+
+ message("top: init: sync sis.version=" + job.getCopyState().version);
+
+ Collection<String> fileNamesToCopy = job.getFileNamesToCopy();
+
+ // Force this copy job to finish while we wait, now. Note that this can be very time consuming!
+ // NOTE: newNRTPoint detects we are still in init (mgr is null) and does not cancel our copy if a flush happens
+ try {
+ job.runBlocking();
+ job.finish();
+
+ // Success!
+ break;
+ } catch (IOException ioe) {
+ job.cancel("startup failed", ioe);
+ if (ioe.getMessage().contains("checksum mismatch after file copy")) {
+ // OK-ish
+ message("top: failed to copy: " + ioe + "; retrying");
+ } else {
+ throw ioe;
+ }
+ }
+ }
+
+ lastPrimaryGen = job.getCopyState().primaryGen;
+
+ // Deserialize the SegmentInfos the primary sent us, verifying its checksum as we read:
+ SegmentInfos syncInfos = SegmentInfos.readCommit(dir,
+ new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", job.getCopyState().infosBytes)),
+ job.getCopyState().gen);
+
+ // Must always commit to a larger generation than what's currently in the index:
+ syncInfos.updateGeneration(infos);
+ infos = syncInfos;
+
+ assert infos.getVersion() == job.getCopyState().version;
+ message(" version=" + infos.getVersion() + " segments=" + infos.toString());
+ message("top: init: incRef nrtFiles=" + job.getFileNames());
+ deleter.incRef(job.getFileNames());
+ message("top: init: decRef lastNRTFiles=" + lastNRTFiles);
+ deleter.decRef(lastNRTFiles);
+
+ lastNRTFiles.clear();
+ lastNRTFiles.addAll(job.getFileNames());
+
+ message("top: init: set lastNRTFiles=" + lastNRTFiles);
+ lastFileMetaData = job.getCopyState().files;
+ message(String.format(Locale.ROOT, "top: %d: start: done sync: took %.3fs for %s, opened NRT reader version=%d",
+ id,
+ (System.nanoTime()-initSyncStartNS)/1000000000.0,
+ bytesToString(job.getTotalBytesCopied()),
+ job.getCopyState().version));
+
+ doCommit = true;
+ } else {
+ doCommit = false;
+ lastPrimaryGen = curPrimaryGen;
+ message("top: same primary as before");
+ }
+
+ if (infos.getGeneration() < maxPendingGen) {
+ message("top: move infos generation from " + infos.getGeneration() + " to " + maxPendingGen);
+ infos.setNextWriteGeneration(maxPendingGen);
+ }
+
+ // Notify primary we started, to give it a chance to send any warming merges our way to reduce NRT latency of first sync:
+ sendNewReplica();
+
+ // Finally, we are open for business, since our index now "agrees" with the primary:
+ mgr = new SegmentInfosSearcherManager(dir, this, infos, searcherFactory);
+
+ // Must commit after init mgr:
+ if (doCommit) {
+ // Very important to commit what we just sync'd over, because we removed the pre-existing commit point above if we had to
+ // overwrite any files it referenced:
+ commit();
+ }
+
+ message("top: done start");
+ state = "idle";
+ } catch (Throwable t) {
+ // NOTE(review): this wraps even IOExceptions as unchecked; callers only see RuntimeException from here.
+ message("exc on start:");
+ t.printStackTrace(System.out);
+ throw new RuntimeException(t);
+ }
+ }
+
+ final Object commitLock = new Object();
+
+ /** Commits the current NRT point to a new segments_N in our directory: fsyncs the referenced files, records the primary gen and
+ * searching version in the commit user data, then swaps our ref-counted lastCommitFiles to the new commit. */
+ @Override
+ public void commit() throws IOException {
+
+ synchronized(commitLock) {
+
+ SegmentInfos infos;
+ Collection<String> indexFiles;
+
+ // Briefly hold the node monitor to snapshot the current infos and incRef its files before anything can delete them:
+ synchronized (this) {
+ infos = ((SegmentInfosSearcherManager) mgr).getCurrentInfos();
+ indexFiles = infos.files(false);
+ deleter.incRef(indexFiles);
+ }
+
+ message("top: commit primaryGen=" + lastPrimaryGen + " infos=" + infos.toString() + " files=" + indexFiles);
+
+ // fsync all index files we are now referencing
+ dir.sync(indexFiles);
+
+ Map<String,String> commitData = new HashMap<>();
+ commitData.put(PRIMARY_GEN_KEY, Long.toString(lastPrimaryGen));
+ commitData.put(VERSION_KEY, Long.toString(getCurrentSearchingVersion()));
+ infos.setUserData(commitData, false);
+
+ // write and fsync a new segments_N
+ infos.commit(dir);
+
+ // Notify current infos (which may have changed while we were doing dir.sync above) what generation we are up to; this way future
+ // commits are guaranteed to go to the next (unwritten) generations:
+ if (mgr != null) {
+ ((SegmentInfosSearcherManager) mgr).getCurrentInfos().updateGeneration(infos);
+ }
+ String segmentsFileName = infos.getSegmentsFileName();
+ message("top: commit wrote segments file " + segmentsFileName + " version=" + infos.getVersion() + " sis=" + infos.toString() + " commitData=" + commitData);
+ // Protect the new commit, then release the previous one; order matters so shared files are never unreferenced:
+ deleter.incRef(Collections.singletonList(segmentsFileName));
+ message("top: commit decRef lastCommitFiles=" + lastCommitFiles);
+ deleter.decRef(lastCommitFiles);
+ lastCommitFiles.clear();
+ lastCommitFiles.addAll(indexFiles);
+ lastCommitFiles.add(segmentsFileName);
+ message("top: commit version=" + infos.getVersion() + " files now " + lastCommitFiles);
+ }
+ }
+
+ /** Called once an NRT-point copy job completes (successfully or not): finishes the job's renames, cuts the searcher manager over to
+ * the new SegmentInfos, and swaps our ref-counted lastNRTFiles to the new point. */
+ void finishNRTCopy(CopyJob job, long startNS) throws IOException {
+ CopyState copyState = job.getCopyState();
+ message("top: finishNRTCopy: version=" + copyState.version + (job.getFailed() ? " FAILED" : "") + " job=" + job);
+
+ // NOTE: if primary crashed while we were still copying then the job will hit an exc trying to read bytes for the files from the primary node,
+ // and the job will be marked as failed here:
+
+ synchronized (this) {
+
+ if ("syncing".equals(state)) {
+ state = "idle";
+ }
+
+ if (curNRTCopy == job) {
+ message("top: now clear curNRTCopy; job=" + job);
+ curNRTCopy = null;
+ } else {
+ assert job.getFailed();
+ message("top: skip clear curNRTCopy: we were cancelled; job=" + job);
+ }
+
+ if (job.getFailed()) {
+ return;
+ }
+
+ // Does final file renames:
+ job.finish();
+
+ // Turn byte[] back to SegmentInfos (removed an unused local that aliased copyState.infosBytes):
+ SegmentInfos infos = SegmentInfos.readCommit(dir,
+ new BufferedChecksumIndexInput(new ByteArrayIndexInput("SegmentInfos", copyState.infosBytes)),
+ copyState.gen);
+ assert infos.getVersion() == copyState.version;
+
+ message(" version=" + infos.getVersion() + " segments=" + infos.toString());
+
+ // Cutover to new searcher:
+ if (mgr != null) {
+ ((SegmentInfosSearcherManager) mgr).setCurrentInfos(infos);
+ }
+
+ // Must first incRef new NRT files, then decRef old ones, to make sure we don't remove an NRT file that's in common to both:
+ Collection<String> newFiles = copyState.files.keySet();
+ message("top: incRef newNRTFiles=" + newFiles);
+ deleter.incRef(newFiles);
+
+ // If any of our new files were previously copied merges, we clear them now, so we don't try to later delete a non-existent file:
+ pendingMergeFiles.removeAll(newFiles);
+ message("top: after remove from pending merges pendingMergeFiles=" + pendingMergeFiles);
+
+ message("top: decRef lastNRTFiles=" + lastNRTFiles);
+ deleter.decRef(lastNRTFiles);
+ lastNRTFiles.clear();
+ lastNRTFiles.addAll(newFiles);
+ message("top: set lastNRTFiles=" + lastNRTFiles);
+
+ // At this point we can remove any completed merge segment files that we still do not reference. This can happen when a merge
+ // finishes, copies its files out to us, but is then merged away (or dropped due to 100% deletions) before we ever cutover to it
+ // in an NRT point:
+ if (copyState.completedMergeFiles.isEmpty() == false) {
+ message("now remove-if-not-ref'd completed merge files: " + copyState.completedMergeFiles);
+ for(String fileName : copyState.completedMergeFiles) {
+ if (pendingMergeFiles.contains(fileName)) {
+ pendingMergeFiles.remove(fileName);
+ deleter.deleteIfNoRef(fileName);
+ }
+ }
+ }
+
+ lastFileMetaData = copyState.files;
+
+ // It's a good time to delete pending files, since we just refreshed and some previously open files are now closed:
+ deleter.deletePending();
+ }
+
+ // NOTE(review): this count of "marker" docs looks test-only instrumentation; confirm before shipping to production paths.
+ int markerCount;
+ IndexSearcher s = mgr.acquire();
+ try {
+ markerCount = s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+ } finally {
+ mgr.release(s);
+ }
+
+ message(String.format(Locale.ROOT, "top: done sync: took %.3fs for %s, opened NRT reader version=%d markerCount=%d",
+ (System.nanoTime()-startNS)/1000000000.0,
+ bytesToString(job.getTotalBytesCopied()),
+ copyState.version,
+ markerCount));
+ }
+
+ /** Start a background copying job, to copy the specified files from the current primary node. If files is null then the latest copy
+ * state should be copied. If prevJob is not null, then the new copy job is replacing it and should 1) cancel the previous one, and
+ * 2) optionally salvage files (e.g. partially copied ones) that are shared with the new copy job. */
+ protected abstract CopyJob newCopyJob(String reason, Map<String,FileMetaData> files, Map<String,FileMetaData> prevFiles,
+ boolean highPriority, CopyJob.OnceDone onceDone) throws IOException;
+
+ /** Runs this job async'd */
+ protected abstract void launch(CopyJob job);
+
+ /** Tell primary we (replica) just started, so primary can tell us to warm any already warming merges. This lets us keep low nrt refresh
+ * time for the first nrt sync after we started. */
+ protected abstract void sendNewReplica() throws IOException;
+
+ /** Call this to notify this replica node that a new NRT infos is available on the primary.
+ * We kick off a job (runs in the background) to copy files across, and open a new reader once that's done. */
+ public synchronized CopyJob newNRTPoint(long version) throws IOException {
+
+ if (isClosed()) {
+ throw new AlreadyClosedException("this replica is closed: state=" + state);
+ }
+
+ // Caller should not "publish" us until we have finished .start():
+ assert mgr != null;
+
+ if ("idle".equals(state)) {
+ state = "syncing";
+ }
+
+ long curVersion = getCurrentSearchingVersion();
+
+ message("top: start sync sis.version=" + version);
+
+ if (version == curVersion) {
+ // Caller releases the CopyState:
+ message("top: new NRT point has same version as current; skipping");
+ return null;
+ }
+
+ if (version < curVersion) {
+ // This can happen, if two syncs happen close together, and due to thread scheduling, the incoming older version runs after the newer version
+ // BUG FIX: the "current" version previously logged the incoming version twice:
+ message("top: new NRT point (version=" + version + ") is older than current (version=" + curVersion + "); skipping");
+ return null;
+ }
+
+ final long startNS = System.nanoTime();
+
+ message("top: newNRTPoint");
+ CopyJob job = null;
+ try {
+ job = newCopyJob("NRT point sync version=" + version,
+ null,
+ lastFileMetaData,
+ true,
+ new CopyJob.OnceDone() {
+ @Override
+ public void run(CopyJob job) {
+ try {
+ finishNRTCopy(job, startNS);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ });
+ } catch (NodeCommunicationException nce) {
+ // E.g. primary could crash/close when we are asking it for the copy state:
+ message("top: ignoring communication exception creating CopyJob: " + nce);
+ nce.printStackTrace(System.out);
+ if (state.equals("syncing")) {
+ state = "idle";
+ }
+ return null;
+ }
+
+ Collection<String> newNRTFiles = job.getFileNames();
+ long newPrimaryGen = job.getCopyState().primaryGen;
+ maybeNewPrimary(newPrimaryGen);
+
+ message("top: newNRTPoint: job files=" + newNRTFiles);
+
+ if (curNRTCopy != null) {
+ job.transferAndCancel(curNRTCopy);
+ assert curNRTCopy.getFailed();
+ }
+
+ curNRTCopy = job;
+
+ for(String fileName : curNRTCopy.getFileNamesToCopy()) {
+ assert lastCommitFiles.contains(fileName) == false: "fileName=" + fileName + " is in lastCommitFiles and is being copied?";
+ synchronized (mergeCopyJobs) {
+ for (CopyJob mergeJob : mergeCopyJobs) {
+ if (mergeJob.getFileNames().contains(fileName)) {
+ // nocommit can't we .transferAndCancel?
+ message("top: now cancel merge copy job=" + mergeJob + ": file " + fileName + " is now being copied via NRT point");
+ mergeJob.cancel("newNRTPoint is copying over the same file", null);
+ }
+ }
+ }
+ }
+
+ try {
+ job.start();
+ } catch (NodeCommunicationException nce) {
+ // E.g. primary could crash/close when we are asking it for the copy state:
+ message("top: ignoring exception starting CopyJob: " + nce);
+ nce.printStackTrace(System.out);
+ if (state.equals("syncing")) {
+ state = "idle";
+ }
+ return null;
+ }
+
+ // Runs in the background jobs thread, maybe slowly/throttled, and calls finishSync once it's done:
+ launch(curNRTCopy);
+ return curNRTCopy;
+ }
+
+ /** Returns true if an NRT-point copy job is currently in flight. */
+ public synchronized boolean isCopying() {
+ return curNRTCopy != null;
+ }
+
+ /** True once close/crash has begun; any shutting-down or crashed state counts as closed. */
+ @Override
+ public boolean isClosed() {
+ final String[] closedStates = {"closed", "closing", "crashing", "crashed"};
+ for (String closedState : closedStates) {
+ if (closedState.equals(state)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** Closes this replica: cancels any running NRT copy, closes the searcher manager, releases file refs, and closes the directory.
+ * Also invoked (via IOUtils.closeWhileHandlingException) from the constructor's failure path, where mgr/deleter may still be null. */
+ @Override
+ public void close() throws IOException {
+ message("top: now close");
+
+ synchronized (this) {
+ state = "closing";
+ if (curNRTCopy != null) {
+ curNRTCopy.cancel("closing", null);
+ }
+ }
+
+ synchronized (this) {
+ message("top: close mgr");
+ // BUG FIX: mgr (and deleter) are null when close() runs after a failed constructor; an NPE here used to
+ // prevent the directory and write lock from being released:
+ if (mgr != null) {
+ mgr.close();
+ }
+
+ if (deleter != null) {
+ message("top: decRef lastNRTFiles=" + lastNRTFiles);
+ deleter.decRef(lastNRTFiles);
+ lastNRTFiles.clear();
+
+ // NOTE: do not decRef these!
+ lastCommitFiles.clear();
+
+ message("top: delete if no ref pendingMergeFiles=" + pendingMergeFiles);
+ for(String fileName : pendingMergeFiles) {
+ deleter.deleteIfNoRef(fileName);
+ }
+ pendingMergeFiles.clear();
+ }
+
+ message("top: close dir");
+ IOUtils.close(writeFileLock, dir);
+ }
+ // Mark closed before the final log, so isClosed() is already accurate when "done close" appears:
+ state = "closed";
+ message("top: done close");
+ }
+
+ /** Called when the primary changed */
+ protected synchronized void maybeNewPrimary(long newPrimaryGen) {
+ if (newPrimaryGen == lastPrimaryGen) {
+ message("top: keep current lastPrimaryGen=" + lastPrimaryGen);
+ return;
+ }
+ message("top: now change lastPrimaryGen from " + lastPrimaryGen + " to " + newPrimaryGen + " pendingMergeFiles=" + pendingMergeFiles);
+ assert newPrimaryGen > lastPrimaryGen: "newPrimaryGen=" + newPrimaryGen + " vs lastPrimaryGen=" + lastPrimaryGen;
+ lastPrimaryGen = newPrimaryGen;
+ pendingMergeFiles.clear();
+ }
+
+ /** Pre-copies a just-merged segment's files from the primary before the merge becomes visible in an NRT point, so the first NRT
+ * refresh that references it is fast. Returns the (already launched) copy job. */
+ protected synchronized CopyJob launchPreCopyMerge(AtomicBoolean finished, long newPrimaryGen, Map<String,FileMetaData> files) throws IOException {
+
+ CopyJob job;
+
+ maybeNewPrimary(newPrimaryGen);
+ final long primaryGenStart = lastPrimaryGen;
+ Set<String> fileNames = files.keySet();
+ message("now pre-copy warm merge files=" + fileNames + " primaryGen=" + newPrimaryGen);
+
+ for(String fileName : fileNames) {
+ assert pendingMergeFiles.contains(fileName) == false: "file \"" + fileName + "\" is already being warmed!";
+ assert lastNRTFiles.contains(fileName) == false: "file \"" + fileName + "\" is already NRT visible!";
+ }
+
+ job = newCopyJob("warm merge on " + name() + " filesNames=" + fileNames,
+ files, null, false,
+ new CopyJob.OnceDone() {
+
+ @Override
+ public void run(CopyJob job) throws IOException {
+ // Signals that this replica has finished
+ mergeCopyJobs.remove(job);
+ message("done warming merge " + fileNames + " failed?=" + job.getFailed());
+ // BUG FIX: synchronized(this) locked the anonymous OnceDone instance; we must hold the ReplicaNode
+ // monitor, which guards lastNRTFiles/lastCommitFiles/pendingMergeFiles everywhere else:
+ synchronized(ReplicaNode.this) {
+ if (job.getFailed() == false) {
+ if (lastPrimaryGen != primaryGenStart) {
+ message("merge pre copy finished but primary has changed; cancelling job files=" + fileNames);
+ job.cancel("primary changed during merge copy", null);
+ } else {
+ boolean abort = false;
+ for (String fileName : fileNames) {
+ if (lastNRTFiles.contains(fileName)) {
+ message("abort merge finish: file " + fileName + " is referenced by last NRT point");
+ abort = true;
+ }
+ if (lastCommitFiles.contains(fileName)) {
+ message("abort merge finish: file " + fileName + " is referenced by last commit point");
+ abort = true;
+ }
+ }
+ if (abort) {
+ // Even though in newNRTPoint we have similar logic, which cancels any merge copy jobs if an NRT point
+ // shows up referencing the files we are warming (because primary got impatient and gave up on us), we also
+ // need it here in case replica is way far behind and fails to even receive the merge pre-copy request
+ // until after the newNRTPoint referenced those files:
+ job.cancel("merged segment was separately copied via NRT point", null);
+ } else {
+ job.finish();
+ message("merge pre copy finished files=" + fileNames);
+ for(String fileName : fileNames) {
+ assert pendingMergeFiles.contains(fileName) == false : "file \"" + fileName + "\" is already in pendingMergeFiles";
+ message("add file " + fileName + " to pendingMergeFiles");
+ pendingMergeFiles.add(fileName);
+ }
+ }
+ }
+ } else {
+ message("merge copy finished with failure");
+ }
+ }
+ finished.set(true);
+ }
+ });
+
+ job.start();
+
+ // When warming a merge we better not already have any of these files copied!
+ assert job.getFileNamesToCopy().size() == files.size();
+
+ mergeCopyJobs.add(job);
+ launch(job);
+
+ return job;
+ }
+
+ /** Creates a temp output in our directory for a file being copied in. */
+ public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext) throws IOException {
+ // BUG FIX: the caller's ioContext was silently ignored and IOContext.DEFAULT was always used:
+ return dir.createTempOutput(prefix, suffix, ioContext);
+ }
+
+ /** Compares incoming per-file identity (id, checksum, header, footer) versus what we have locally and returns the subset of the incoming
+ * files that need copying */
+ public List<Map.Entry<String,FileMetaData>> getFilesToCopy(Map<String,FileMetaData> files) throws IOException {
+
+ // (Removed unused local "doCopyCommitFiles".)
+ List<Map.Entry<String,FileMetaData>> toCopy = new ArrayList<>();
+ for (Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+ String fileName = ent.getKey();
+ FileMetaData fileMetaData = ent.getValue();
+ // Only copy files whose local bytes do not provably match the primary's:
+ if (fileIsIdentical(fileName, fileMetaData) == false) {
+ toCopy.add(ent);
+ }
+ }
+
+ return toCopy;
+ }
+
+ /** Carefully determine if the file on the primary, identified by its {@code String fileName} along with the {@link FileMetaData}
+ * "summarizing" its contents, is precisely the same file that we have locally. If the file does not exist locally, or if its header
+ * (includes the segment id), length, footer (including checksum) differ, then this returns false, else true. */
+ private boolean fileIsIdentical(String fileName, FileMetaData srcMetaData) throws IOException {
+
+ if (deleter.isPending(fileName)) {
+ // This was a file we had wanted to delete yet a virus checker prevented us, and now we need to overwrite it.
+ // Such files are in an unknown state, and even if their header and footer and length all
+ // match, since they may not have been fsync'd by the previous node instance on this directory,
+ // they could in theory have corruption internally. So we always force ourselves to copy them here:
+ if (Node.VERBOSE_FILES) {
+ message("file " + fileName + ": will copy [we had wanted to delete this file on init, but failed]");
+ }
+ return false;
+ }
+
+ FileMetaData destMetaData = readLocalFileMetaData(fileName);
+ if (destMetaData == null) {
+ // Something went wrong in reading the file (it's corrupt, truncated, does not exist, etc.):
+ return false;
+ }
+
+ // Compare the header (which embeds the segment id) and the footer (which embeds the checksum):
+ if (Arrays.equals(destMetaData.header, srcMetaData.header) == false ||
+ Arrays.equals(destMetaData.footer, srcMetaData.footer) == false) {
+ // Segment name was reused! This is rare but possible and otherwise devastating:
+ if (Node.VERBOSE_FILES) {
+ message("file " + fileName + ": will copy [header/footer is different]");
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ /** File names currently being copied into this replica; used only for the sanity checks below. */
+ private ConcurrentMap<String,Boolean> copying = new ConcurrentHashMap<>();
+
+ // Used only to catch bugs, ensuring a given file name is only ever being copied by one job:
+ public void startCopyFile(String name) {
+ if (copying.putIfAbsent(name, Boolean.TRUE) != null) {
+ throw new IllegalStateException("file " + name + " is being copied in two places!");
+ }
+ }
+
+ public void finishCopyFile(String name) {
+ if (copying.remove(name) == null) {
+ throw new IllegalStateException("file " + name + " was not actually being copied?");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java
new file mode 100644
index 0000000..72ed921
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/SegmentInfosSearcherManager.java
@@ -0,0 +1,129 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/** A SearcherManager that refreshes via an externally provided (NRT) SegmentInfos, either from {@link IndexWriter} or via
+ * nrt replication to another index. */
+class SegmentInfosSearcherManager extends ReferenceManager<IndexSearcher> {
+ private volatile SegmentInfos currentInfos;
+ private final Directory dir;
+ private final Node node;
+ private final AtomicInteger openReaderCount = new AtomicInteger();
+ private final SearcherFactory searcherFactory;
+
+ public SegmentInfosSearcherManager(Directory dir, Node node, SegmentInfos infosIn, SearcherFactory searcherFactory) throws IOException {
+ this.dir = dir;
+ this.node = node;
+ if (searcherFactory == null) {
+ searcherFactory = new SearcherFactory();
+ }
+ this.searcherFactory = searcherFactory;
+ currentInfos = infosIn;
+ node.message("SegmentInfosSearcherManager.init: use incoming infos=" + infosIn.toString());
+ current = SearcherManager.getSearcher(searcherFactory, StandardDirectoryReader.open(dir, currentInfos, null), null);
+ addReaderClosedListener(current.getIndexReader());
+ }
+
+ @Override
+ protected int getRefCount(IndexSearcher s) {
+ return s.getIndexReader().getRefCount();
+ }
+
+ @Override
+ protected boolean tryIncRef(IndexSearcher s) {
+ return s.getIndexReader().tryIncRef();
+ }
+
+ @Override
+ protected void decRef(IndexSearcher s) throws IOException {
+ s.getIndexReader().decRef();
+ }
+
+ public SegmentInfos getCurrentInfos() {
+ return currentInfos;
+ }
+
+ /** Switch to new segments, refreshing if necessary. Note that it's the caller job to ensure there's a held refCount for the
+ * incoming infos, so all files exist. */
+ public void setCurrentInfos(SegmentInfos infos) throws IOException {
+ if (currentInfos != null) {
+ // So that if we commit, we will go to the next
+ // (unwritten so far) generation:
+ infos.updateGeneration(currentInfos);
+ node.message("mgr.setCurrentInfos: carry over infos gen=" + infos.getSegmentsFileName());
+ }
+ currentInfos = infos;
+ maybeRefresh();
+ }
+
+ @Override
+ protected IndexSearcher refreshIfNeeded(IndexSearcher old) throws IOException {
+ List<LeafReader> subs;
+ if (old == null) {
+ subs = null;
+ } else {
+ subs = new ArrayList<>();
+ for(LeafReaderContext ctx : old.getIndexReader().leaves()) {
+ subs.add(ctx.reader());
+ }
+ }
+
+ // Open a new reader, sharing any common segment readers with the old one:
+ DirectoryReader r = StandardDirectoryReader.open(dir, currentInfos, subs);
+ addReaderClosedListener(r);
+ node.message("refreshed to version=" + currentInfos.getVersion() + " r=" + r);
+ return SearcherManager.getSearcher(searcherFactory, r, (DirectoryReader) old.getIndexReader());
+ }
+
+ private void addReaderClosedListener(IndexReader r) {
+ openReaderCount.incrementAndGet();
+ r.addReaderClosedListener(new IndexReader.ReaderClosedListener() {
+ @Override
+ public void onClose(IndexReader reader) {
+ onReaderClosed();
+ }
+ });
+ }
+
+ /** Tracks how many readers are still open, so that when we are closed,
+ * we can additionally wait until all in-flight searchers are
+ * closed. */
+ synchronized void onReaderClosed() {
+ if (openReaderCount.decrementAndGet() == 0) {
+ notifyAll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java
new file mode 100644
index 0000000..7db7bc1
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Connection.java
@@ -0,0 +1,63 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.OutputStreamDataOutput;
+
+/** Simple point-to-point TCP connection */
+class Connection implements Closeable {
+ public final DataInput in;
+ public final DataOutput out;
+ public final InputStream sockIn;
+ public final BufferedOutputStream bos;
+ public final Socket s;
+ public final int destTCPPort;
+ public long lastKeepAliveNS = System.nanoTime();
+
+ public Connection(int tcpPort) throws IOException {
+ this.destTCPPort = tcpPort;
+ this.s = new Socket(InetAddress.getLoopbackAddress(), tcpPort);
+ this.sockIn = s.getInputStream();
+ this.in = new InputStreamDataInput(sockIn);
+ this.bos = new BufferedOutputStream(s.getOutputStream());
+ this.out = new OutputStreamDataOutput(bos);
+ if (Node.VERBOSE_CONNECTIONS) {
+ System.out.println("make new client Connection socket=" + this.s + " destPort=" + tcpPort);
+ }
+ }
+
+ public void flush() throws IOException {
+ bos.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ s.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
new file mode 100644
index 0000000..369414f
--- /dev/null
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
@@ -0,0 +1,152 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.lucene.store.AlreadyClosedException;
+
+/** Runs CopyJob(s) in background thread; each ReplicaNode has an instance of this
+ * running. At a given time there could be one NRT copy job running, and multiple
+ * pre-warm merged segments jobs. */
+class Jobs extends Thread implements Closeable {
+
+  // Jobs waiting to run, highest priority first (CopyJob implements Comparable):
+  private final PriorityQueue<CopyJob> queue = new PriorityQueue<>();
+
+  private final Node node;
+
+  public Jobs(Node node) {
+    this.node = node;
+  }
+
+  // Set under this instance's monitor by close(); tells run() to drain and exit:
+  private boolean finish;
+
+  /** Returns null if we are closing, else, returns the top job or waits for one to arrive if the queue is empty. */
+  private synchronized SimpleCopyJob getNextJob() {
+    while (true) {
+      if (finish) {
+        return null;
+      } else if (queue.isEmpty()) {
+        try {
+          // Woken by launch() or close():
+          wait();
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        }
+      } else {
+        return (SimpleCopyJob) queue.poll();
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      SimpleCopyJob topJob = getNextJob();
+      if (topJob == null) {
+        // getNextJob only returns null once close() has set finish:
+        assert finish;
+        break;
+      }
+
+      this.setName("jobs o" + topJob.ord);
+
+      boolean result;
+      try {
+        // visit() does one unit of copying; returns true when the job is done:
+        result = topJob.visit();
+      } catch (Throwable t) {
+        if ((t instanceof AlreadyClosedException) == false) {
+          node.message("exception during job.visit job=" + topJob + "; now cancel");
+          t.printStackTrace(System.out);
+        } else {
+          node.message("AlreadyClosedException during job.visit job=" + topJob + "; now cancel");
+        }
+        topJob.cancel("unexpected exception in visit", t);
+        try {
+          topJob.onceDone.run(topJob);
+        } catch (Throwable t2) {
+          node.message("ignore exception calling OnceDone: " + t2);
+        }
+        continue;
+      }
+
+      if (result == false) {
+        // Job isn't done yet; put it back:
+        synchronized (this) {
+          queue.offer(topJob);
+        }
+      } else {
+        // Job finished, now notify caller:
+        try {
+          topJob.onceDone.run(topJob);
+        } catch (Throwable t) {
+          node.message("ignore exception calling OnceDone: " + t);
+        }
+      }
+    }
+
+    node.message("top: jobs now exit run thread");
+
+    synchronized(this) {
+      // Gracefully cancel any jobs we didn't finish:
+      while (queue.isEmpty() == false) {
+        SimpleCopyJob job = (SimpleCopyJob) queue.poll();
+        node.message("top: Jobs: now cancel job=" + job);
+        job.cancel("jobs closing", null);
+        try {
+          job.onceDone.run(job);
+        } catch (Throwable t) {
+          node.message("ignore exception calling OnceDone: " + t);
+        }
+      }
+    }
+  }
+
+  /** Enqueues a new job and wakes the run thread; throws {@link AlreadyClosedException} if we are closing. */
+  public synchronized void launch(CopyJob job) {
+    if (finish == false) {
+      queue.offer(job);
+      // Wake getNextJob() if it is wait()ing on an empty queue:
+      notify();
+    } else {
+      throw new AlreadyClosedException("closed");
+    }
+  }
+
+  /** Cancels any existing jobs that are copying the same file names as this one */
+  public synchronized void cancelConflictingJobs(CopyJob newJob) {
+    // Cancelled jobs stay in the queue; the run thread discards them when polled:
+    for (CopyJob job : queue) {
+      if (job.conflicts(newJob)) {
+        node.message("top: now cancel existing conflicting job=" + job + " due to newJob=" + newJob);
+        job.cancel("conflicts with new job", null);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    finish = true;
+    // Wake the run thread so it sees finish and drains remaining jobs:
+    notify();
+    try {
+      // Wait for run() to cancel leftover jobs and exit:
+      join();
+    } catch (InterruptedException ie) {
+      throw new RuntimeException(ie);
+    }
+  }
+}