You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this text.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/06 17:24:01 UTC
[07/15] lucene-solr git commit: fix more nocommits; add separate test that deleteAll can replicate
fix more nocommits; add separate test that deleteAll can replicate
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/022540e8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/022540e8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/022540e8
Branch: refs/heads/jira/lucene-5438-nrt-replication
Commit: 022540e8c23e57046f1b54a9cd485dab5ff4b563
Parents: 3389068
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jan 27 06:00:12 2016 -0500
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Jan 27 06:00:12 2016 -0500
----------------------------------------------------------------------
.../nrt/PreCopyMergedSegmentWarmer.java | 18 -
.../lucene/replicator/nrt/PrimaryNode.java | 18 -
.../lucene/replicator/nrt/ReplicaNode.java | 25 +-
.../lucene/replicator/nrt/NodeProcess.java | 18 +-
.../replicator/nrt/SimplePrimaryNode.java | 53 +-
.../replicator/nrt/SimpleReplicaNode.java | 30 +-
.../lucene/replicator/nrt/SimpleServer.java | 38 +-
.../replicator/nrt/TestNRTReplication.java | 1090 ++--------------
.../nrt/TestStressNRTReplication.java | 1160 ++++++++++++++++++
9 files changed, 1359 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
index 1918ede..77f23ab 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PreCopyMergedSegmentWarmer.java
@@ -23,30 +23,14 @@ package org.apache.lucene.replicator.nrt;
* flushed segment sizes, not merged segments. */
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.replicator.nrt.CopyJob.OnceDone;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.OutputStreamDataOutput;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.ThreadInterruptedException;
// TODO: or ... replica node can do merging locally? tricky to keep things in sync, when one node merges more slowly than others...
@@ -71,8 +55,6 @@ class PreCopyMergedSegmentWarmer extends IndexReaderWarmer {
filesMetaData.put(fileName, metaData);
}
- // nocommit if one replica is very slow then it dos's all other replicas?
-
primary.preCopyMergedSegmentFiles(info, filesMetaData);
primary.message(String.format(Locale.ROOT, "top: done warm merge " + info + ": took %.3f sec, %.1f MB", (System.nanoTime()-startNS)/1000000000., info.sizeInBytes()/1024/1024.));
primary.finishedMergedFiles.addAll(filesMetaData.keySet());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
index 183f16f..ccd8848 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -17,43 +17,26 @@ package org.apache.lucene.replicator.nrt;
* limitations under the License.
*/
-import java.io.Closeable;
import java.io.IOException;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.lucene.codecs.CodecUtil;
-import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.LogMergePolicy;
-import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.store.RAMOutputStream;
-import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.PrintStreamInfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
/*
@@ -178,7 +161,6 @@ public abstract class PrimaryNode extends Node {
// on xlog replay we are replaying more ops than necessary.
commitData.put(VERSION_KEY, Long.toString(copyState.version));
message("top: commit commitData=" + commitData);
- // nocommit this is now an NRT-visible change! make test where nothing is indexing and confirm we don't do silly commit + refresh loop forever!
writer.setCommitData(commitData, false);
writer.commit();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
index af142d5..713c6f1 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@ -17,14 +17,7 @@ package org.apache.lucene.replicator.nrt;
* limitations under the License.
*/
-import java.io.EOFException;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -34,15 +27,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
@@ -52,24 +41,12 @@ import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
-import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayIndexInput;
-import org.apache.lucene.store.ChecksumIndexInput;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.store.OutputStreamDataOutput;
-import org.apache.lucene.store.RAMOutputStream;
-import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.ThreadInterruptedException;
/** Replica node, that pulls index changes from the primary node by copying newly flushed or merged index files */
@@ -544,7 +521,7 @@ abstract class ReplicaNode extends Node {
synchronized (mergeCopyJobs) {
for (CopyJob mergeJob : mergeCopyJobs) {
if (mergeJob.getFileNames().contains(fileName)) {
- // nocommit can't we .transferAndCancel?
+ // TODO: we could maybe transferAndCancel here? except CopyJob can't transferAndCancel more than one currently
message("top: now cancel merge copy job=" + mergeJob + ": file " + fileName + " is now being copied via NRT point");
mergeJob.cancel("newNRTPoint is copying over the same file", null);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
index 4e29508..9de2c04 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -19,18 +19,9 @@ package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.document.Document;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.OutputStreamDataOutput;
/** Parent JVM hold this "wrapper" to refer to each child JVM. This is roughly equivalent e.g. to a client-side "sugar" API. */
class NodeProcess implements Closeable {
@@ -234,5 +225,14 @@ class NodeProcess implements Closeable {
c.flush();
c.in.readByte();
}
+
+ public void deleteAllDocuments(Connection c) throws IOException {
+ if (isPrimary == false) {
+ throw new IllegalStateException("only primary can index");
+ }
+ c.out.writeByte(SimplePrimaryNode.CMD_DELETE_ALL_DOCS);
+ c.flush();
+ c.in.readByte();
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
index 18e77ef..0afd1b4 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -43,17 +43,16 @@ import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
-import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.DataInput;
@@ -61,8 +60,6 @@ import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
@@ -74,6 +71,8 @@ class SimplePrimaryNode extends PrimaryNode {
final int tcpPort;
+ final Random random;
+
// These are updated by parent test process whenever replicas change:
int[] replicaTCPPorts;
int[] replicaIDs;
@@ -81,6 +80,10 @@ class SimplePrimaryNode extends PrimaryNode {
// So we only flip a bit once per file name:
final Set<String> bitFlipped = Collections.synchronizedSet(new HashSet<>());
+ final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
+
+ final boolean doFlipBitsDuringCopy;
+
static class MergePreCopy {
final List<Connection> connections = Collections.synchronizedList(new ArrayList<>());
final Map<String,FileMetaData> files;
@@ -109,11 +112,12 @@ class SimplePrimaryNode extends PrimaryNode {
}
}
- final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
-
- public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory) throws IOException {
+ public SimplePrimaryNode(Random random, Path indexPath, int id, int tcpPort, long primaryGen, long forcePrimaryVersion, SearcherFactory searcherFactory,
+ boolean doFlipBitsDuringCopy) throws IOException {
super(initWriter(id, random, indexPath), id, primaryGen, forcePrimaryVersion, searcherFactory);
this.tcpPort = tcpPort;
+ this.random = new Random(random.nextLong());
+ this.doFlipBitsDuringCopy = doFlipBitsDuringCopy;
}
/** Records currently alive replicas. */
@@ -187,7 +191,7 @@ class SimplePrimaryNode extends PrimaryNode {
long startNS = System.nanoTime();
long lastWarnNS = startNS;
- // TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replicas to finish copying?
+ // TODO: maybe ... place some sort of time limit on how long we are willing to wait for slow replica(s) to finish copying?
while (preCopy.finished() == false) {
try {
Thread.sleep(10);
@@ -241,7 +245,16 @@ class SimplePrimaryNode extends PrimaryNode {
message("top: warning: replica socket=" + c.s + " for segment=" + info + " seems to be dead; closing files=" + files.keySet());
IOUtils.closeWhileHandlingException(c);
it.remove();
+ done = true;
}
+
+ if (done == false && random.nextInt(1000) == 17) {
+ message("top: warning: now randomly dropping replica from merge warming; files=" + files.keySet());
+ IOUtils.closeWhileHandlingException(c);
+ it.remove();
+ done = true;
+ }
+
} catch (Throwable t) {
message("top: ignore exception trying to read byte during warm for segment=" + info + " to replica socket=" + c.s + ": " + t + " files=" + files.keySet());
IOUtils.closeWhileHandlingException(c);
@@ -368,7 +381,7 @@ class SimplePrimaryNode extends PrimaryNode {
while (upto < len) {
int chunk = (int) Math.min(buffer.length, (len-upto));
in.readBytes(buffer, 0, chunk);
- if (TestNRTReplication.DO_BIT_FLIPS_DURING_COPY) {
+ if (doFlipBitsDuringCopy) {
if (random.nextInt(3000) == 17 && bitFlipped.contains(fileName) == false) {
bitFlipped.add(fileName);
message("file " + fileName + " to R" + replicaID + ": now randomly flipping a bit at byte=" + upto);
@@ -435,6 +448,10 @@ class SimplePrimaryNode extends PrimaryNode {
handleDeleteDocument(in, out);
out.writeByte((byte) 1);
bos.flush();
+ } else if (cmd == CMD_DELETE_ALL_DOCS) {
+ writer.deleteAll();
+ out.writeByte((byte) 1);
+ bos.flush();
} else if (cmd == CMD_INDEXING_DONE) {
out.writeByte((byte) 1);
bos.flush();
@@ -508,6 +525,7 @@ class SimplePrimaryNode extends PrimaryNode {
static final byte CMD_MARKER_SEARCH = 3;
static final byte CMD_COMMIT = 4;
static final byte CMD_CLOSE = 5;
+ static final byte CMD_SEARCH_ALL = 21;
// Send (to primary) the list of currently running replicas:
static final byte CMD_SET_REPLICAS = 16;
@@ -518,6 +536,7 @@ class SimplePrimaryNode extends PrimaryNode {
static final byte CMD_UPDATE_DOC = 7;
static final byte CMD_DELETE_DOC = 8;
static final byte CMD_INDEXING_DONE = 19;
+ static final byte CMD_DELETE_ALL_DOCS = 22;
// Sent by replica to primary when replica first starts up, so primary can add it to any warming merges:
static final byte CMD_NEW_REPLICA = 20;
@@ -579,6 +598,22 @@ class SimplePrimaryNode extends PrimaryNode {
}
continue outer;
+ case CMD_SEARCH_ALL:
+ {
+ Thread.currentThread().setName("search all");
+ IndexSearcher searcher = mgr.acquire();
+ try {
+ long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+ int hitCount = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
+ //message("version=" + version + " searcher=" + searcher);
+ out.writeVLong(version);
+ out.writeVInt(hitCount);
+ } finally {
+ mgr.release(searcher);
+ }
+ }
+ continue outer;
+
case CMD_MARKER_SEARCH:
{
Thread.currentThread().setName("msearch");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
index 8667df1..27a5547 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
@@ -24,19 +24,16 @@ import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Path;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.DataInput;
@@ -45,7 +42,6 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.LuceneTestCase;
@@ -182,7 +178,7 @@ class SimpleReplicaNode extends ReplicaNode {
break;
case SimplePrimaryNode.CMD_GET_SEARCHING_VERSION:
- // nocommit this is hacky:
+ // This is called when primary has crashed and we need to elect a new primary from all the still running replicas:
// Tricky: if a sync is just finishing up, i.e. managed to finish copying all files just before we crashed primary, and is now
// in the process of opening a new reader, we need to wait for it, to be sure we really pick the most current replica:
@@ -190,7 +186,7 @@ class SimpleReplicaNode extends ReplicaNode {
message("top: getSearchingVersion: now wait for finish sync");
// TODO: use immediate concurrency instead of polling:
while (isCopying() && stop.get() == false) {
- Thread.sleep(50);
+ Thread.sleep(10);
message("top: curNRTCopy=" + curNRTCopy);
}
message("top: getSearchingVersion: done wait for finish sync");
@@ -212,6 +208,24 @@ class SimpleReplicaNode extends ReplicaNode {
//node.message("version=" + version + " searcher=" + searcher);
out.writeVLong(version);
out.writeVInt(hitCount);
+ bos.flush();
+ } finally {
+ mgr.release(searcher);
+ }
+ }
+ continue outer;
+
+ case SimplePrimaryNode.CMD_SEARCH_ALL:
+ {
+ Thread.currentThread().setName("search all");
+ IndexSearcher searcher = mgr.acquire();
+ try {
+ long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+ int hitCount = searcher.search(new MatchAllDocsQuery(), 1).totalHits;
+ //node.message("version=" + version + " searcher=" + searcher);
+ out.writeVLong(version);
+ out.writeVInt(hitCount);
+ bos.flush();
} finally {
mgr.release(searcher);
}
@@ -227,6 +241,7 @@ class SimpleReplicaNode extends ReplicaNode {
int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
out.writeVLong(version);
out.writeVInt(hitCount);
+ bos.flush();
} finally {
mgr.release(searcher);
}
@@ -290,6 +305,7 @@ class SimpleReplicaNode extends ReplicaNode {
default:
throw new IllegalArgumentException("unrecognized cmd=" + cmd);
}
+ System.out.println("NOW FLUSH");
bos.flush();
break;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
index 7a257de..5b04721 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
@@ -19,61 +19,32 @@ package org.apache.lucene.replicator.nrt;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
import java.lang.reflect.Method;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.file.Path;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.lucene.analysis.MockAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FieldType;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexOptions;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
/** Child process with silly naive TCP socket server to handle
@@ -262,8 +233,9 @@ public class SimpleServer extends LuceneTestCase {
long primaryGen = Long.parseLong(System.getProperty("tests.nrtreplication.primaryGen"));
Node.globalStartNS = Long.parseLong(System.getProperty("tests.nrtreplication.startNS"));
- boolean doRandomCrash = isPrimary ? TestNRTReplication.DO_CRASH_PRIMARY : TestNRTReplication.DO_CRASH_REPLICA;
- boolean doRandomClose = isPrimary ? false : TestNRTReplication.DO_CLOSE_REPLICA;
+ boolean doRandomCrash = "true".equals(System.getProperty("tests.nrtreplication.doRandomCrash"));
+ boolean doRandomClose = "true".equals(System.getProperty("tests.nrtreplication.doRandomClose"));
+ boolean doFlipBitsDuringCopy = "true".equals(System.getProperty("tests.nrtreplication.doFlipBitsDuringCopy"));
// Create server socket that we listen for incoming requests on:
try (final ServerSocket ss = new ServerSocket(0)) {
@@ -272,7 +244,7 @@ public class SimpleServer extends LuceneTestCase {
System.out.println("\nPORT: " + tcpPort);
final Node node;
if (isPrimary) {
- node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null);
+ node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null, doFlipBitsDuringCopy);
System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
} else {
node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/022540e8/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 5a073ff..755b234 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@ -18,499 +18,39 @@ package org.apache.lucene.replicator.nrt;
*/
import java.io.BufferedReader;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
-import java.io.Writer;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.charset.MalformedInputException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
-import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.ConcurrentMergeScheduler;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.InputStreamDataInput;
-import org.apache.lucene.store.MockDirectoryWrapper;
-import org.apache.lucene.store.NIOFSDirectory;
-import org.apache.lucene.store.OutputStreamDataOutput;
-import org.apache.lucene.store.RateLimiter;
-import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LineFileDocs;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.TestUtil;
-import org.apache.lucene.util.ThreadInterruptedException;
import com.carrotsearch.randomizedtesting.SeedUtils;
-// nocommit sometimes have one replica be really slow at copying / have random pauses (fake GC) / etc.
-
-// nocommit randomly p.destroy() one replica?
-
-/*
- TODO
- - why do we do the "rename temp to actual" all at the end...? what really does that buy us?
- - replica should also track maxSegmentName its seen, and tap into inflateGens if it's later promoted to primary?
- - test should not print scary exceptions and then succeed!
- - since all nodes are local, we could have a different test only impl that just does local file copies instead of via tcp...
- - are the pre-copied-completed-merged files not being cleared in primary?
- - hmm the logic isn't right today? a replica may skip pulling a given copy state, that recorded the finished merged segments?
- - beast & fix bugs
- - graceful cluster restart
- - better translog integration
- - get "graceful primary shutdown" working
- - there is still some global state we rely on for "correctness", e.g. lastPrimaryVersion
- - clean up how version is persisted in commit data
- - why am i not using hashes here? how does ES use them?
- - get all other "single shard" functions working too: this cluster should "act like" a single shard
- - SLM
- - controlled nrt reopen thread / returning long gen on write
- - live field values
- - add indexes
- - make cluster level APIs to search, index, that deal w/ primary failover, etc.
- - must prune xlog
- - refuse to start primary unless we have quorum
- - later
- - if we named index files using segment's ID we wouldn't have file name conflicts after primary crash / rollback?
- - back pressure on indexing if replicas can't keep up?
- - get xlog working on top? needs to be checkpointed, so we can correlate IW ops to NRT reader version and prune xlog based on commit
- quorum
- - maybe fix IW to return "gen" or "seq id" or "segment name" or something?
- - replica can copy files from other replicas too / use multicast / rsync / something
- - each replica could also pre-open a SegmentReader after pre-copy when warming a merge
- - we can pre-copy newly flushed files too, for cases where reopen rate is low vs IW's flushing because RAM buffer is full
- - opto: pre-copy files as they are written; if they will become CFS, we can build CFS on the replica?
- - what about multiple commit points?
- - fix primary to init directly from an open replica, instead of having to commit/close the replica first
-*/
-
-// Tricky cases:
-// - we are pre-copying a merge, then replica starts up part way through, so it misses that pre-copy and must do it on next nrt point
-// - a down replica starts up, but it's "from the future" vs the current primary, and must copy over file names with different contents
-// but referenced by its latest commit point, so it must fully remove that commit ... which is a hazardous window
-// - replica comes up just as the primary is crashing / moving
-// - electing a new primary when a replica is just finishing its nrt sync: we need to wait for it so we are sure to get the "most up to
-// date" replica
-// - replica comes up after merged segment finished so it doesn't copy over the merged segment "promptly" (i.e. only sees it on NRT refresh)
-
-/**
- * Test case showing how to implement NRT replication. This test spawns a sub-process per-node, running TestNRTReplicationChild.
- *
- * One node is primary, and segments are periodically flushed there, then concurrently the N replica nodes copy the new files over and open new readers, while
- * primary also opens a new reader.
- *
- * Nodes randomly crash and are restarted. If the primary crashes, a replica is promoted.
- *
- * Merges are currently first finished on the primary and then pre-copied out to replicas with a merged segment warmer so they don't block
- * ongoing NRT reopens. Probably replicas could do their own merging instead, but this is more complex and may not be better overall
- * (merging takes a lot of IO resources).
- *
- * Slow network is simulated with a RateLimiter.
- */
-
-// nocommit randomly delete all doc sometimes, 1) using IW.deleteAll and 2) doing it inefficiently (by query, by id)
-
// MockRandom's .sd file has no index header/footer:
@SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"})
@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
public class TestNRTReplication extends LuceneTestCase {
- // Test evilness controls:
-
- /** Randomly crash the current primary (losing data!) and promote the "next best" replica. */
- static final boolean DO_CRASH_PRIMARY = true;
-
- /** Randomly crash (JVM core dumps) a replica; it will later randomly be restarted and sync itself. */
- static final boolean DO_CRASH_REPLICA = true;
-
- /** Randomly gracefully close a replica; it will later be restarted and sync itself. */
- static final boolean DO_CLOSE_REPLICA = true;
-
- /** If false, all child + parent output is interleaved into single stdout/err */
- static final boolean SEPARATE_CHILD_OUTPUT = false;
-
- // nocommit DO_CLOSE_PRIMARY?
-
- /** Randomly crash whole cluster and then restart it */
- static final boolean DO_FULL_CLUSTER_CRASH = true;
-
- /** True if we randomly flip a bit while copying files out */
- static final boolean DO_BIT_FLIPS_DURING_COPY = true;
-
- /** Set to a non-null value to force exactly that many nodes; else, it's random. */
- static final Integer NUM_NODES = null;
-
- static final boolean DO_RANDOM_XLOG_REPLAY = false;
-
- final AtomicBoolean failed = new AtomicBoolean();
-
- final AtomicBoolean stop = new AtomicBoolean();
-
/** cwd where we start each child (server) node */
private Path childTempDir;
- long primaryGen;
-
- volatile long lastPrimaryVersion;
-
- volatile NodeProcess primary;
- volatile NodeProcess[] nodes;
- volatile long[] nodeTimeStamps;
- volatile boolean[] starting;
-
- Path[] indexPaths;
-
- Path transLogPath;
- SimpleTransLog transLog;
- final AtomicInteger markerUpto = new AtomicInteger();
-
- /** Maps searcher version to how many hits the query body:the matched. */
- final Map<Long,Integer> hitCounts = new ConcurrentHashMap<>();
-
- /** Maps searcher version to how many marker documents matched. This should only ever grow (we never delete marker documents). */
- final Map<Long,Integer> versionToMarker = new ConcurrentHashMap<>();
-
- /** Maps searcher version to xlog location when refresh of this version started. */
- final Map<Long,Long> versionToTransLogLocation = new ConcurrentHashMap<>();
-
- public void test() throws Exception {
-
- Node.globalStartNS = System.nanoTime();
-
- message("change thread name from " + Thread.currentThread().getName());
- Thread.currentThread().setName("main");
-
- childTempDir = createTempDir("child");
-
- // We are parent process:
-
- // Silly bootstrapping:
- versionToTransLogLocation.put(0L, 0L);
- versionToTransLogLocation.put(1L, 0L);
-
- int numNodes;
-
- if (NUM_NODES == null) {
- numNodes = TestUtil.nextInt(random(), 2, 10);
- } else {
- numNodes = NUM_NODES.intValue();
- }
-
- System.out.println("TEST: using " + numNodes + " nodes");
-
- transLogPath = createTempDir("NRTReplication").resolve("translog");
- transLog = new SimpleTransLog(transLogPath);
-
- //state.rateLimiters = new RateLimiter[numNodes];
- indexPaths = new Path[numNodes];
- nodes = new NodeProcess[numNodes];
- nodeTimeStamps = new long[numNodes];
- Arrays.fill(nodeTimeStamps, Node.globalStartNS);
- starting = new boolean[numNodes];
-
- for(int i=0;i<numNodes;i++) {
- indexPaths[i] = createTempDir("index" + i);
- }
-
- Thread[] indexers = new Thread[TestUtil.nextInt(random(), 1, 3)];
- System.out.println("TEST: launch " + indexers.length + " indexer threads");
- for(int i=0;i<indexers.length;i++) {
- indexers[i] = new IndexThread();
- indexers[i].setName("indexer" + i);
- indexers[i].setDaemon(true);
- indexers[i].start();
- }
-
- Thread[] searchers = new Thread[TestUtil.nextInt(random(), 1, 3)];
- System.out.println("TEST: launch " + searchers.length + " searcher threads");
- for(int i=0;i<searchers.length;i++) {
- searchers[i] = new SearchThread();
- searchers[i].setName("searcher" + i);
- searchers[i].setDaemon(true);
- searchers[i].start();
- }
-
- Thread restarter = new RestartThread();
- restarter.setName("restarter");
- restarter.setDaemon(true);
- restarter.start();
-
- int runTimeSec;
- if (TEST_NIGHTLY) {
- runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 120, 240);
- } else {
- runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 45, 120);
- }
-
- System.out.println("TEST: will run for " + runTimeSec + " sec");
-
- long endTime = System.nanoTime() + runTimeSec*1000000000L;
-
- sendReplicasToPrimary();
-
- while (failed.get() == false && System.nanoTime() < endTime) {
-
- // Wait a bit:
- Thread.sleep(TestUtil.nextInt(random(), Math.min(runTimeSec*4, 200), runTimeSec*4));
- if (primary != null && random().nextBoolean()) {
- message("top: now flush primary");
- NodeProcess curPrimary = primary;
- if (curPrimary != null) {
-
- // Save these before we start flush:
- long nextTransLogLoc = transLog.getNextLocation();
- int markerUptoSav = markerUpto.get();
-
- long result;
- try {
- result = primary.flush();
- } catch (Throwable t) {
- message("top: flush failed; skipping: " + t.getMessage());
- result = -1;
- }
- if (result > 0) {
- // There were changes
- lastPrimaryVersion = result;
- addTransLogLoc(lastPrimaryVersion, nextTransLogLoc);
- addVersionMarker(lastPrimaryVersion, markerUptoSav);
- }
- }
- }
-
- StringBuilder sb = new StringBuilder();
- int liveCount = 0;
- for(int i=0;i<nodes.length;i++) {
- NodeProcess node = nodes[i];
- if (node != null) {
- if (sb.length() != 0) {
- sb.append(" ");
- }
- liveCount++;
- if (node.isPrimary) {
- sb.append('P');
- } else {
- sb.append('R');
- }
- sb.append(i);
- }
- }
-
- message("PG=" + (primary == null ? "X" : primaryGen) + " " + liveCount + " (of " + nodes.length + ") nodes running: " + sb);
-
- // Commit a random node, primary or replica
-
- {
- NodeProcess node = nodes[random().nextInt(nodes.length)];
- if (node != null) {
- // TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters.
- // maybe we somehow allow IW to commit a specific sis (the one we just flushed)?
- message("top: now commit node=" + node);
- node.commitAsync();
- }
- }
- }
-
- message("TEST: top: test done, now close");
- stop.set(true);
- for(Thread thread : indexers) {
- thread.join();
- }
- for(Thread thread : searchers) {
- thread.join();
- }
- restarter.join();
-
- // Close replicas before primary so we cancel any in-progres replications:
- System.out.println("TEST: top: now close replicas");
- List<Closeable> toClose = new ArrayList<>();
- for(NodeProcess node : nodes) {
- if (node != primary && node != null) {
- toClose.add(node);
- }
- }
- IOUtils.close(toClose);
- IOUtils.close(primary);
- IOUtils.close(transLog);
-
- if (failed.get() == false) {
- message("TEST: top: now checkIndex");
- for(Path path : indexPaths) {
- message("TEST: check " + path);
- MockDirectoryWrapper dir = newMockFSDirectory(path);
- // Just too slow otherwise
- dir.setCrossCheckTermVectorsOnClose(false);
- dir.close();
- }
- } else {
- message("TEST: failed; skip checkIndex");
- }
- }
-
- private boolean anyNodesStarting() {
- for(int id=0;id<nodes.length;id++) {
- if (starting[id]) {
- return true;
- }
- }
-
- return false;
- }
-
- /** Picks a replica and promotes it as new primary. */
- private void promoteReplica() throws IOException {
- message("top: primary crashed; now pick replica to promote");
- long maxSearchingVersion = -1;
- NodeProcess replicaToPromote = null;
-
- // We must promote the most current replica, because otherwise file name reuse can cause a replication to fail when it needs to copy
- // over a file currently held open for searching. This also minimizes recovery work since the most current replica means less xlog
- // replay to catch up:
- for (NodeProcess node : nodes) {
- if (node != null) {
- message("ask " + node + " for its current searching version");
- long searchingVersion = node.getSearchingVersion();
- message(node + " has searchingVersion=" + searchingVersion);
- if (searchingVersion > maxSearchingVersion) {
- maxSearchingVersion = searchingVersion;
- replicaToPromote = node;
- }
- }
- }
-
- if (replicaToPromote == null) {
- message("top: no replicas running; skipping primary promotion");
- return;
- }
-
- message("top: promote " + replicaToPromote + " version=" + maxSearchingVersion + "; now commit");
- if (replicaToPromote.commit() == false) {
- message("top: commit failed; skipping primary promotion");
- return;
- }
-
- message("top: now shutdown " + replicaToPromote);
- if (replicaToPromote.shutdown() == false) {
- message("top: shutdown failed for R" + replicaToPromote.id + "; skipping primary promotion");
- return;
- }
-
- int id = replicaToPromote.id;
- message("top: now startPrimary " + replicaToPromote);
- startPrimary(replicaToPromote.id);
- }
-
- void startPrimary(int id) throws IOException {
- message(id + ": top: startPrimary lastPrimaryVersion=" + lastPrimaryVersion);
- assert nodes[id] == null;
-
- // Force version of new primary to advance beyond where old primary was, so we never re-use versions. It may have
- // already advanced beyond newVersion, e.g. if it flushed new segments while during xlog replay:
-
- // First start node as primary (it opens an IndexWriter) but do not publish it for searching until we replay xlog:
- NodeProcess newPrimary = startNode(id, indexPaths[id], true, lastPrimaryVersion+1);
- if (newPrimary == null) {
- message("top: newPrimary failed to start; abort");
- return;
- }
-
- // Get xlog location that this node was guaranteed to already have indexed through; this may replay some ops already indexed but it's OK
- // because the ops are idempotent: we updateDocument (by docid) on replay even for original addDocument:
- Long startTransLogLoc;
- Integer markerCount;
- if (newPrimary.initCommitVersion == 0) {
- startTransLogLoc = 0L;
- markerCount = 0;
- } else {
- startTransLogLoc = versionToTransLogLocation.get(newPrimary.initCommitVersion);
- markerCount = versionToMarker.get(newPrimary.initCommitVersion);
- }
- assert startTransLogLoc != null: "newPrimary.initCommitVersion=" + newPrimary.initCommitVersion + " is missing from versionToTransLogLocation: keys=" + versionToTransLogLocation.keySet();
- assert markerCount != null: "newPrimary.initCommitVersion=" + newPrimary.initCommitVersion + " is missing from versionToMarker: keys=" + versionToMarker.keySet();
-
- // When the primary starts, the userData in its latest commit point tells us which version it had indexed up to, so we know where to
- // replay from in the xlog. However, we forcefuly advance the version, and then IW on init (or maybe getReader) also adds 1 to it.
- // Since we publish the primary in this state (before xlog replay is done), a replica can start up at this point and pull this version,
- // and possibly later be chosen as a primary, causing problems if the version is known recorded in the translog map. So we record it
- // here:
-
- addTransLogLoc(newPrimary.initInfosVersion, startTransLogLoc);
- addVersionMarker(newPrimary.initInfosVersion, markerCount);
-
- assert newPrimary.initInfosVersion >= lastPrimaryVersion;
- message("top: now change lastPrimaryVersion from " + lastPrimaryVersion + " to " + newPrimary.initInfosVersion);
- lastPrimaryVersion = newPrimary.initInfosVersion;
-
- // Publish new primary, before replaying xlog. This means other indexing ops can come in at the same time as we catch up indexing
- // previous ops. Effectively, we have "forked" the indexing ops, by rolling back in time a bit, and replaying old indexing ops (from
- // translog) concurrently with new incoming ops.
- nodes[id] = newPrimary;
- primary = newPrimary;
-
- sendReplicasToPrimary();
-
- long nextTransLogLoc = transLog.getNextLocation();
- int nextMarkerUpto = markerUpto.get();
- message("top: replay trans log " + startTransLogLoc + " (version=" + newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)");
- try {
- transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc);
- } catch (IOException ioe) {
- message("top: replay xlog failed; abort");
- return;
- }
- message("top: done replay trans log");
- }
-
final AtomicLong nodeStartCounter = new AtomicLong();
- final Set<Integer> crashingNodes = Collections.synchronizedSet(new HashSet<>());
-
/** Launches a child "server" (separate JVM), which is either primary or replica node */
- NodeProcess startNode(final int id, Path indexPath, boolean isPrimary, long forcePrimaryVersion) throws IOException {
- nodeTimeStamps[id] = System.nanoTime();
+ NodeProcess startNode(int primaryTCPPort, final int id, Path indexPath, boolean isPrimary, long forcePrimaryVersion) throws IOException {
List<String> cmd = new ArrayList<>();
- NodeProcess curPrimary = primary;
-
cmd.add(System.getProperty("java.home")
+ System.getProperty("file.separator")
+ "bin"
@@ -518,12 +58,13 @@ public class TestNRTReplication extends LuceneTestCase {
+ "java");
cmd.add("-Xmx512m");
- if (curPrimary != null) {
- cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + curPrimary.tcpPort);
+ if (primaryTCPPort != -1) {
+ cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + primaryTCPPort);
} else if (isPrimary == false) {
// We cannot start a replica when there is no primary:
return null;
}
+ cmd.add("-Dtests.nrtreplication.closeorcrash=false");
cmd.add("-Dtests.nrtreplication.node=true");
cmd.add("-Dtests.nrtreplication.nodeid=" + id);
@@ -534,7 +75,7 @@ public class TestNRTReplication extends LuceneTestCase {
cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + forcePrimaryVersion);
}
- long myPrimaryGen = primaryGen;
+ long myPrimaryGen = 0;
cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen);
// Mixin our own counter because this is called from a fresh thread which means the seed otherwise isn't changing each time we spawn a
@@ -548,17 +89,6 @@ public class TestNRTReplication extends LuceneTestCase {
cmd.add("org.junit.runner.JUnitCore");
cmd.add(getClass().getName().replace(getClass().getSimpleName(), "SimpleServer"));
- Writer childLog;
-
- if (SEPARATE_CHILD_OUTPUT) {
- Path childOut = childTempDir.resolve(id + ".log");
- message("logging to " + childOut);
- childLog = Files.newBufferedWriter(childOut, StandardCharsets.UTF_8, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
- childLog.write("\n\nSTART NEW CHILD:\n");
- } else {
- childLog = null;
- }
-
message("child process command: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.redirectErrorStream(true);
@@ -592,41 +122,11 @@ public class TestNRTReplication extends LuceneTestCase {
throw new RuntimeException(ie);
}
message("exit value=" + p.exitValue());
-
- // Hackity hack, in case primary crashed/closed and we haven't noticed (reaped the process) yet:
- if (isPrimary == false) {
- if (sawExistingSegmentsFile) {
- // This means MDW's virus checker blocked us from deleting segments_N that we must delete in order to start ... just return null
- // and retry again later:
- message("failed to remove segments_N; skipping");
- return null;
- }
- for(int i=0;i<10;i++) {
- if (primaryGen != myPrimaryGen || primary == null) {
- // OK: primary crashed while we were trying to start, so it's expected/allowed that we could not start the replica:
- message("primary crashed/closed while replica R" + id + " tried to start; skipping");
- return null;
- } else {
- try {
- Thread.sleep(10);
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
- }
- }
-
- // Should fail the test:
message("top: now fail test replica R" + id + " failed to start");
- failed.set(true);
throw new RuntimeException("replica R" + id + " failed to start");
}
- if (childLog != null) {
- childLog.write(l);
- childLog.write("\n");
- childLog.flush();
- } else if (logTimeStart.matcher(l).matches()) {
+ if (logTimeStart.matcher(l).matches()) {
// Already a well-formed log output:
System.out.println(l);
} else {
@@ -650,7 +150,7 @@ public class TestNRTReplication extends LuceneTestCase {
final boolean finalWillCrash = willCrash;
- // Baby sits the child process, pulling its stdout and printing to our stdout, calling nodeClosed once it exits:
+ // Baby sits the child process, pulling its stdout and printing to our stdout:
Thread pumper = ThreadPumper.start(
new Runnable() {
@Override
@@ -665,503 +165,147 @@ public class TestNRTReplication extends LuceneTestCase {
message("done wait for process " + p);
int exitValue = p.exitValue();
message("exit value=" + exitValue + " willCrash=" + finalWillCrash);
- if (childLog != null) {
- try {
- childLog.write("process done; exitValue=" + exitValue + "\n");
- childLog.close();
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- }
- if (exitValue != 0 && finalWillCrash == false && crashingNodes.remove(id) == false) {
+ if (exitValue != 0) {
// should fail test
- failed.set(true);
- if (childLog != null) {
- throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue + "; see " + childLog + " for details");
- } else {
- throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue);
- }
+ throw new RuntimeException("node " + id + " process had unexpected non-zero exit status=" + exitValue);
}
- nodeClosed(id);
}
- }, r, System.out, childLog);
+ }, r, System.out, null);
pumper.setName("pump" + id);
message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion);
return new NodeProcess(p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion);
}
- private void nodeClosed(int id) {
- NodeProcess oldNode = nodes[id];
- if (primary != null && oldNode == primary) {
- message("top: " + primary + ": primary process finished");
- primary = null;
- primaryGen++;
- } else {
- message("top: " + oldNode + ": replica process finished");
+ public void testReplicateDeleteAllDocuments() throws Exception {
+ // Spawns a primary (node 0) and a replica (node 1), indexes 10 docs, deletes them all (randomly per-id or via deleteAllDocuments), re-indexes 10 docs, and after each primary flush waits for the replica's CMD_SEARCH_ALL hit count to match.
+ Node.globalStartNS = System.nanoTime();
+ childTempDir = createTempDir("child");
+
+ message("change thread name from " + Thread.currentThread().getName());
+ Thread.currentThread().setName("main");
+ // Start the primary first (primaryTCPPort=-1: no primary to connect to), then a replica pointed at the primary's tcpPort:
+ Path primaryPath = createTempDir("primary");
+ NodeProcess primary = startNode(-1, 0, primaryPath, true, -1);
+
+ Path replicaPath = createTempDir("replica");
+ NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, false, -1);
+
+ // Tell primary current replicas:
+ try (Connection c = new Connection(primary.tcpPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
+ c.out.writeVInt(1);
+ c.out.writeVInt(replica.id);
+ c.out.writeVInt(replica.tcpPort);
+ c.flush();
+ c.in.readByte();
}
- if (oldNode != null) {
- oldNode.isOpen = false;
+ // NOTE(review): primaryC/replicaC and the two NodeProcess instances are closed only at the very end; a failing assert leaks them -- consider try-with-resources / try-finally.
+ // Index 10 docs into primary:
+ LineFileDocs docs = new LineFileDocs(random());
+ Connection primaryC = new Connection(primary.tcpPort);
+ primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+ for(int i=0;i<10;i++) {
+ Document doc = docs.nextDoc();
+ primary.addOrUpdateDocument(primaryC, doc, false);
}
- nodes[id] = null;
- nodeTimeStamps[id] = System.nanoTime();
- sendReplicasToPrimary();
- }
+ // Nothing in replica index yet
+ Connection replicaC = new Connection(replica.tcpPort);
+ replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+ replicaC.flush();
+ long version1 = replicaC.in.readVLong();
+ assertEquals(0L, version1);
+ int hitCount = replicaC.in.readVInt();
+ assertEquals(0, hitCount);
- /** Sends currently alive replicas to primary, which uses this to know who to notify when it does a refresh */
- private void sendReplicasToPrimary() {
- NodeProcess curPrimary = primary;
- if (curPrimary != null) {
- List<NodeProcess> replicas = new ArrayList<>();
- for (NodeProcess node : nodes) {
- if (node != null && node.isPrimary == false) {
- replicas.add(node);
- }
- }
+ // Refresh primary, which also pushes to replica:
+ long primaryVersion1 = primary.flush();
+ assertTrue(primaryVersion1 > 0);
- message("top: send " + replicas.size() + " replicas to primary");
+ long version2;
- try (Connection c = new Connection(curPrimary.tcpPort)) {
- c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
- c.out.writeVInt(replicas.size());
- for(NodeProcess replica : replicas) {
- c.out.writeVInt(replica.id);
- c.out.writeVInt(replica.tcpPort);
- }
- c.flush();
- c.in.readByte();
- } catch (Throwable t) {
- message("top: ignore exc sending replicas to primary: " + t);
+ // Wait for replica to show the change
+ while (true) {
+ replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+ replicaC.flush();
+ version2 = replicaC.in.readVLong();
+ hitCount = replicaC.in.readVInt();
+ if (hitCount == 10) {
+ assertTrue(version2 > version1);
+ // good!
+ break;
}
+ Thread.sleep(10);
}
- }
- void addVersionMarker(long version, int count) {
- //System.out.println("ADD VERSION MARKER version=" + version + " count=" + count);
- if (versionToMarker.containsKey(version)) {
- int curCount = versionToMarker.get(version);
- if (curCount != count) {
- message("top: wrong marker count version=" + version + " count=" + count + " curCount=" + curCount);
- throw new IllegalStateException("version=" + version + " count=" + count + " curCount=" + curCount);
+ // Delete all docs from primary
+ if (random().nextBoolean()) {
+ // Inefficiently:
+ for(int id=0;id<10;id++) {
+ primary.deleteDocument(primaryC, Integer.toString(id));
}
} else {
- message("top: record marker count: version=" + version + " count=" + count);
- versionToMarker.put(version, count);
+ // Efficiently:
+ primary.deleteAllDocuments(primaryC);
}
- }
-
- void addTransLogLoc(long version, long loc) {
- message("top: record transLogLoc: version=" + version + " loc=" + loc);
- versionToTransLogLocation.put(version, loc);
- }
-
- // Periodically wakes up and starts up any down nodes:
- private class RestartThread extends Thread {
- @Override
- public void run() {
-
- List<Thread> startupThreads = Collections.synchronizedList(new ArrayList<>());
-
- try {
- while (stop.get() == false) {
- Thread.sleep(TestUtil.nextInt(random(), 50, 500));
- message("top: restarter cycle");
-
- // Randomly crash full cluster:
- if (DO_FULL_CLUSTER_CRASH && random().nextInt(50) == 17) {
- message("top: full cluster crash");
- for(int i=0;i<nodes.length;i++) {
- if (starting[i]) {
- message("N" + i + ": top: wait for startup so we can crash...");
- while (starting[i]) {
- Thread.sleep(10);
- }
- message("N" + i + ": top: done wait for startup");
- }
- NodeProcess node = nodes[i];
- if (node != null) {
- crashingNodes.add(i);
- message("top: N" + node.id + ": top: now crash node");
- node.crash();
- message("top: N" + node.id + ": top: done crash node");
- }
- }
- }
-
- List<Integer> downNodes = new ArrayList<>();
- StringBuilder b = new StringBuilder();
- long nowNS = System.nanoTime();
- for(int i=0;i<nodes.length;i++) {
- b.append(' ');
- double sec = (nowNS - nodeTimeStamps[i])/1000000000.0;
- String prefix;
- if (nodes[i] == null) {
- downNodes.add(i);
- if (starting[i]) {
- prefix = "s";
- } else {
- prefix = "x";
- }
- } else {
- prefix = "";
- }
- if (primary != null && nodes[i] == primary) {
- prefix += "p";
- }
- b.append(String.format(Locale.ROOT, "%s%d(%.1fs)", prefix, i, sec));
- }
- message("node status" + b.toString());
- message("downNodes=" + downNodes);
-
- // If primary is down, promote a replica:
- if (primary == null) {
- if (anyNodesStarting()) {
- message("top: skip promote replica: nodes are still starting");
- continue;
- }
- promoteReplica();
- }
-
- // Randomly start up a down a replica:
-
- // Stop or start a replica
- if (downNodes.isEmpty() == false) {
- int idx = downNodes.get(random().nextInt(downNodes.size()));
- if (starting[idx] == false) {
- if (primary == null) {
- if (downNodes.size() == nodes.length) {
- // Cold start: entire cluster is down, start this node up as the new primary
- message("N" + idx + ": top: cold start as primary");
- startPrimary(idx);
- }
- } else if (random().nextDouble() < ((double) downNodes.size())/nodes.length) {
- // Start up replica:
- starting[idx] = true;
- message("N" + idx + ": top: start up: launch thread");
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- message("N" + idx + ": top: start up thread");
- nodes[idx] = startNode(idx, indexPaths[idx], false, -1);
- sendReplicasToPrimary();
- } catch (Throwable t) {
- failed.set(true);
- stop.set(true);
- throw new RuntimeException(t);
- } finally {
- starting[idx] = false;
- startupThreads.remove(Thread.currentThread());
- }
- }
- };
- t.setName("start R" + idx);
- t.start();
- startupThreads.add(t);
- }
- } else {
- message("node " + idx + " still starting");
- }
- }
- }
- System.out.println("Restarter: now stop: join " + startupThreads.size() + " startup threads");
-
- while (startupThreads.size() > 0) {
- Thread.sleep(10);
- }
+ // Replica still shows 10 docs:
+ replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+ replicaC.flush();
+ long version3 = replicaC.in.readVLong();
+ assertEquals(version2, version3);
+ hitCount = replicaC.in.readVInt();
+ assertEquals(10, hitCount);
+ // NOTE(review): the while (true) wait loops in this test poll every 10 ms with no deadline; if the replica never converges the test hangs until the suite timeout.
+ // Refresh primary, which also pushes to replica:
+ long primaryVersion2 = primary.flush();
+ assertTrue(primaryVersion2 > primaryVersion1);
- } catch (Throwable t) {
- failed.set(true);
- stop.set(true);
- throw new RuntimeException(t);
+ // Wait for replica to show the change
+ long version4;
+ while (true) {
+ replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+ replicaC.flush();
+ version4 = replicaC.in.readVLong();
+ hitCount = replicaC.in.readVInt();
+ if (hitCount == 0) {
+ assertTrue(version4 > version3);
+ // good!
+ break;
}
+ Thread.sleep(10);
}
- }
-
- /** Randomly picks a node and runs a search against it */
- private class SearchThread extends Thread {
-
- @Override
- public void run() {
- // Maps version to number of hits for silly 'the' TermQuery:
- Query theQuery = new TermQuery(new Term("body", "the"));
-
- // Persists connections
- Map<Integer,Connection> connections = new HashMap<>();
-
- while (stop.get() == false) {
- NodeProcess node = nodes[random().nextInt(nodes.length)];
- if (node == null || node.isOpen == false) {
- continue;
- }
-
- if (node.lock.tryLock() == false) {
- // Node is in the process of closing or crashing or something
- continue;
- }
-
- try {
-
- Thread.currentThread().setName("Searcher node=" + node);
-
- //System.out.println("S: cycle; conns=" + connections);
-
- Connection c = connections.get(node.id);
-
- long version;
- try {
- if (c == null) {
- //System.out.println("S: new connection " + node.id + " " + Thread.currentThread().getName());
- c = new Connection(node.tcpPort);
- connections.put(node.id, c);
- } else {
- //System.out.println("S: reuse connection " + node.id + " " + Thread.currentThread().getName());
- }
-
- c.out.writeByte(SimplePrimaryNode.CMD_SEARCH);
- c.flush();
-
- while (c.sockIn.available() == 0) {
- if (stop.get()) {
- break;
- }
- if (node.isOpen == false) {
- throw new IOException("node closed");
- }
- Thread.sleep(1);
- }
- version = c.in.readVLong();
-
- while (c.sockIn.available() == 0) {
- if (stop.get()) {
- break;
- }
- if (node.isOpen == false) {
- throw new IOException("node closed");
- }
- Thread.sleep(1);
- }
- int hitCount = c.in.readVInt();
-
- Integer oldHitCount = hitCounts.get(version);
-
- // TODO: we never prune this map...
- if (oldHitCount == null) {
- hitCounts.put(version, hitCount);
- message("top: searcher: record search hitCount version=" + version + " hitCount=" + hitCount + " node=" + node);
- } else {
- // Just ensure that all nodes show the same hit count for
- // the same version, i.e. they really are replicas of one another:
- if (oldHitCount.intValue() != hitCount) {
- failed.set(true);
- stop.set(true);
- message("top: searcher: wrong version hitCount: version=" + version + " oldHitCount=" + oldHitCount.intValue() + " hitCount=" + hitCount);
- fail("version=" + version + " oldHitCount=" + oldHitCount.intValue() + " hitCount=" + hitCount);
- }
- }
- } catch (IOException ioe) {
- //message("top: searcher: ignore exc talking to node " + node + ": " + ioe);
- //ioe.printStackTrace(System.out);
- IOUtils.closeWhileHandlingException(c);
- connections.remove(node.id);
- continue;
- }
-
- // This can be null if we got the new primary after crash and that primary is still catching up (replaying xlog):
- Integer expectedAtLeastHitCount = versionToMarker.get(version);
-
- if (expectedAtLeastHitCount != null && expectedAtLeastHitCount > 0 && random().nextInt(10) == 7) {
- try {
- c.out.writeByte(SimplePrimaryNode.CMD_MARKER_SEARCH);
- c.flush();
- while (c.sockIn.available() == 0) {
- if (stop.get()) {
- break;
- }
- if (node.isOpen == false) {
- throw new IOException("node died");
- }
- Thread.sleep(1);
- }
-
- version = c.in.readVLong();
- while (c.sockIn.available() == 0) {
- if (stop.get()) {
- break;
- }
- if (node.isOpen == false) {
- throw new IOException("node died");
- }
- Thread.sleep(1);
- }
-
- int hitCount = c.in.readVInt();
-
- // Look for data loss: make sure all marker docs are visible:
-
- if (hitCount < expectedAtLeastHitCount) {
-
- String failMessage = "node=" + node + ": documents were lost version=" + version + " hitCount=" + hitCount + " vs expectedAtLeastHitCount=" + expectedAtLeastHitCount;
- message(failMessage);
- failed.set(true);
- stop.set(true);
- fail(failMessage);
- }
- } catch (IOException ioe) {
- //message("top: searcher: ignore exc talking to node " + node + ": " + ioe);
- //throw new RuntimeException(ioe);
- //ioe.printStackTrace(System.out);
- IOUtils.closeWhileHandlingException(c);
- connections.remove(node.id);
- continue;
- }
- }
+ // Index 10 docs again:
+ for(int i=0;i<10;i++) {
+ Document doc = docs.nextDoc();
+ primary.addOrUpdateDocument(primaryC, doc, false);
+ }
- Thread.sleep(10);
+ // Refresh primary, which also pushes to replica:
+ long primaryVersion3 = primary.flush();
+ assertTrue(primaryVersion3 > primaryVersion2);
- } catch (Throwable t) {
- failed.set(true);
- stop.set(true);
- throw new RuntimeException(t);
- } finally {
- node.lock.unlock();
- }
+ // Wait for replica to show the change
+ while (true) {
+ replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+ replicaC.flush();
+ long version5 = replicaC.in.readVLong();
+ hitCount = replicaC.in.readVInt();
+ if (hitCount == 10) {
+ assertTrue(version5 > version4);
+ // good!
+ break;
}
- System.out.println("Searcher: now stop");
- IOUtils.closeWhileHandlingException(connections.values());
+ Thread.sleep(10);
}
- }
-
- private class IndexThread extends Thread {
-
- @Override
- public void run() {
-
- try {
- LineFileDocs docs = new LineFileDocs(random());
- int docCount = 0;
-
- // How often we do an update/delete vs add:
- double updatePct = random().nextDouble();
-
- // Varies how many docs/sec we index:
- int sleepChance = TestUtil.nextInt(random(), 4, 100);
- message("top: indexer: updatePct=" + updatePct + " sleepChance=" + sleepChance);
+ replicaC.close();
+ primaryC.close();
- long lastTransLogLoc = transLog.getNextLocation();
-
- NodeProcess curPrimary = null;
- Connection c = null;
-
- while (stop.get() == false) {
-
- try {
- while (stop.get() == false && curPrimary == null) {
- Thread.sleep(10);
- curPrimary = primary;
- if (curPrimary != null) {
- c = new Connection(curPrimary.tcpPort);
- c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
- break;
- }
- }
-
- if (stop.get()) {
- break;
- }
-
- Thread.currentThread().setName("indexer p" + curPrimary.id);
-
- if (random().nextInt(10) == 7) {
- // We use the marker docs to check for data loss in search thread:
- Document doc = new Document();
- int id = markerUpto.getAndIncrement();
- String idString = "m"+id;
- doc.add(newStringField("docid", idString, Field.Store.YES));
- doc.add(newStringField("marker", "marker", Field.Store.YES));
- curPrimary.addOrUpdateDocument(c, doc, false);
- transLog.addDocument(idString, doc);
- message("index marker=" + idString + "; translog is " + Node.bytesToString(Files.size(transLogPath)));
- }
-
- if (docCount > 0 && random().nextDouble() < updatePct) {
- int randomID = random().nextInt(docCount);
- String randomIDString = Integer.toString(randomID);
- if (random().nextBoolean()) {
- // Replace previous doc
- Document doc = docs.nextDoc();
- ((Field) doc.getField("docid")).setStringValue(randomIDString);
- curPrimary.addOrUpdateDocument(c, doc, true);
- transLog.updateDocument(randomIDString, doc);
- } else {
- // Delete previous doc
- curPrimary.deleteDocument(c, randomIDString);
- transLog.deleteDocuments(randomIDString);
- }
- } else {
- // Add new doc:
- Document doc = docs.nextDoc();
- String idString = Integer.toString(docCount++);
- ((Field) doc.getField("docid")).setStringValue(idString);
- curPrimary.addOrUpdateDocument(c, doc, false);
- transLog.addDocument(idString, doc);
-
- if (DO_RANDOM_XLOG_REPLAY && random().nextInt(10) == 7) {
- long curLoc = transLog.getNextLocation();
- // randomly replay chunks of translog just to test replay:
- message("now randomly replay translog from " + lastTransLogLoc + " to " + curLoc);
- transLog.replay(curPrimary, lastTransLogLoc, curLoc);
- lastTransLogLoc = curLoc;
- }
- }
- } catch (IOException se) {
- // Assume primary crashed
- message("top: indexer lost connection to primary");
- try {
- c.close();
- } catch (Throwable t) {
- }
- curPrimary = null;
- c = null;
- }
-
- if (random().nextInt(sleepChance) == 0) {
- Thread.sleep(1);
- }
-
- if (random().nextInt(100) == 17) {
- System.out.println("Indexer: now pause for a bit...");
- Thread.sleep(TestUtil.nextInt(random(), 500, 2000));
- System.out.println("Indexer: done pause for a bit...");
- }
- }
- if (curPrimary != null) {
- try {
- c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE);
- c.flush();
- c.in.readByte();
- } catch (IOException se) {
- // Assume primary crashed
- message("top: indexer lost connection to primary");
- try {
- c.close();
- } catch (Throwable t) {
- }
- curPrimary = null;
- c = null;
- }
- }
- System.out.println("Indexer: now stop");
- } catch (Throwable t) {
- failed.set(true);
- stop.set(true);
- throw new RuntimeException(t);
- }
- }
+ replica.close();
+ primary.close();
}
static void message(String message) {