You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:42 UTC
[08/31] lucene-solr git commit: reduce some verbosity;
fix a silly vInt/int disagreement in the test protocol;
reduce indexing rate so translog replay can keep up vs primary crashing itself
reduce some verbosity; fix a silly vInt/int disagreement in the test protocol; reduce indexing rate so translog replay can keep up vs primary crashing itself
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8889469b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8889469b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8889469b
Branch: refs/heads/master
Commit: 8889469b8bb1445133c39069adfdb5db028dbad1
Parents: 022540e
Author: Mike McCandless <mi...@apache.org>
Authored: Tue Feb 2 06:09:57 2016 -0500
Committer: Mike McCandless <mi...@apache.org>
Committed: Tue Feb 2 06:09:57 2016 -0500
----------------------------------------------------------------------
.../org/apache/lucene/index/SegmentInfos.java | 2 +-
.../lucene/replicator/nrt/PrimaryNode.java | 7 +-
.../lucene/replicator/nrt/ReplicaNode.java | 26 +++--
.../lucene/replicator/nrt/NodeProcess.java | 22 +++-
.../replicator/nrt/SimplePrimaryNode.java | 55 ++++++++--
.../replicator/nrt/SimpleReplicaNode.java | 27 ++++-
.../lucene/replicator/nrt/SimpleServer.java | 20 +++-
.../lucene/replicator/nrt/SimpleTransLog.java | 2 +-
.../replicator/nrt/TestNRTReplication.java | 86 ++++++++++++++-
.../nrt/TestStressNRTReplication.java | 107 ++++++++++++-------
.../lucene/replicator/nrt/ThreadPumper.java | 6 +-
lucene/replicator/test.cmd | 2 +-
12 files changed, 285 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
index 2f8d914..046f3ea 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
@@ -879,7 +879,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
* segments. */
public void changed() {
version++;
- System.out.println(Thread.currentThread().getName() + ": SIS.change to version=" + version);
+ //System.out.println(Thread.currentThread().getName() + ": SIS.change to version=" + version);
//new Throwable().printStackTrace(System.out);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
index ccd8848..3cff95e 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -104,7 +104,8 @@ public abstract class PrimaryNode extends Node {
IndexSearcher s = mgr.acquire();
try {
- message("init: marker hit count: " + s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits);
+ // TODO: this is test code specific!!
+ message("init: marker count: " + s.count(new TermQuery(new Term("marker", "marker"))));
} finally {
mgr.release(s);
}
@@ -135,7 +136,7 @@ public abstract class PrimaryNode extends Node {
if (result) {
message("top: opened NRT reader version=" + curInfos.getVersion());
finishedMergedFiles.removeAll(completedMergeFiles);
- message("flushAndRefresh: version=" + curInfos.getVersion() + " completedMergeFiles=" + completedMergeFiles + " finishedMergedFiles=" + finishedMergedFiles);
+ message("flushAndRefresh: version=" + curInfos.getVersion() + " completedMergeFiles=" + completedMergeFiles + " finishedMergedFiles=" + finishedMergedFiles);
} else {
message("top: no changes in flushAndRefresh; still version=" + curInfos.getVersion());
}
@@ -208,6 +209,8 @@ public abstract class PrimaryNode extends Node {
try {
searcher = mgr.acquire();
infos = ((StandardDirectoryReader) searcher.getIndexReader()).getSegmentInfos();
+ // TODO: this is test code specific!!
+ message("setCurrentInfos: marker count: " + searcher.count(new TermQuery(new Term("marker", "marker"))) + " version=" + infos.getVersion() + " searcher=" + searcher);
} finally {
if (searcher != null) {
mgr.release(searcher);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
index 713c6f1..c7af429 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
@@ -152,8 +153,6 @@ abstract class ReplicaNode extends Node {
deleter.deleteUnknownFiles(segmentsFileName);
message("top: done delete unknown files on init: all files=" + Arrays.toString(dir.listAll()));
- // nocommit make test where index has all docs deleted (all segments dropped, so 0 segments) and is then replicated
-
String s = infos.getUserData().get(PRIMARY_GEN_KEY);
long myPrimaryGen;
if (s == null) {
@@ -277,6 +276,15 @@ abstract class ReplicaNode extends Node {
// Finally, we are open for business, since our index now "agrees" with the primary:
mgr = new SegmentInfosSearcherManager(dir, this, infos, searcherFactory);
+ IndexSearcher searcher = mgr.acquire();
+ try {
+ // TODO: this is test specific:
+ int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
+ message("top: marker count=" + hitCount + " version=" + ((DirectoryReader) searcher.getIndexReader()).getVersion());
+ } finally {
+ mgr.release(searcher);
+ }
+
// Must commit after init mgr:
if (doCommit) {
// Very important to commit what we just sync'd over, because we removed the pre-existing commit point above if we had to
@@ -287,9 +295,13 @@ abstract class ReplicaNode extends Node {
message("top: done start");
state = "idle";
} catch (Throwable t) {
- message("exc on start:");
- t.printStackTrace(System.out);
- throw new RuntimeException(t);
+ if (t.getMessage().startsWith("replica cannot start") == false) {
+ message("exc on start:");
+ t.printStackTrace(System.out);
+ } else {
+ dir.close();
+ }
+ IOUtils.reThrow(t);
}
}
@@ -418,7 +430,7 @@ abstract class ReplicaNode extends Node {
int markerCount;
IndexSearcher s = mgr.acquire();
try {
- markerCount = s.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+ markerCount = s.count(new TermQuery(new Term("marker", "marker")));
} finally {
mgr.release(s);
}
@@ -496,7 +508,7 @@ abstract class ReplicaNode extends Node {
} catch (NodeCommunicationException nce) {
// E.g. primary could crash/close when we are asking it for the copy state:
message("top: ignoring communication exception creating CopyJob: " + nce);
- nce.printStackTrace(System.out);
+ //nce.printStackTrace(System.out);
if (state.equals("syncing")) {
state = "idle";
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
index 9de2c04..dcbef87 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -19,6 +19,7 @@ package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.document.Document;
@@ -47,7 +48,9 @@ class NodeProcess implements Closeable {
volatile boolean isOpen = true;
- public NodeProcess(Process p, int id, int tcpPort, Thread pumper, boolean isPrimary, long initCommitVersion, long initInfosVersion) {
+ final AtomicBoolean nodeIsClosing;
+
+ public NodeProcess(Process p, int id, int tcpPort, Thread pumper, boolean isPrimary, long initCommitVersion, long initInfosVersion, AtomicBoolean nodeIsClosing) {
this.p = p;
this.id = id;
this.tcpPort = tcpPort;
@@ -55,6 +58,7 @@ class NodeProcess implements Closeable {
this.isPrimary = isPrimary;
this.initCommitVersion = initCommitVersion;
this.initInfosVersion = initInfosVersion;
+ this.nodeIsClosing = nodeIsClosing;
assert initInfosVersion >= initCommitVersion: "initInfosVersion=" + initInfosVersion + " initCommitVersion=" + initCommitVersion;
lock = new ReentrantLock();
}
@@ -122,10 +126,11 @@ class NodeProcess implements Closeable {
/** Ask the primary node process to flush. We send it all currently up replicas so it can notify them about the new NRT point. Returns the newly
* flushed version, or a negative (current) version if there were no changes. */
- public synchronized long flush() throws IOException {
+ public synchronized long flush(int atLeastMarkerCount) throws IOException {
assert isPrimary;
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_FLUSH);
+ c.out.writeVInt(atLeastMarkerCount);
c.flush();
c.s.shutdownOutput();
return c.in.readLong();
@@ -218,7 +223,7 @@ class NodeProcess implements Closeable {
public void deleteDocument(Connection c, String docid) throws IOException {
if (isPrimary == false) {
- throw new IllegalStateException("only primary can index");
+ throw new IllegalStateException("only primary can delete documents");
}
c.out.writeByte(SimplePrimaryNode.CMD_DELETE_DOC);
c.out.writeString(docid);
@@ -228,11 +233,20 @@ class NodeProcess implements Closeable {
public void deleteAllDocuments(Connection c) throws IOException {
if (isPrimary == false) {
- throw new IllegalStateException("only primary can index");
+ throw new IllegalStateException("only primary can delete documents");
}
c.out.writeByte(SimplePrimaryNode.CMD_DELETE_ALL_DOCS);
c.flush();
c.in.readByte();
}
+
+ public void forceMerge(Connection c) throws IOException {
+ if (isPrimary == false) {
+ throw new IllegalStateException("only primary can force merge");
+ }
+ c.out.writeByte(SimplePrimaryNode.CMD_FORCE_MERGE);
+ c.flush();
+ c.in.readByte();
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
index 0afd1b4..b9ecced 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -53,8 +53,10 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
@@ -272,6 +274,8 @@ class SimplePrimaryNode extends PrimaryNode {
private void handleFlush(DataInput topIn, DataOutput topOut, BufferedOutputStream bos) throws IOException {
Thread.currentThread().setName("flush");
+ int atLeastMarkerCount = topIn.readVInt();
+
int[] replicaTCPPorts;
int[] replicaIDs;
synchronized (this) {
@@ -284,6 +288,8 @@ class SimplePrimaryNode extends PrimaryNode {
if (flushAndRefresh()) {
// Something did get flushed (there were indexing ops since the last flush):
+ verifyAtLeastMarkerCount(atLeastMarkerCount, null);
+
// Tell caller the version before pushing to replicas, so that even if we crash after this, caller will know what version we
// (possibly) pushed to some replicas. Alternatively we could make this 2 separate ops?
long version = getCopyStateVersion();
@@ -295,6 +301,7 @@ class SimplePrimaryNode extends PrimaryNode {
for(int i=0;i<replicaIDs.length;i++) {
int replicaID = replicaIDs[i];
try (Connection c = new Connection(replicaTCPPorts[i])) {
+ message("send NEW_NRT_POINT to R" + replicaID + " at tcpPort=" + replicaTCPPorts[i]);
c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
c.out.writeVLong(version);
c.out.writeInt(tcpPort);
@@ -452,6 +459,10 @@ class SimplePrimaryNode extends PrimaryNode {
writer.deleteAll();
out.writeByte((byte) 1);
bos.flush();
+ } else if (cmd == CMD_FORCE_MERGE) {
+ writer.forceMerge(1);
+ out.writeByte((byte) 1);
+ bos.flush();
} else if (cmd == CMD_INDEXING_DONE) {
out.writeByte((byte) 1);
bos.flush();
@@ -480,7 +491,6 @@ class SimplePrimaryNode extends PrimaryNode {
throw new IllegalArgumentException("unhandled field name " + name);
}
}
-
writer.addDocument(doc);
}
@@ -537,6 +547,7 @@ class SimplePrimaryNode extends PrimaryNode {
static final byte CMD_DELETE_DOC = 8;
static final byte CMD_INDEXING_DONE = 19;
static final byte CMD_DELETE_ALL_DOCS = 22;
+ static final byte CMD_FORCE_MERGE = 23;
// Sent by replica to primary when replica first starts up, so primary can add it to any warming merges:
static final byte CMD_NEW_REPLICA = 20;
@@ -617,15 +628,8 @@ class SimplePrimaryNode extends PrimaryNode {
case CMD_MARKER_SEARCH:
{
Thread.currentThread().setName("msearch");
- IndexSearcher searcher = mgr.acquire();
- try {
- long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
- int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
- out.writeVLong(version);
- out.writeVInt(hitCount);
- } finally {
- mgr.release(searcher);
- }
+ int expectedAtLeastCount = in.readVInt();
+ verifyAtLeastMarkerCount(expectedAtLeastCount, out);
}
continue outer;
@@ -706,4 +710,35 @@ class SimplePrimaryNode extends PrimaryNode {
break;
}
}
+
+ private void verifyAtLeastMarkerCount(int expectedAtLeastCount, DataOutput out) throws IOException {
+ IndexSearcher searcher = mgr.acquire();
+ try {
+ long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
+ int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
+
+ if (hitCount < expectedAtLeastCount) {
+ message("marker search: expectedAtLeastCount=" + expectedAtLeastCount + " but hitCount=" + hitCount);
+ TopDocs hits = searcher.search(new TermQuery(new Term("marker", "marker")), expectedAtLeastCount);
+ List<Integer> seen = new ArrayList<>();
+ for(ScoreDoc hit : hits.scoreDocs) {
+ Document doc = searcher.doc(hit.doc);
+ seen.add(Integer.parseInt(doc.get("docid").substring(1)));
+ }
+ Collections.sort(seen);
+ message("saw markers:");
+ for(int marker : seen) {
+ message("saw m" + marker);
+ }
+ throw new IllegalStateException("at flush: marker count " + hitCount + " but expected at least " + expectedAtLeastCount + " version=" + version);
+ }
+
+ if (out != null) {
+ out.writeVLong(version);
+ out.writeVInt(hitCount);
+ }
+ } finally {
+ mgr.release(searcher);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
index 27a5547..bc8bb03 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
@@ -24,18 +24,24 @@ import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
@@ -148,6 +154,7 @@ class SimpleReplicaNode extends ReplicaNode {
/** Handles incoming request to the naive TCP server wrapping this node */
void handleOneConnection(ServerSocket ss, AtomicBoolean stop, InputStream is, Socket socket, DataInput in, DataOutput out, BufferedOutputStream bos) throws IOException, InterruptedException {
+ //message("one connection: " + socket);
outer:
while (true) {
byte cmd;
@@ -173,6 +180,7 @@ class SimpleReplicaNode extends ReplicaNode {
long version = in.readVLong();
Thread.currentThread().setName("recv-" + version);
curPrimaryTCPPort = in.readInt();
+ message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort);
newNRTPoint(version);
}
break;
@@ -235,10 +243,26 @@ class SimpleReplicaNode extends ReplicaNode {
case SimplePrimaryNode.CMD_MARKER_SEARCH:
{
Thread.currentThread().setName("msearch");
+ int expectedAtLeastCount = in.readVInt();
IndexSearcher searcher = mgr.acquire();
try {
long version = ((DirectoryReader) searcher.getIndexReader()).getVersion();
- int hitCount = searcher.search(new TermQuery(new Term("marker", "marker")), 1).totalHits;
+ int hitCount = searcher.count(new TermQuery(new Term("marker", "marker")));
+ if (hitCount < expectedAtLeastCount) {
+ message("marker search: expectedAtLeastCount=" + expectedAtLeastCount + " but hitCount=" + hitCount);
+ TopDocs hits = searcher.search(new TermQuery(new Term("marker", "marker")), expectedAtLeastCount);
+ List<Integer> seen = new ArrayList<>();
+ for(ScoreDoc hit : hits.scoreDocs) {
+ Document doc = searcher.doc(hit.doc);
+ seen.add(Integer.parseInt(doc.get("docid").substring(1)));
+ }
+ Collections.sort(seen);
+ message("saw markers:");
+ for(int marker : seen) {
+ message("saw m" + marker);
+ }
+ }
+
out.writeVLong(version);
out.writeVInt(hitCount);
bos.flush();
@@ -305,7 +329,6 @@ class SimpleReplicaNode extends ReplicaNode {
default:
throw new IllegalArgumentException("unrecognized cmd=" + cmd);
}
- System.out.println("NOW FLUSH");
bos.flush();
break;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
index 5b04721..3fdc45f 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleServer.java
@@ -22,6 +22,7 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -42,9 +43,9 @@ import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
/** Child process with silly naive TCP socket server to handle
@@ -99,8 +100,9 @@ public class SimpleServer extends LuceneTestCase {
success = true;
} catch (Throwable t) {
if (t instanceof SocketException == false && t instanceof NodeCommunicationException == false) {
- node.message("unexpected exception handling client connection:");
+ node.message("unexpected exception handling client connection; now failing test:");
t.printStackTrace(System.out);
+ IOUtils.closeWhileHandlingException(ss);
// Test should fail with this:
throw new RuntimeException(t);
} else {
@@ -218,7 +220,7 @@ public class SimpleServer extends LuceneTestCase {
public void test() throws Exception {
int id = Integer.parseInt(System.getProperty("tests.nrtreplication.nodeid"));
- Thread.currentThread().setName("init child " + id);
+ Thread.currentThread().setName("main child " + id);
Path indexPath = Paths.get(System.getProperty("tests.nrtreplication.indexpath"));
boolean isPrimary = System.getProperty("tests.nrtreplication.isPrimary") != null;
int primaryTCPPort;
@@ -238,7 +240,7 @@ public class SimpleServer extends LuceneTestCase {
boolean doFlipBitsDuringCopy = "true".equals(System.getProperty("tests.nrtreplication.doFlipBitsDuringCopy"));
// Create server socket that we listen for incoming requests on:
- try (final ServerSocket ss = new ServerSocket(0)) {
+ try (final ServerSocket ss = new ServerSocket(0, 0, InetAddress.getLoopbackAddress())) {
int tcpPort = ((InetSocketAddress) ss.getLocalSocketAddress()).getPort();
System.out.println("\nPORT: " + tcpPort);
@@ -247,7 +249,15 @@ public class SimpleServer extends LuceneTestCase {
node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null, doFlipBitsDuringCopy);
System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
} else {
- node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
+ try {
+ node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null);
+ } catch (RuntimeException re) {
+ if (re.getMessage().startsWith("replica cannot start")) {
+ // this is "OK": it means MDW's refusal to delete a segments_N commit point means we cannot start:
+ assumeTrue(re.getMessage(), false);
+ }
+ throw re;
+ }
}
System.out.println("\nINFOS VERSION: " + node.getCurrentSearchingVersion());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
index d409ffc..4c11e52 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleTransLog.java
@@ -197,7 +197,7 @@ class SimpleTransLog implements Closeable {
}
String marker = readNullableString(in);
if (marker != null) {
- //System.out.println("xlog: replay marker=" + id);
+ //TestStressNRTReplication.message("xlog: replay marker=" + id);
doc.add(new StringField("marker", marker, Field.Store.YES));
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 755b234..b2240eb 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@ -231,8 +231,8 @@ public class TestNRTReplication extends LuceneTestCase {
replicaC.flush();
version2 = replicaC.in.readVLong();
hitCount = replicaC.in.readVInt();
- if (hitCount == 10) {
- assertTrue(version2 > version1);
+ if (version2 == primaryVersion1) {
+ assertEquals(10, hitCount);
// good!
break;
}
@@ -269,8 +269,9 @@ public class TestNRTReplication extends LuceneTestCase {
replicaC.flush();
version4 = replicaC.in.readVLong();
hitCount = replicaC.in.readVInt();
- if (hitCount == 0) {
+ if (version4 == primaryVersion2) {
assertTrue(version4 > version3);
+ assertEquals(0, hitCount);
// good!
break;
}
@@ -293,7 +294,8 @@ public class TestNRTReplication extends LuceneTestCase {
replicaC.flush();
long version5 = replicaC.in.readVLong();
hitCount = replicaC.in.readVInt();
- if (hitCount == 10) {
+ if (version5 == primaryVersion3) {
+ assertEquals(10, hitCount);
assertTrue(version5 > version4);
// good!
break;
@@ -308,6 +310,82 @@ public class TestNRTReplication extends LuceneTestCase {
primary.close();
}
+ public void testReplicateForceMerge() throws Exception {
+
+ Node.globalStartNS = System.nanoTime();
+ childTempDir = createTempDir("child");
+
+ message("change thread name from " + Thread.currentThread().getName());
+ Thread.currentThread().setName("main");
+
+ Path primaryPath = createTempDir("primary");
+ NodeProcess primary = startNode(-1, 0, primaryPath, true, -1);
+
+ Path replicaPath = createTempDir("replica");
+ NodeProcess replica = startNode(primary.tcpPort, 1, replicaPath, false, -1);
+
+ // Tell primary current replicas:
+ try (Connection c = new Connection(primary.tcpPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS);
+ c.out.writeVInt(1);
+ c.out.writeVInt(replica.id);
+ c.out.writeVInt(replica.tcpPort);
+ c.flush();
+ c.in.readByte();
+ }
+
+ // Index 10 docs into primary:
+ LineFileDocs docs = new LineFileDocs(random());
+ Connection primaryC = new Connection(primary.tcpPort);
+ primaryC.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+ for(int i=0;i<10;i++) {
+ Document doc = docs.nextDoc();
+ primary.addOrUpdateDocument(primaryC, doc, false);
+ }
+
+ // Refresh primary, which also pushes to replica:
+ long primaryVersion1 = primary.flush();
+ assertTrue(primaryVersion1 > 0);
+
+ // Index 10 more docs into primary:
+ for(int i=0;i<10;i++) {
+ Document doc = docs.nextDoc();
+ primary.addOrUpdateDocument(primaryC, doc, false);
+ }
+
+ // Refresh primary, which also pushes to replica:
+ long primaryVersion2 = primary.flush();
+ assertTrue(primaryVersion2 > primaryVersion1);
+
+ primary.forceMerge(primaryC);
+
+ // Refresh primary, which also pushes to replica:
+ long primaryVersion3 = primary.flush();
+ assertTrue(primaryVersion3 > primaryVersion2);
+
+ Connection replicaC = new Connection(replica.tcpPort);
+
+ // Wait for replica to show the change
+ while (true) {
+ replicaC.out.writeByte(SimplePrimaryNode.CMD_SEARCH_ALL);
+ replicaC.flush();
+ long version = replicaC.in.readVLong();
+ int hitCount = replicaC.in.readVInt();
+ if (version == primaryVersion3) {
+ assertEquals(20, hitCount);
+ // good!
+ break;
+ }
+ Thread.sleep(10);
+ }
+
+ replicaC.close();
+ primaryC.close();
+
+ replica.close();
+ primary.close();
+ }
+
static void message(String message) {
long now = System.nanoTime();
System.out.println(String.format(Locale.ROOT,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
index 271c5d2..63ff12a 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
@@ -58,10 +58,21 @@ import org.apache.lucene.util.ThreadInterruptedException;
import com.carrotsearch.randomizedtesting.SeedUtils;
+// nocommit why so many "hit SocketException during commit with R0"?
+
+// nocommit why so much time when so many nodes are down
+
+// nocommit indexing is too fast? (xlog replay fails to finish before primary crashes itself)
+
+// nocommit why all these NodeCommunicationExcs?
+
+// nocommit the sockets are a pita on jvm crashing ...
+
/*
TODO
- fangs
- sometimes have one replica be really slow at copying / have random pauses (fake GC) / etc.
+ - graceful primary close
- why do we do the "rename temp to actual" all at the end...? what really does that buy us?
- replica should also track maxSegmentName its seen, and tap into inflateGens if it's later promoted to primary?
- test should not print scary exceptions and then succeed!
@@ -137,11 +148,12 @@ public class TestStressNRTReplication extends LuceneTestCase {
/** Randomly gracefully close a replica; it will later be restarted and sync itself. */
static final boolean DO_CLOSE_REPLICA = true;
+ /** Randomly gracefully close the primary; it will later be restarted and sync itself. */
+ static final boolean DO_CLOSE_PRIMARY = true;
+
/** If false, all child + parent output is interleaved into single stdout/err */
static final boolean SEPARATE_CHILD_OUTPUT = false;
- // nocommit DO_CLOSE_PRIMARY?
-
/** Randomly crash whole cluster and then restart it */
static final boolean DO_FULL_CLUSTER_CRASH = true;
@@ -151,8 +163,6 @@ public class TestStressNRTReplication extends LuceneTestCase {
/** Set to a non-null value to force exactly that many nodes; else, it's random. */
static final Integer NUM_NODES = null;
- static final boolean DO_RANDOM_XLOG_REPLAY = false;
-
final AtomicBoolean failed = new AtomicBoolean();
final AtomicBoolean stop = new AtomicBoolean();
@@ -174,6 +184,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
Path transLogPath;
SimpleTransLog transLog;
final AtomicInteger markerUpto = new AtomicInteger();
+ final AtomicInteger markerID = new AtomicInteger();
/** Maps searcher version to how many hits the query body:the matched. */
final Map<Long,Integer> hitCounts = new ConcurrentHashMap<>();
@@ -201,7 +212,11 @@ public class TestStressNRTReplication extends LuceneTestCase {
// Silly bootstrapping:
versionToTransLogLocation.put(0L, 0L);
- versionToTransLogLocation.put(1L, 0L);
+
+ // nocommit why also 1?
+ //versionToTransLogLocation.put(1L, 0L);
+
+ versionToMarker.put(0L, 0);
int numNodes;
@@ -268,23 +283,24 @@ public class TestStressNRTReplication extends LuceneTestCase {
// Wait a bit:
Thread.sleep(TestUtil.nextInt(random(), Math.min(runTimeSec*4, 200), runTimeSec*4));
if (primary != null && random().nextBoolean()) {
- message("top: now flush primary");
NodeProcess curPrimary = primary;
if (curPrimary != null) {
// Save these before we start flush:
long nextTransLogLoc = transLog.getNextLocation();
int markerUptoSav = markerUpto.get();
+ message("top: now flush primary; at least marker count=" + markerUptoSav);
long result;
try {
- result = primary.flush();
+ result = primary.flush(markerUptoSav);
} catch (Throwable t) {
message("top: flush failed; skipping: " + t.getMessage());
result = -1;
}
if (result > 0) {
// There were changes
+ message("top: flush finished with changed; new primary version=" + result);
lastPrimaryVersion = result;
addTransLogLoc(lastPrimaryVersion, nextTransLogLoc);
addVersionMarker(lastPrimaryVersion, markerUptoSav);
@@ -316,7 +332,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
{
NodeProcess node = nodes[random().nextInt(nodes.length)];
- if (node != null) {
+ if (node != null && node.nodeIsClosing.get() == false) {
// TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters.
// maybe we somehow allow IW to commit a specific sis (the one we just flushed)?
message("top: now commit node=" + node);
@@ -452,27 +468,33 @@ public class TestStressNRTReplication extends LuceneTestCase {
addVersionMarker(newPrimary.initInfosVersion, markerCount);
assert newPrimary.initInfosVersion >= lastPrimaryVersion;
- message("top: now change lastPrimaryVersion from " + lastPrimaryVersion + " to " + newPrimary.initInfosVersion);
+ message("top: now change lastPrimaryVersion from " + lastPrimaryVersion + " to " + newPrimary.initInfosVersion + "; startup marker count " + markerCount);
lastPrimaryVersion = newPrimary.initInfosVersion;
- // Publish new primary, before replaying xlog. This means other indexing ops can come in at the same time as we catch up indexing
- // previous ops. Effectively, we have "forked" the indexing ops, by rolling back in time a bit, and replaying old indexing ops (from
- // translog) concurrently with new incoming ops.
- nodes[id] = newPrimary;
- primary = newPrimary;
-
- sendReplicasToPrimary();
-
long nextTransLogLoc = transLog.getNextLocation();
- int nextMarkerUpto = markerUpto.get();
- message("top: replay trans log " + startTransLogLoc + " (version=" + newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)");
+ long t0 = System.nanoTime();
+ message("top: start translog replay " + startTransLogLoc + " (version=" + newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)");
try {
transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc);
} catch (IOException ioe) {
+ // nocommit what if primary node is still running here, and we failed for some other reason?
message("top: replay xlog failed; abort");
return;
}
- message("top: done replay trans log");
+
+ long t1 = System.nanoTime();
+ message("top: done translog replay; took " + ((t1 - t0)/1000000.0) + " msec; now publish primary");
+
+ // Publish the new primary only after the translog has finished replaying. This matters (for this test, anyway) because it keeps a
+ // "linear" history, which is what makes enforcing marker counts correct. E.g., if we published first and replayed the translog
+ // concurrently with incoming ops, then a primary commit that happened while the translog was still replaying would incorrectly record
+ // the translog loc into the commit user data when in fact that commit did NOT reflect all prior ops. If we then crashed and started
+ // up again from that commit point, we would be missing ops.
+ nodes[id] = newPrimary;
+ primary = newPrimary;
+
+ sendReplicasToPrimary();
+
}
/** Launches a child "server" (separate JVM), which is either primary or replica node */
@@ -506,6 +528,9 @@ public class TestStressNRTReplication extends LuceneTestCase {
if (DO_CRASH_PRIMARY) {
cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
}
+ if (DO_CLOSE_PRIMARY) {
+ cmd.add("-Dtests.nrtreplication.doRandomClose=true");
+ }
} else {
if (DO_CRASH_REPLICA) {
cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
@@ -544,7 +569,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
childLog = null;
}
- message("child process command: " + cmd);
+ //message("child process command: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.redirectErrorStream(true);
@@ -577,6 +602,10 @@ public class TestStressNRTReplication extends LuceneTestCase {
throw new RuntimeException(ie);
}
message("exit value=" + p.exitValue());
+ if (p.exitValue() == 0) {
+ message("zero exit status; assuming failed to remove segments_N; skipping");
+ return null;
+ }
// Hackity hack, in case primary crashed/closed and we haven't noticed (reaped the process) yet:
if (isPrimary == false) {
@@ -586,8 +615,9 @@ public class TestStressNRTReplication extends LuceneTestCase {
message("failed to remove segments_N; skipping");
return null;
}
- for(int i=0;i<10;i++) {
- if (primaryGen != myPrimaryGen || primary == null) {
+ for(int i=0;i<100;i++) {
+ NodeProcess primary2 = primary;
+ if (primaryGen != myPrimaryGen || primary2 == null || primary2.nodeIsClosing.get()) {
// OK: primary crashed while we were trying to start, so it's expected/allowed that we could not start the replica:
message("primary crashed/closed while replica R" + id + " tried to start; skipping");
return null;
@@ -634,6 +664,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
}
final boolean finalWillCrash = willCrash;
+ final AtomicBoolean nodeIsClosing = new AtomicBoolean();
// Baby sits the child process, pulling its stdout and printing to our stdout, calling nodeClosed once it exits:
Thread pumper = ThreadPumper.start(
@@ -669,11 +700,11 @@ public class TestStressNRTReplication extends LuceneTestCase {
}
nodeClosed(id);
}
- }, r, System.out, childLog);
+ }, r, System.out, childLog, nodeIsClosing);
pumper.setName("pump" + id);
message("top: node=" + id + " started at tcpPort=" + tcpPort + " initCommitVersion=" + initCommitVersion + " initInfosVersion=" + initInfosVersion);
- return new NodeProcess(p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion);
+ return new NodeProcess(p, id, tcpPort, pumper, isPrimary, initCommitVersion, initInfosVersion, nodeIsClosing);
}
private void nodeClosed(int id) {
@@ -754,7 +785,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
message("top: restarter cycle");
// Randomly crash full cluster:
- if (DO_FULL_CLUSTER_CRASH && random().nextInt(50) == 17) {
+ if (DO_FULL_CLUSTER_CRASH && random().nextInt(500) == 17) {
message("top: full cluster crash");
for(int i=0;i<nodes.length;i++) {
if (starting[i]) {
@@ -954,12 +985,15 @@ public class TestStressNRTReplication extends LuceneTestCase {
continue;
}
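+ // nocommit is the note below still true? Presumably not anymore, now that the primary is published only after translog replay finishes: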
// This can be null if we got the new primary after crash and that primary is still catching up (replaying xlog):
Integer expectedAtLeastHitCount = versionToMarker.get(version);
+ assertNotNull("version=" + version, expectedAtLeastHitCount);
if (expectedAtLeastHitCount != null && expectedAtLeastHitCount > 0 && random().nextInt(10) == 7) {
try {
c.out.writeByte(SimplePrimaryNode.CMD_MARKER_SEARCH);
+ c.out.writeVInt(expectedAtLeastHitCount);
c.flush();
while (c.sockIn.available() == 0) {
if (stop.get()) {
@@ -1064,13 +1098,15 @@ public class TestStressNRTReplication extends LuceneTestCase {
if (random().nextInt(10) == 7) {
// We use the marker docs to check for data loss in search thread:
Document doc = new Document();
- int id = markerUpto.getAndIncrement();
+ int id = markerID.getAndIncrement();
String idString = "m"+id;
doc.add(newStringField("docid", idString, Field.Store.YES));
doc.add(newStringField("marker", "marker", Field.Store.YES));
curPrimary.addOrUpdateDocument(c, doc, false);
transLog.addDocument(idString, doc);
- message("index marker=" + idString + "; translog is " + Node.bytesToString(Files.size(transLogPath)));
+ // Only increment after primary replies:
+ markerUpto.getAndIncrement();
+ //message("index marker=" + idString + "; translog is " + Node.bytesToString(Files.size(transLogPath)));
}
if (docCount > 0 && random().nextDouble() < updatePct) {
@@ -1094,14 +1130,6 @@ public class TestStressNRTReplication extends LuceneTestCase {
((Field) doc.getField("docid")).setStringValue(idString);
curPrimary.addOrUpdateDocument(c, doc, false);
transLog.addDocument(idString, doc);
-
- if (DO_RANDOM_XLOG_REPLAY && random().nextInt(10) == 7) {
- long curLoc = transLog.getNextLocation();
- // randomly replay chunks of translog just to test replay:
- message("now randomly replay translog from " + lastTransLogLoc + " to " + curLoc);
- transLog.replay(curPrimary, lastTransLogLoc, curLoc);
- lastTransLogLoc = curLoc;
- }
}
} catch (IOException se) {
// Assume primary crashed
@@ -1115,12 +1143,13 @@ public class TestStressNRTReplication extends LuceneTestCase {
}
if (random().nextInt(sleepChance) == 0) {
- Thread.sleep(1);
+ Thread.sleep(10);
}
if (random().nextInt(100) == 17) {
- System.out.println("Indexer: now pause for a bit...");
- Thread.sleep(TestUtil.nextInt(random(), 500, 2000));
+ int pauseMS = TestUtil.nextInt(random(), 500, 2000);
+ System.out.println("Indexer: now pause for " + pauseMS + " msec...");
+ Thread.sleep(pauseMS);
System.out.println("Indexer: done pause for a bit...");
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
index 6ddb777..d74e170 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
@@ -21,11 +21,12 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Writer;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
/** A pipe thread. It'd be nice to reuse guava's implementation for this... */
class ThreadPumper {
- public static Thread start(final Runnable onExit, final BufferedReader from, final PrintStream to, final Writer toFile) {
+ public static Thread start(final Runnable onExit, final BufferedReader from, final PrintStream to, final Writer toFile, final AtomicBoolean nodeClosing) {
Thread t = new Thread() {
@Override
public void run() {
@@ -43,6 +44,9 @@ class ThreadPumper {
} else {
TestNRTReplication.message(line);
}
+ if (line.contains("now force close server socket after")) {
+ nodeClosing.set(true);
+ }
}
// Sub-process finished
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8889469b/lucene/replicator/test.cmd
----------------------------------------------------------------------
diff --git a/lucene/replicator/test.cmd b/lucene/replicator/test.cmd
index 14e3bd2..c38fc96 100644
--- a/lucene/replicator/test.cmd
+++ b/lucene/replicator/test.cmd
@@ -1 +1 @@
-python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestNRTReplication -jvms 1 -mult 4 -nightly
+python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 1 -mult 4 -nightly