Posted to commits@lucene.apache.org by mi...@apache.org on 2014/04/08 12:43:02 UTC
svn commit: r1585682 [1/2] - in /lucene/dev/branches/lucene5438/lucene:
core/src/java/org/apache/lucene/index/ core/src/java/org/apache/lucene/store/
replicator/src/java/org/apache/lucene/replicator/
replicator/src/test/org/apache/lucene/replicator/ te...
Author: mikemccand
Date: Tue Apr 8 10:43:01 2014
New Revision: 1585682
URL: http://svn.apache.org/r1585682
Log:
LUCENE-5438: checkpoint current [broken] state
Modified:
lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
lucene/dev/branches/lucene5438/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
Modified: lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java (original)
+++ lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java Tue Apr 8 10:43:01 2014
@@ -415,6 +415,7 @@ public final class SegmentInfos implemen
private void write(Directory directory) throws IOException {
String segmentFileName = getNextSegmentFileName();
+ assert directory.fileExists(segmentFileName) == false: "segments file " + segmentFileName + " already exists!";
// Always advance the generation on write:
if (generation == -1) {
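The assert added above protects the invariant that a write never clobbers an
existing commit point: getNextSegmentFileName() must always hand back a fresh
segments_N. A rough sketch of the naming it relies on (condensed; the real
logic lives in IndexFileNames, and the base-36 detail is recalled from Lucene,
not taken from this commit):

    // sketch: next segments file name from the current generation
    static String nextSegmentsFileName(long generation) {
      long nextGen = (generation == -1) ? 1 : generation + 1;
      return "segments_" + Long.toString(nextGen, Character.MAX_RADIX); // base 36
    }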
Modified: lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java (original)
+++ lucene/dev/branches/lucene5438/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java Tue Apr 8 10:43:01 2014
@@ -22,13 +22,13 @@ import java.io.IOException;
*
* @lucene.internal
*/
-final class RateLimitedIndexOutput extends BufferedIndexOutput {
+public final class RateLimitedIndexOutput extends BufferedIndexOutput {
private final IndexOutput delegate;
private final BufferedIndexOutput bufferedDelegate;
private final RateLimiter rateLimiter;
- RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
+ public RateLimitedIndexOutput(final RateLimiter rateLimiter, final IndexOutput delegate) {
// TODO should we make buffer size configurable?
if (delegate instanceof BufferedIndexOutput) {
bufferedDelegate = (BufferedIndexOutput) delegate;
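Opening up the class and its constructor lets code outside org.apache.lucene.store
(here, the replicator) rate-limit its own copies. A minimal usage sketch, assuming
the Lucene 4.x RateLimiter.SimpleRateLimiter API; dir, fileName and context stand
for whatever the caller already has:

    RateLimiter limiter = new RateLimiter.SimpleRateLimiter(5.0); // ~5 MB/sec cap
    IndexOutput out = new RateLimitedIndexOutput(limiter, dir.createOutput(fileName, context));
    // every flushed buffer now pauses as needed to honor the limit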
Modified: lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java (original)
+++ lucene/dev/branches/lucene5438/lucene/replicator/src/java/org/apache/lucene/replicator/SlowChecksumDirectory.java Tue Apr 8 10:43:01 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,8 +78,11 @@ public class SlowChecksumDirectory exten
}
long value = checksum.getValue();
- System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + ": record pending checksum=" + value);
+ System.out.println("[" + Thread.currentThread().getName() + "] id=" + checksums.id + " " + name + ": record pending checksum=" + value + " (current checkums=" + getChecksum(name) + ") len=" + in.fileLength(name));
pendingChecksums.put(name, value);
+
+ // In case we overwrote this file:
+ checksums.remove(name);
} finally {
input.close();
}
@@ -97,7 +101,7 @@ public class SlowChecksumDirectory exten
}
public void sync(Collection<String> names) throws IOException {
- System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " sync " + names);
+ System.out.println("[" + Thread.currentThread().getName() + "] id=" + checksums.id + " sync " + names);
in.sync(names);
for(String name : names) {
Long v = pendingChecksums.get(name);
@@ -116,7 +120,7 @@ public class SlowChecksumDirectory exten
@Override
public void deleteFile(String name) throws IOException {
- System.out.println(Thread.currentThread().getName() + " id=" + checksums.id + " " + name + " now delete");
+ System.out.println("[" + Thread.currentThread().getName() + "] id=" + checksums.id + " " + name + " now delete");
in.deleteFile(name);
pendingChecksums.remove(name);
checksums.remove(name);
@@ -156,6 +160,7 @@ public class SlowChecksumDirectory exten
this.id = id;
this.dir = dir;
long maxGen = -1;
+ Set<String> seen = new HashSet<>();
for (String fileName : dir.listAll()) {
if (fileName.startsWith(FILE_NAME_PREFIX)) {
long gen = Long.parseLong(fileName.substring(1+FILE_NAME_PREFIX.length()),
@@ -163,6 +168,8 @@ public class SlowChecksumDirectory exten
if (gen > maxGen) {
maxGen = gen;
}
+ } else {
+ seen.add(fileName);
}
}
@@ -177,7 +184,15 @@ public class SlowChecksumDirectory exten
for(int i=0;i<count;i++) {
String name = in.readString();
long checksum = in.readLong();
- checksums.put(name, checksum);
+ // Must filter according to what's in the
+ // directory now because we may have deleted
+ // some files but then crashed and then our
+ // checksum state is invalid:
+ if (seen.contains(name)) {
+ checksums.put(name, checksum);
+ } else {
+ System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + name + ": skip this checksum file on init: it does not exist");
+ }
}
nextWriteGen = maxGen+1;
System.out.println(Thread.currentThread().getName() + ": id=" + id + " " + genToFileName(maxGen) + " loaded checksums");
Modified: lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java?rev=1585682&r1=1585681&r2=1585682&view=diff
==============================================================================
--- lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java (original)
+++ lucene/dev/branches/lucene5438/lucene/replicator/src/test/org/apache/lucene/replicator/TestNRTReplication.java Tue Apr 8 10:43:01 2014
@@ -19,7 +19,9 @@ package org.apache.lucene.replicator;
import java.io.Closeable;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -29,9 +31,15 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Adler32;
@@ -79,6 +87,8 @@ import org.apache.lucene.store.MockDirec
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.RAMOutputStream;
+import org.apache.lucene.store.RateLimitedIndexOutput;
+import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.LineFileDocs;
@@ -89,8 +99,25 @@ import org.apache.lucene.util.TreeLogger
import org.apache.lucene.util._TestUtil;
import org.junit.AfterClass;
+// nocommit add SLM
+
+// nocommit also allow downgrade of Master to Replica,
+// instead of Master.close then Replica init
+
+// nocommit make sure we are not over-IncRef'ing infos on master
+
+// nocommit test replicas that are slow to copy
+
// nocommit add fang: master crashes
+// nocommit provoke more "going backwards", e.g. randomly
+// sometimes shutdown whole cluster and pick random node to
+// be the new master
+
+// nocommit should we have support for "flush as frequently
+// as you can"? or at least, "do not flush so frequently
+// that replicas can't finish copying before next flush"?
+
// nocommit what about network partitioning
// nocommit make MDW throw exceptions sometimes
@@ -99,36 +126,32 @@ import org.junit.AfterClass;
// nocommit test rare bit errors during copy
-// nocommit: master's IW must not do reader pooling? else
-// deletes are not pushed to disk?
-
// nocommit test slow replica
-// nocommit make flush sync async
-
// nocommit test replica that "falls out" because it's too
// slow and then tries to join back again w/o having
// "properly" restarted
-// nocommit also test sometimes starting up new master from
-// a down replica (ie, not just promoting an already running
-// replica)
-
// nocommit rewrite the test so each node has its own
// threads, to be closer to the concurrency we'd "really"
// see across N machines
-// nocommit checksum_N files never delete!
+// nocommit also allow replicas pulling files from replicas;
+// they need not always come from master
@SuppressCodecs({ "SimpleText", "Memory", "Direct" })
public class TestNRTReplication extends LuceneTestCase {
static volatile Master master;
- static ReentrantLock masterLock = new ReentrantLock();
+ static final AtomicInteger masterCount = new AtomicInteger();
+ static final Lock masterLock = new ReentrantLock();
+ static Object[] nodes;
@AfterClass
public static void afterClass() {
+ System.out.println("TEST: now afterClass");
master = null;
+ nodes = null;
}
public void test() throws Throwable {
@@ -141,90 +164,9 @@ public class TestNRTReplication extends
}
}
- /*
- private static Map<String,Long> globalState = new HashMap<>();
-
- private static void setGlobalStateKeyVal(String key, long value) {
- Long cur = globalState.get(key);
- assert cur == null || cur <= value;
- TreeLogger.log(" push " + key + " cur=" + cur + " new=" + value);
- globalState.put(key, value);
- }
- */
-
- /** Called just before IndexWriter finishCommit on the
- * current master, to push "next write gens" to global
- * state. */
- /*
- static void pushGlobalState(SegmentInfos infos) {
- TreeLogger.log("TEST: now pushGlobalState");
- TreeLogger.start("pushGlobalState");
- // NOTE: assumed externally sync'd, i.e. only one master
- // across the cluster at a time
- setGlobalStateKeyVal("segmentsGen", infos.getGeneration());
- // nocommit weird that we must add 2 :)
- setGlobalStateKeyVal("segmentsVersion", infos.getVersion()+2);
-
- // Used to generate next segment file name:
- setGlobalStateKeyVal("segmentsCounter", (long) infos.counter);
-
- for(SegmentCommitInfo info : infos) {
- setGlobalStateKeyVal(info.info.name + "_delGen", info.getNextDelGen());
- setGlobalStateKeyVal(info.info.name + "_fieldInfosGen", info.getNextFieldInfosGen());
- }
- TreeLogger.end("pushGlobalState");
- }
- */
-
- /** Called just before init of a new writer, to pull the
- * "next write gens" and set them in the current infos. */
- /*
- static void pullGlobalState(SegmentInfos infos) {
- TreeLogger.log("TEST: now pullGlobalState");
- TreeLogger.start("pullGlobalState");
- Long v = globalState.get("segmentsGen");
- if (v == null) {
- TreeLogger.log("no global state yet; skip");
- TreeLogger.end("pullGlobalState");
- return;
- }
- TreeLogger.log("pull global gen=" + v + " vs cur=" + infos.getGeneration());
- assert infos.getGeneration() <= v.longValue(): "infos.generation=" + infos.getGeneration() + " global.generation=" + v;
- infos.setGeneration(v.longValue());
-
- v = globalState.get("segmentsVersion");
- assert v != null;
- assert infos.version <= v.longValue(): "infos.version=" + infos.version + " global.version=" + v;
- TreeLogger.log("pull global version=" + v + " vs cur=" + infos.version);
- infos.version = v.longValue();
-
- v = globalState.get("segmentsCounter");
- assert v != null;
- assert infos.counter <= v.longValue(): "infos.counter=" + infos.counter + " global.counter=" + v;
- TreeLogger.log("pull global counter=" + v + " vs cur=" + infos.counter);
- infos.counter = v.intValue();
-
- for(SegmentCommitInfo info : infos) {
- String key = info.info.name + "_delGen";
- v = globalState.get(key);
- long value = v == null ? 1 : v.longValue();
- assert info.getNextDelGen() <= value: "seg=" + info.info.name + " delGen=" + info.getNextDelGen() + " vs global=" + value;
- TreeLogger.log("pull global del gen=" + v + " for seg=" + info.info.name + " vs cur=" + info.getNextDelGen());
- info.setNextDelGen(value);
-
- key = info.info.name + "_fieldInfosGen";
- v = globalState.get(key);
- value = v == null ? 1 : v.longValue();
- assert info.getNextFieldInfosGen() <= value: "seg=" + info.info.name + " fieldInfosGen=" + info.getNextFieldInfosGen() + " vs global=" + value;
- TreeLogger.log("pull global fieldInfos gen= " + v + " for seg=" + info.info.name + " vs cur=" + info.getNextFieldInfosGen());
- info.setNextFieldInfosGen(value);
- }
- TreeLogger.end("pullGlobalState");
- }
- */
-
private void _test() throws Exception {
+ Thread.currentThread().setName("main");
TreeLogger.setLogger(new TreeLogger("main"));
// Maps all segmentInfos.getVersion() we've seen, to the
@@ -232,18 +174,20 @@ public class TestNRTReplication extends
// *:* search we verify the totalHits is correct:
final Map<Long,Integer> versionDocCounts = new ConcurrentHashMap<Long,Integer>();
- int numDirs = 1+_TestUtil.nextInt(random(), 2, 4);
+ int numDirs = 1+_TestUtil.nextInt(random(), 2, 6);
// nocommit
- numDirs = 6;
+ //int numDirs = 2;
+
final File[] dirs = new File[numDirs];
- // One Master and N-1 Replica:
- final Object[] nodes = new Object[numDirs];
+
+ // One Master (initially node 0) and N-1 Replica:
+ nodes = new Object[numDirs];
System.out.println("TEST: " + nodes.length + " nodes");
for(int i=0;i<numDirs;i++) {
- dirs[i] = _TestUtil.getTempDir("NRTReplication");
+ dirs[i] = _TestUtil.getTempDir("NRTReplication." + i + "_");
if (i > 0) {
+ // Some replicas don't start on init:
if (random().nextInt(10) < 7) {
- // Some replicas don't start on init:
nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
} else {
System.out.println("TEST: skip replica " + i + " startup");
@@ -251,12 +195,7 @@ public class TestNRTReplication extends
}
}
- nodes[0] = master = new Master(dirs[0], 0, nodes, versionDocCounts);
-
- // Periodically stops/starts/commits replicas, moves master:
- CommitThread commitThread = new CommitThread(dirs, nodes, versionDocCounts);
- commitThread.setName("commitThread");
- commitThread.start();
+ nodes[0] = master = new Master(dirs[0], 0, versionDocCounts);
// nocommit test graceful full shutdown / restart
@@ -266,7 +205,7 @@ public class TestNRTReplication extends
//long endTime = System.currentTimeMillis() + 10000;
while (System.currentTimeMillis() < endTime) {
- Thread.sleep(_TestUtil.nextInt(random(), 2, 20));
+ Thread.sleep(_TestUtil.nextInt(random(), 2, 50));
assert master != null;
@@ -274,175 +213,192 @@ public class TestNRTReplication extends
TreeLogger.start("replicate");
SegmentInfos infos;
- boolean closeMaster = false;
- if (master == null) {
- infos = null;
+ boolean closeMaster = random().nextInt(100) == 57;
+ if (closeMaster) {
+ TreeLogger.log("top: id=" + master.id + " now move master");
+ // Commits & closes current master and pull the
+ // infos of the final commit:
+ master.close(random().nextBoolean());
+ TreeLogger.log("top: done shutdown master");
} else {
- if (random().nextInt(100) == 57) {
- closeMaster = true;
- TreeLogger.log("top: id=" + master.id + " now move master");
- // Commits & closes current master and pull the
- // infos of the final commit:
- masterLock.lock();
- try {
- infos = master.close(random().nextBoolean());
- } finally {
- masterLock.unlock();
- }
- TreeLogger.log("top: done shutdown master; version=" + infos.version);
- } else {
- // Have writer do a full flush, and return the
- // resulting segments, protected from deletion
- // (incRef'd) just while we copy the files out to
- // the replica (s). This is just like pulling an
- // NRT reader, except we don't actually open the
- // readers on the newly flushed segments:
- TreeLogger.log("flush current master");
- infos = master.writer.w.flushAndIncRef();
- }
- }
+ // Have writer do a full flush, and return the
+ // resulting segments, protected from deletion
+ // (incRef'd) just while we copy the files out to
+ // the replica (s). This is just like pulling an
+ // NRT reader, except we don't actually open the
+ // readers on the newly flushed segments:
+ TreeLogger.log("flush current master");
+ master.flush();
+ TreeLogger.log("done flush current master");
+ }
+
+ CopyState copyState = master.getCopyState();
+
+ // nocommit also allow downgrade Master -> Replica,
+ // NOT a full close
+
+ int count = docCount(copyState.infos);
+ TreeLogger.log("record version=" + copyState.version + " count=" + count + " segs=" + copyState.infos.toString(copyState.dir));
+ Integer oldCount = versionDocCounts.put(copyState.version, count);
- if (infos != null) {
+ // Refresh the local searcher on master:
+ if (closeMaster == false) {
+ master.mgr.setCurrentInfos(copyState.infos);
+ master.mgr.maybeRefresh();
+ }
- int count = docCount(infos);
- TreeLogger.log("record version=" + infos.version + " count=" + count + " segs=" + infos.toString(master.dir));
- Integer oldCount = versionDocCounts.put(infos.version, count);
-
- // nocommit cannot do this: versions can go backwards
- //if (oldCount != null) {
- //assertEquals("version=" + infos.version + " oldCount=" + oldCount + " newCount=" + count, oldCount.intValue(), count);
- //}
-
- // nocommit need to do this concurrently w/ pushing
- // out to replicas:
- if (closeMaster == false) {
- master.mgr.refresh(infos);
- }
-
- // nocommit can we have commit commit the "older" SIS?
- // they will commit quickly since the OS will have
- // already moved those bytes to disk...
-
- int totDocCount = docCount(infos);
- String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir.getDelegate()).sizeInBytes();
-
- TreeLogger.log("replicate docCount=" + totDocCount + " version=" + infos.version + extra + " segments=" + infos.toString(master.dir));
-
- // Convert infos to byte[], to send "on the wire":
- RAMOutputStream out = new RAMOutputStream();
- infos.write(out);
- byte[] infosBytes = new byte[(int) out.getFilePointer()];
- out.writeTo(infosBytes, 0);
-
- // nocommit test master crash (promoting replica to master)
-
- // nocommit do this sync in separate threads
- Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, infos.files(master.dir, false));
-
- // nocommit simulate super-slow replica: it should not
- // hold up the copying of other replicas, nor new
- // flushing; the copy of a given SIS to a given
- // replica should be fully concurrent/bg
- int upto = 0;
- for(Object n : nodes) {
- if (n != null && n instanceof Replica) {
- Replica r = (Replica) n;
+ // nocommit break this into separate tests, so we can
+ // test the "clean" case where versions are "correct":
- // nocommit improve this: load each file ONCE,
- // push to the N replicas that need it
- try {
- r.sync(master.dir, filesMetaData, infosBytes, infos.version);
- } catch (AlreadyClosedException ace) {
- // Ignore this: it means the replica shut down
- // while we were trying to sync. This
- // "approximates" an exception the master would
- // see trying to push file bytes to a replica
- // that was just taken offline.
- } catch (Exception e) {
- TreeLogger.log("TEST FAIL: replica " + r.id, e);
- throw e;
- }
- } else if (n == null) {
- TreeLogger.log("id=" + upto + " skip down replica");
- }
- upto++;
+ // nocommit cannot do this: versions can go backwards
+ //if (oldCount != null) {
+ //assertEquals("version=" + infos.version + " oldCount=" + oldCount + " newCount=" + count, oldCount.intValue(), count);
+ //}
+
+ // nocommit can we have commit commit the "older" SIS?
+ // they will commit quickly since the OS will have
+ // already moved those bytes to disk...
+
+ String extra = " master.sizeInBytes=" + ((NRTCachingDirectory) master.dir.getDelegate()).sizeInBytes();
+
+ TreeLogger.log("replicate docCount=" + count + " version=" + copyState.version + extra + " segments=" + copyState.infos.toString(copyState.dir));
+
+ // nocommit test master crash (promoting replica to master)
+
+ // nocommit simulate super-slow replica: it should not
+ // hold up the copying of other replicas, nor new
+ // flushing; the copy of a given SIS to a given
+ // replica should be fully concurrent/bg
+
+ // Notify all running replicas that they should now
+ // pull the new flush over:
+ int upto = 0;
+ for(Object n : nodes) {
+ if (n != null && n instanceof Replica) {
+ Replica r = (Replica) n;
+ // nocommit can we "broadcast" the new files
+ // instead of each replica pulling its own copy
+ // ...
+ TreeLogger.log("id=" + upto + ": signal new flush");
+ r.newFlush();
+ } else if (n == null) {
+ TreeLogger.log("id=" + upto + " skip down replica");
}
+ upto++;
}
- if (closeMaster == false) {
- // Done pushing to all replicas so we now release
- // the files on master, so IW is free to delete if it
- // needs to:
- master.setInfos(infos);
- } else {
+ master.releaseCopyState(copyState);
- // clean this up:
- TreeLogger.log("close old master dir dir.listAll()=" + Arrays.toString(master.dir.listAll()));
+ if (closeMaster) {
+ if (random().nextBoolean()) {
+ TreeLogger.log("top: id=" + master.id + " now waitIdle");
+ master.waitIdle();
+ TreeLogger.log("top: id=" + master.id + " done waitIdle");
+ } else {
+ TreeLogger.log("top: id=" + master.id + " skip waitIdle");
+ Thread.sleep(random().nextInt(5));
+ }
+
+ TreeLogger.log("top: id=" + master.id + " close old master dir dir.listAll()=" + Arrays.toString(master.dir.listAll()));
master.dir.close();
masterLock.lock();
try {
+ masterCount.incrementAndGet();
+ } finally {
+ masterLock.unlock();
+ }
- nodes[master.id] = null;
-
- // nocommit go back to picking random replica
+ nodes[master.id] = null;
- // Must pick newest replica to promote, else we
- // can't overwrite open files when trying to copy
- // to the newer replicas:
- int bestIDX = -1;
- long highestVersion = -1;
- for(int idx=0;idx<nodes.length;idx++) {
- if (nodes[idx] instanceof Replica) {
- Replica r = (Replica) nodes[idx];
- long version = r.mgr.getCurrentInfos().version;
- if (version > highestVersion) {
- bestIDX = idx;
- highestVersion = version;
- }
+ // nocommit make sure we test race here, where
+ // replica is coming up just as we are electing a
+ // new master
+
+ // Must pick newest replica to promote, else we
+ // can't overwrite open files when trying to copy
+ // to the newer replicas:
+ int bestIDX = -1;
+ long highestVersion = -1;
+ for (int idx=0;idx<nodes.length;idx++) {
+ if (nodes[idx] instanceof Replica) {
+ Replica r = (Replica) nodes[idx];
+ long version = r.mgr.getCurrentInfos().version;
+ TreeLogger.log("top: id=" + r.id + " check version=" + version);
+ if (version > highestVersion) {
+ bestIDX = idx;
+ highestVersion = version;
+ TreeLogger.log("top: id=" + r.id + " check version=" + version + " max so far");
}
}
+ }
- int idx;
- if (bestIDX != -1) {
- idx = bestIDX;
- } else {
- // All replicas are down; it doesn't matter
- // which one we pick
- idx = random().nextInt(nodes.length);
- }
+ int idx;
+ if (bestIDX != -1) {
+ idx = bestIDX;
+ } else {
+ // All replicas are down; it doesn't matter
+ // which one we pick
+ idx = random().nextInt(nodes.length);
+ }
- if (nodes[idx] == null) {
- // Start up Master from scratch:
- TreeLogger.log("top: id=" + idx + " promote down node to master");
- nodes[idx] = master = new Master(dirs[idx], idx, nodes, versionDocCounts);
- } else {
- // Promote a running replica to Master:
- assert nodes[idx] instanceof Replica;
- TreeLogger.log("top: id=" + idx + " promote replica to master");
- master = new Master((Replica) nodes[idx], nodes);
- nodes[idx] = master;
- }
- } finally {
- masterLock.unlock();
+ if (nodes[idx] == null) {
+ // Start up Master from scratch:
+ TreeLogger.log("top: id=" + idx + " promote down node to master");
+ nodes[idx] = master = new Master(dirs[idx], idx, versionDocCounts);
+ } else {
+ // Promote a running replica to Master:
+ assert nodes[idx] instanceof Replica;
+ TreeLogger.log("top: id=" + idx + " promote replica to master");
+ master = new Master((Replica) nodes[idx]);
+ nodes[idx] = master;
+ }
+ } else {
+ if (random().nextInt(100) == 17) {
+ TreeLogger.log("top: id=" + master.id + " commit master");
+ master.commit();
}
}
- TreeLogger.end("replicate");
- }
- System.out.println("TEST: stop commit thread");
- commitThread.finish();
+ // Maybe restart a down replica, or commit / shutdown
+ // / crash one:
+ for(int i=0;i<nodes.length;i++) {
+ if (nodes[i] == null && random().nextInt(20) == 17) {
+ // Restart this replica
+ try {
+ nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
+ } catch (Throwable t) {
+ TreeLogger.log("top: id=" + i + " FAIL startup", t);
+ throw t;
+ }
+ } else {
+ if (nodes[i] instanceof Replica) {
+ Replica r = (Replica) nodes[i];
- if (master != null) {
- System.out.println("TEST: close master");
- masterLock.lock();
- try {
- master.close(random().nextBoolean());
- master.dir.close();
- } finally {
- masterLock.unlock();
+ if (random().nextInt(100) == 17) {
+ TreeLogger.log("top: id=" + i + " commit replica");
+ r.commit(false);
+ }
+
+ if (random().nextInt(100) == 17) {
+ // Now shutdown this replica
+ TreeLogger.log("top: id=" + i + " shutdown replica");
+ r.shutdown();
+ nodes[i] = null;
+ break;
+ } else if (random().nextInt(100) == 17) {
+ // Now crash the replica
+ TreeLogger.log("top: id=" + i + " crash replica");
+ r.crash();
+ nodes[i] = null;
+ break;
+ }
+ }
+ }
}
+
+ TreeLogger.end("replicate");
}
System.out.println("TEST: close replicas");
@@ -451,14 +407,20 @@ public class TestNRTReplication extends
((Replica) n).shutdown();
}
}
+
+ if (master != null) {
+ System.out.println("TEST: close master");
+ master.close(random().nextBoolean());
+ master.dir.close();
+ }
}
static class FileMetaData {
- public final long sizeInBytes;
+ public final long length;
public final long checksum;
- public FileMetaData(long sizeInBytes, long checksum) {
- this.sizeInBytes = sizeInBytes;
+ public FileMetaData(long length, long checksum) {
+ this.length = length;
this.checksum = checksum;
}
}
@@ -466,47 +428,14 @@ public class TestNRTReplication extends
static Map<String,FileMetaData> getFilesMetaData(Master master, Collection<String> files) throws IOException {
Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>();
for(String file : files) {
- Long checksum = master.dir.getChecksum(file);
- assert checksum != null;
- filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), checksum.longValue()));
+ filesMetaData.put(file, new FileMetaData(master.dir.fileLength(file), master.dir.getChecksum(file)));
}
return filesMetaData;
}
- static Set<String> copyFiles(SlowChecksumDirectory src, Replica dst, Map<String,FileMetaData> filesMetaData, boolean lowPriority) throws IOException {
+ static Map<String,FileMetaData> copyFiles(SlowChecksumDirectory src, Replica dst, Map<String,FileMetaData> files, boolean lowPriority, long totBytes) throws IOException {
long t0 = System.currentTimeMillis();
- long totBytes = 0;
- Set<String> toCopy = new HashSet<String>();
- for(Map.Entry<String,FileMetaData> ent : filesMetaData.entrySet()) {
- String fileName = ent.getKey();
- // nocommit remove now unused metaData.checksum
- FileMetaData metaData = ent.getValue();
- Long srcChecksum0 = src.getChecksum(fileName);
- assert srcChecksum0 != null: "id=" + dst.id + " name=" + fileName;
- long srcChecksum = srcChecksum0.longValue();
-
- Long checksum = dst.dir.getChecksum(fileName);
- if (dst.dir.fileExists(fileName) == false) {
- TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [does not exist] length=" + metaData.sizeInBytes + " srcChecksum=" + srcChecksum);
- toCopy.add(fileName);
- totBytes += metaData.sizeInBytes;
- } else if (dst.dir.fileLength(fileName) != metaData.sizeInBytes) {
- TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [different file length old=" + dst.dir.fileLength(fileName) + " new=" + metaData.sizeInBytes + "]");
- toCopy.add(fileName);
- totBytes += metaData.sizeInBytes;
- } else if (checksum == null) {
- TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [no checksum] length=" + metaData.sizeInBytes);
- toCopy.add(fileName);
- totBytes += metaData.sizeInBytes;
- } else if (checksum.longValue() != srcChecksum) {
- TreeLogger.log("id=" + dst.id + " " + fileName + " will copy [wrong checksum: cur=" + checksum.longValue() + " new=" + srcChecksum + "] length=" + metaData.sizeInBytes);
- toCopy.add(fileName);
- totBytes += metaData.sizeInBytes;
- } else {
- TreeLogger.log("id=" + dst.id + " " + fileName + " skip copy checksum=" + srcChecksum + " file.length=" + metaData.sizeInBytes);
- }
- }
// nocommit should we "organize" the files to be copied
// by segment name? so that NRTCachingDir can
@@ -520,47 +449,32 @@ public class TestNRTReplication extends
// nocommit can we get numDocs?
ioContext = new IOContext(new FlushInfo(0, totBytes));
}
- for(String f : toCopy) {
- long bytes = src.fileLength(f);
- //System.out.println(" copy " + f + " (" + bytes + " bytes)");
- totBytes += bytes;
- TreeLogger.log("id=" + dst.id + " " + f + " copy file");
-
- dst.copyOneFile(dst.id, src, f, ioContext);
- // nocommit make test that exercises this
- // Make sure no bits flipped during copy
- Long v1 = dst.dir.getChecksum(f);
- assert v1 != null;
+ CopyResult copyResult = dst.fileCopier.add(src, files, lowPriority, ioContext);
- Long v2 = src.getChecksum(f);
- assert v2 != null;
-
- if (v1.longValue() != v2.longValue()) {
- throw new IOException("id=" + dst.id + " " + f + ": copy failed: wrong checksums src=" + v2 + " vs dst=" + v1);
- }
+ try {
+ copyResult.done.await();
+ } catch (InterruptedException ie) {
+ // nocommit
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
- if (lowPriority) {
- // Rate limit low priority (copying a merged segment):
- // nocommit use rate limiter
- try {
- Thread.sleep(bytes/100000);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(ie);
- }
- }
+ if (copyResult.failed.get()) {
+ TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + " failed to copy some files");
+ return null;
}
+
long t1 = System.currentTimeMillis();
- TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + toCopy.size() + " files (of " + filesMetaData.size() + "); sizeInBytes=" + ((NRTCachingDirectory) dst.dir.getDelegate()).sizeInBytes());
+ TreeLogger.log("replica " + dst.id + ": " + (lowPriority ? "low-priority " : "") + "took " + (t1-t0) + " millis for " + totBytes + " bytes; " + files.size() + " files; nrtDir.sizeInBytes=" + ((NRTCachingDirectory) dst.dir.getDelegate()).sizeInBytes());
- return toCopy;
+ return files;
}
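copyFiles() now just enqueues the batch on the destination's SimpleFileCopier and
blocks on the returned CopyResult. From its use here (done.await(), failed.get()),
CopyResult is presumably little more than a latch plus a failure flag; a
hypothetical sketch, since the class itself lands in the other half of this commit:

    static class CopyResult {
      final CountDownLatch done = new CountDownLatch(1); // counted down when the copy completes
      final AtomicBoolean failed = new AtomicBoolean();  // set if any file failed to copy
    }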
- /** Like SearcherManager, except it refreshes via a
- * provided (NRT) SegmentInfos. */
+ /** Like SearcherManager, except it refreshes via an
+ * externally provided (NRT) SegmentInfos. */
private static class InfosSearcherManager extends ReferenceManager<IndexSearcher> {
- private SegmentInfos currentInfos;
+ private volatile SegmentInfos currentInfos;
private final Directory dir;
public InfosSearcherManager(Directory dir, int id, SegmentInfos infosIn) throws IOException {
@@ -598,17 +512,19 @@ public class TestNRTReplication extends
public SegmentInfos getCurrentInfos() {
return currentInfos;
}
-
- public void refresh(SegmentInfos infos) throws IOException {
+
+ public void setCurrentInfos(SegmentInfos infos) {
if (currentInfos != null) {
// So that if we commit, we will go to the next
// (unwritten so far) generation:
infos.updateGeneration(currentInfos);
+ TreeLogger.log("mgr.setCurrentInfos: carry over infos gen=" + infos.getSegmentsFileName());
}
currentInfos = infos;
- maybeRefresh();
}
+ // nocommit who manages the "ref" for currentInfos?
+
@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher old) throws IOException {
List<AtomicReader> subs;
@@ -625,75 +541,19 @@ public class TestNRTReplication extends
}
}
- static class CommitThread extends Thread {
- private final File[] dirs;
- private final Object[] nodes;
- private final Map<Long,Integer> versionDocCounts;
- private volatile boolean stop;
-
- public CommitThread(File[] dirs, Object[] nodes, Map<Long,Integer> versionDocCounts) {
- this.dirs = dirs;
- this.nodes = nodes;
- this.versionDocCounts = versionDocCounts;
- }
+ private static class CopyState {
+ public final SlowChecksumDirectory dir;
+ public final Map<String,FileMetaData> files;
+ public final long version;
+ public final byte[] infosBytes;
+ public final SegmentInfos infos;
- @Override
- public void run() {
- TreeLogger.setLogger(new TreeLogger("commit"));
- try {
- while (stop == false) {
- Thread.sleep(_TestUtil.nextInt(random(), 10, 30));
- int i = random().nextInt(nodes.length);
-
- masterLock.lock();
- try {
- Object n = nodes[i];
- if (n != null) {
- if (n instanceof Replica) {
- Replica r = (Replica) n;
- if (random().nextInt(100) == 17) {
- TreeLogger.log("top: id=" + i + " commit");
- r.commit(false);
- }
- if (random().nextInt(100) == 17) {
- // Shutdown this replica
- nodes[i] = null;
- TreeLogger.log("top: id=" + i + " shutdown replica");
- r.shutdown();
- } else if (random().nextInt(100) == 17) {
- // Crash the replica
- nodes[i] = null;
- TreeLogger.log("top: id=" + i + " crash replica");
- r.crash();
- }
- } else if (master != null && master.isClosed() == false) {
- // Randomly commit master:
- if (random().nextInt(100) == 17) {
- TreeLogger.log("top: id=" + i + " commit master");
- master.commit();
- }
- }
- } else if (random().nextInt(20) == 17) {
- // Restart this replica
- try {
- nodes[i] = new Replica(dirs[i], i, versionDocCounts, null);
- } catch (Throwable t) {
- TreeLogger.log("top: id=" + i + " FAIL startup", t);
- throw t;
- }
- }
- } finally {
- masterLock.unlock();
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void finish() throws InterruptedException {
- stop = true;
- join();
+ public CopyState(SlowChecksumDirectory dir, Map<String,FileMetaData> files, long version, byte[] infosBytes, SegmentInfos infos) {
+ this.dir = dir;
+ this.files = files;
+ this.version = version;
+ this.infosBytes = infosBytes;
+ this.infos = infos;
}
}
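CopyState carries the flushed SegmentInfos twice on purpose: as a live object, so
the master can incRef/decRef it against the deleter, and as infosBytes, the "on
the wire" form a replica re-reads against its own directory. The round-trip,
condensed from setCopyState() and Replica.sync() below:

    // master side:
    RAMOutputStream out = new RAMOutputStream();
    infos.write(out);
    byte[] infosBytes = new byte[(int) out.getFilePointer()];
    out.writeTo(infosBytes, 0);

    // replica side:
    SegmentInfos copy = new SegmentInfos();
    copy.read(dir, new ByteArrayDataInput(infosBytes));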
@@ -703,31 +563,34 @@ public class TestNRTReplication extends
final Set<String> finishedMergedSegments = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
final SlowChecksumDirectory dir;
- final Object[] nodes;
final RandomIndexWriter writer;
final InfosSearcherManager mgr;
final SearchThread searchThread;
final IndexThread indexThread;
final int id;
- private boolean isClosed;
+ private volatile boolean isClosed;
+ private AtomicInteger infosRefCount = new AtomicInteger();
SegmentInfos lastInfos;
/** Start up a master from scratch. */
public Master(File path,
- int id, Object[] nodes, Map<Long,Integer> versionDocCounts) throws IOException {
+ int id, Map<Long,Integer> versionDocCounts) throws IOException {
final MockDirectoryWrapper dirOrig = newMockFSDirectory(path);
// In some legitimate cases we will double-write:
dirOrig.setPreventDoubleWrite(false);
this.id = id;
- this.nodes = nodes;
// nocommit put back
dirOrig.setCheckIndexOnClose(false);
+ // Master may legitimately close while replicas are
+ // still copying from it:
+ dirOrig.setAllowCloseWithOpenFiles(true);
+
dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(dirOrig, 1.0, 10.0));
//((NRTCachingDirectory) master).VERBOSE = true;
SegmentInfos infos = new SegmentInfos();
@@ -736,8 +599,6 @@ public class TestNRTReplication extends
} catch (IndexNotFoundException infe) {
}
- //pullGlobalState(infos);
-
mgr = new InfosSearcherManager(dir, id, infos);
searchThread = new SearchThread("master", mgr, versionDocCounts);
searchThread.start();
@@ -758,6 +619,7 @@ public class TestNRTReplication extends
// nocommit thread hazard here? IW could have already
// nuked some segments...?
writer.w.incRefDeleter(lastInfos);
+ setCopyState();
indexThread = new IndexThread(this);
indexThread.start();
@@ -765,7 +627,6 @@ public class TestNRTReplication extends
public void commit() throws IOException {
writer.w.prepareCommit();
- //pushGlobalState(writer.w.getPendingCommit());
// It's harmless if we crash here, because the
// global state has already been updated
writer.w.commit();
@@ -774,13 +635,25 @@ public class TestNRTReplication extends
/** Promotes an existing Replica to Master, re-using the
* open NRTCachingDir, the SearcherManager, the search
* thread, etc. */
- public Master(Replica replica, Object[] nodes) throws IOException {
+ public Master(Replica replica) throws IOException {
this.id = replica.id;
this.dir = replica.dir;
- this.nodes = nodes;
this.mgr = replica.mgr;
this.searchThread = replica.searchThread;
+ // Master may legitimately close while replicas are
+ // still copying from it:
+ ((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).setAllowCloseWithOpenFiles(true);
+
+ // Do not copy from ourself:
+ try {
+ replica.copyThread.finish();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
+ replica.fileCopier.close();
+
// nocommit must somehow "stop" this replica? e.g. we
// don't want it doing any more deleting?
@@ -790,15 +663,19 @@ public class TestNRTReplication extends
// nocommit also test periodically committing, and
// preserving multiple commit points; verify these
// "survive" over to the replica
+ SegmentInfos infos = replica.mgr.getCurrentInfos().clone();
+ TreeLogger.log("top: id=" + replica.id + " sis version=" + infos.version);
- SegmentInfos curInfos = replica.mgr.getCurrentInfos().clone();
- //pullGlobalState(curInfos);
-
- writer = new RandomIndexWriter(random(), dir, curInfos, iwc);
+ writer = new RandomIndexWriter(random(), dir, replica.mgr.getCurrentInfos().clone(), iwc);
_TestUtil.reduceOpenFiles(writer.w);
System.out.println("after reduce: " + writer.w.getConfig());
lastInfos = mgr.getCurrentInfos();
+
+ // nocommit thread hazard here? IW could have already
+ // nuked some segments...?
+ writer.w.incRefDeleter(lastInfos);
+
+ setCopyState();
+
@@ -823,12 +700,40 @@ public class TestNRTReplication extends
TreeLogger.setLogger(new TreeLogger("merge"));
}
//System.out.println("TEST: warm merged segment files " + info);
- Map<String,FileMetaData> filesMetaData = getFilesMetaData(Master.this, info.files());
+ Map<String,FileMetaData> toCopy = getFilesMetaData(Master.this, info.files());
+ long totBytes = 0;
+ for(FileMetaData metaData : toCopy.values()) {
+ totBytes += metaData.length;
+ }
+ TreeLogger.log("warm files=" + toCopy.keySet() + " totBytes=" + totBytes);
+ TreeLogger.start("warmMerge");
+
+ IOContext context = new IOContext(new MergeInfo(0, totBytes, false, 0));
+
+ List<CopyResult> copyResults = new ArrayList<>();
for(Object n : nodes) {
if (n != null && n instanceof Replica) {
+ Replica r = (Replica) n;
+
try {
- // nocommit do we need to check for merge aborted...?
- ((Replica) n).warmMerge(info.info.name, dir, filesMetaData);
+
+ // nocommit we could also have replica pre-warm a SegmentReader
+ // here, and add it onto subReader list
+ // for next reopen ...?
+
+ // Must call filter, in case we are
+ // overwriting files and must invalidate
+ // the last commit:
+ int sizeBefore = toCopy.size();
+ Map<String,FileMetaData> toCopy2 = r.filterFilesToCopy(toCopy);
+
+ // Since this is a newly merged segment,
+ // all files should be new and need
+ // copying:
+ assert toCopy2.size() == sizeBefore: "before=" + toCopy.keySet() + " after=" + toCopy2.keySet();
+
+ TreeLogger.log("copy to replica " + r.id);
+ copyResults.add(r.copyMergedFiles(Master.this.dir, toCopy2, context));
} catch (AlreadyClosedException ace) {
// Ignore this: it means the replica shut down
// while we were trying to copy files. This
@@ -838,15 +743,32 @@ public class TestNRTReplication extends
}
}
}
+
+ TreeLogger.log("now wait for " + copyResults.size() + " replicas to finish copying");
+ for(CopyResult result : copyResults) {
+ try {
+ result.done.await();
+ } catch (InterruptedException ie) {
+ // nocommit
+ }
+
+ // nocommit if there's an error ... what to
+ // do?
+
+ // nocommit we should check merge.abort
+ // somehow in here, so if the master is in
+ // a hurry to shutdown, we respect that
+ }
+ TreeLogger.log("done warm merge for " + copyResults.size() + " replicas");
+ TreeLogger.end("warmMerge");
}
});
return iwc;
}
- /** Gracefully shuts down the master, and returns the
- * final segments in the index .*/
- public SegmentInfos close(boolean waitForMerges) throws IOException {
+ /** Gracefully shuts down the master */
+ public void close(boolean waitForMerges) throws IOException {
TreeLogger.log("id=" + id + " close master waitForMerges=" + waitForMerges);
try {
@@ -856,8 +778,6 @@ public class TestNRTReplication extends
throw new RuntimeException(ie);
}
- mgr.close();
-
if (waitForMerges) {
// Do it here, instead of on close, so we can
// continue indexing while waiting for merges:
@@ -865,10 +785,7 @@ public class TestNRTReplication extends
TreeLogger.log("waitForMerges done");
}
- if (lastInfos != null) {
- writer.w.decRefDeleter(lastInfos);
- lastInfos = null;
- }
+ mgr.close();
try {
indexThread.finish();
@@ -884,39 +801,100 @@ public class TestNRTReplication extends
commit();
- // Don't wait for merges now; we already did above:
- writer.close(false);
- TreeLogger.log("done close writer");
+ synchronized(this) {
+ isClosed = true;
+ if (lastInfos != null) {
+ writer.w.decRefDeleter(lastInfos);
+ lastInfos = null;
+ copyState = null;
+ }
- SegmentInfos infos = new SegmentInfos();
- infos.read(dir);
+ // nocommit what about exc from writer.close...
+ // Don't wait for merges now; we already did above:
+ writer.close(false);
+ TreeLogger.log("done close writer");
- TreeLogger.log("final infos=" + infos.toString(master.dir));
+ SegmentInfos infos = new SegmentInfos();
+ infos.read(dir);
+
+ TreeLogger.log("final infos=" + infos.toString(master.dir));
- // nocommit caller must close
- // dir.close();
- isClosed = true;
+ // nocommit caller must close
+ // dir.close();
- return infos;
+ lastInfos = infos;
+ setCopyState();
+ }
}
public synchronized boolean isClosed() {
return isClosed;
}
- public synchronized SegmentInfos getInfos() throws IOException {
- writer.w.incRefDeleter(lastInfos);
- return lastInfos;
+ private CopyState copyState;
+
+ private synchronized void setCopyState() throws IOException {
+ RAMOutputStream out = new RAMOutputStream();
+ lastInfos.write(out);
+ byte[] infosBytes = new byte[(int) out.getFilePointer()];
+ out.writeTo(infosBytes, 0);
+ copyState = new CopyState(dir,
+ getFilesMetaData(this, lastInfos.files(dir, false)),
+ lastInfos.version, infosBytes, lastInfos);
+ }
+
+ public synchronized CopyState getCopyState() throws IOException {
+ //if (isClosed && lastInfos == null) {
+ //return null;
+ //}
+ TreeLogger.log("master.getCopyState version=" + lastInfos.version + " files=" + lastInfos.files(dir, false) + " writer=" + writer);
+ if (isClosed == false) {
+ writer.w.incRefDeleter(copyState.infos);
+ }
+ int count = infosRefCount.incrementAndGet();
+ assert count >= 1;
+ return copyState;
+ }
+
+ /** Flushes, and holds a ref to the resulting infos (as lastInfos / copyState). */
+ public void flush() throws IOException {
+ SegmentInfos newInfos = writer.w.flushAndIncRef();
+
+ synchronized(this) {
+ writer.w.decRefDeleter(lastInfos);
+ // Steals the reference returned by IW:
+ lastInfos = newInfos;
+ setCopyState();
+ }
}
- // NOTE: steals incoming ref
- public synchronized void setInfos(SegmentInfos newInfos) throws IOException {
- writer.w.decRefDeleter(lastInfos);
- lastInfos = newInfos;
+ public synchronized void releaseCopyState(CopyState copyState) throws IOException {
+ TreeLogger.log("master.releaseCopyState version=" + copyState.version + " files=" + copyState.files.keySet());
+ // Must check because by the time the replica releases
+ // it's possible it's a different master:
+ if (copyState.dir == dir) {
+ if (isClosed == false) {
+ writer.w.decRefDeleter(copyState.infos);
+ }
+ int count = infosRefCount.decrementAndGet();
+ assert count >= 0;
+ TreeLogger.log(" infosRefCount=" + infosRefCount);
+ if (count == 0) {
+ notify();
+ }
+ } else {
+ TreeLogger.log(" skip: wrong master");
+ }
}
- public synchronized void releaseInfos(SegmentInfos infos) throws IOException {
- writer.w.decRefDeleter(infos);
+ /** Waits until all outstanding infos refs are dropped. */
+ public synchronized void waitIdle() throws InterruptedException {
+ while (true) {
+ if (infosRefCount.get() == 0) {
+ return;
+ }
+ wait();
+ }
}
}
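getCopyState()/releaseCopyState() form the replication hand-off: every taker bumps
infosRefCount (plus the writer's deleter ref while the writer is open), and
waitIdle() blocks until all takers have released. Typical use, condensed from the
test loop above:

    CopyState copyState = master.getCopyState(); // files now protected from deletion
    try {
      // push copyState.files / copyState.infosBytes to a replica
    } finally {
      master.releaseCopyState(copyState); // decRef; notifies waitIdle() at zero
    }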
@@ -959,7 +937,7 @@ public class TestNRTReplication extends
stop = true;
join();
}
- };
+ }
private static class Replica {
final int id;
@@ -968,7 +946,9 @@ public class TestNRTReplication extends
private final InfosSearcherManager mgr;
private volatile boolean stop;
- private SearchThread searchThread;
+ final SearchThread searchThread;
+ final SimpleFileCopier fileCopier;
+ CopyThread copyThread;
private final Collection<String> lastCommitFiles;
private final Collection<String> lastNRTFiles;
@@ -994,210 +974,260 @@ public class TestNRTReplication extends
dir = new SlowChecksumDirectory(id, new NRTCachingDirectory(fsDir, 1.0, 10.0));
TreeLogger.log("id=" + id + " created dirs, checksums; dir.listAll=" + Arrays.toString(dir.listAll()));
- // nocommit don't need copiedFiles anymore:
- // Startup sync to pull latest index over:
- Set<String> copiedFiles = null;
+ fileCopier = new SimpleFileCopier(dir, id);
+ fileCopier.start();
lastCommitFiles = new HashSet<String>();
lastNRTFiles = new HashSet<String>();
+ deleter = new InfosRefCounts(id, dir);
+
String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(dir);
+ SegmentInfos lastCommit = null;
if (segmentsFileName != null) {
- SegmentInfos lastCommit = new SegmentInfos();
+ lastCommit = new SegmentInfos();
TreeLogger.log("id=" + id + " " + segmentsFileName + " now load");
lastCommit.read(dir, segmentsFileName);
lastCommitFiles.addAll(lastCommit.files(dir, true));
- TreeLogger.log("id=" + id + " lastCommitFiles = " + lastCommitFiles);
+ TreeLogger.log("id=" + id + " incRef lastCommitFiles");
+ deleter.incRef(lastCommitFiles);
+ TreeLogger.log("id=" + id + " loaded version=" + lastCommit.version + " lastCommitFiles = " + lastCommitFiles);
}
// Must sync latest index from master before starting
// up mgr, so that we don't hold open any files that
// need to be overwritten when the master is against
- // an older index than our copy:
+ // an older index than our copy, and so we roll back
+ // our version if we had been at a higher version but
+ // were down when master moved:
SegmentInfos infos = null;
- assert master == null || masterLock.isLocked();
- if (master != null && master.isClosed() == false) {
- SegmentInfos masterInfos = null;
- try {
- masterInfos = master.getInfos();
- // Convert infos to byte[], to send "on the wire":
- RAMOutputStream out = new RAMOutputStream();
- masterInfos.write(out);
- byte[] infosBytes = new byte[(int) out.getFilePointer()];
- out.writeTo(infosBytes, 0);
- Map<String,FileMetaData> filesMetaData = getFilesMetaData(master, masterInfos.files(master.dir, false));
+ // Startup sync to pull latest index over:
+ Map<String,FileMetaData> copiedFiles = null;
+
+ if (master != null) {
+
+ // nocommit this logic isn't right; else, on a full
+ // restart how will a master be found:
+ // Repeat until we find a working master; this is to
+ // handle the case when a replica starts up but no
+ // new master has yet been selected when moving
+ // master:
+ while (true) {
+ Master curMaster = master;
+ CopyState copyState = curMaster.getCopyState();
try {
+ // nocommit factor this out & share w/ CopyThread:
+
+ deleter.incRef(copyState.files.keySet());
+
+ Map<String,FileMetaData> toCopy = filterFilesToCopy(copyState.files);
+ long totBytes = 0;
+ for(FileMetaData metaData : toCopy.values()) {
+ totBytes += metaData.length;
+ }
+
// Copy files over to replica:
- copiedFiles = copyFiles(master.dir, this, filesMetaData, false);
- } catch (Throwable t) {
- TreeLogger.log("id=" + id + " FAIL", t);
- throw new RuntimeException(t);
- }
+ copiedFiles = copyFiles(copyState.dir, this, toCopy, false, totBytes);
+ if (copiedFiles == null) {
+ TreeLogger.log("id=" + id + " startup copyFiles failed");
+ deleter.decRef(copyState.files.keySet());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ continue;
+ }
- // Turn byte[] back to SegmentInfos:
- infos = new SegmentInfos();
- infos.read(dir, new ByteArrayDataInput(infosBytes));
- lastNRTFiles.addAll(infos.files(dir, false));
- } finally {
- if (masterInfos != null) {
- master.releaseInfos(masterInfos);
+ // Turn byte[] back to SegmentInfos:
+ infos = new SegmentInfos();
+ infos.read(dir, new ByteArrayDataInput(copyState.infosBytes));
+ lastNRTFiles.addAll(copyState.files.keySet());
+ } finally {
+ curMaster.releaseCopyState(copyState);
}
+ break;
}
} else {
TreeLogger.log("id=" + id + " no master on startup; fallback to last commit");
}
- if (copiedFiles != null) {
- // nocommit factor out & share this invalidation
-
- // nocommit we need to do this invalidation BEFORE
- // actually ovewriting the file? because if we crash
- // in between the two, we've left an invalid commit?
-
- // If any of the files we just copied over
- // were referenced by the last commit,
- // then we must remove this commit point (it is now
- // corrupt):
- for(String fileName : copiedFiles) {
- if (lastCommitFiles.contains(fileName)) {
- TreeLogger.log("id=" + id + " " + segmentsFileName + " delete now corrupt commit and clear lastCommitFiles");
- dir.deleteFile(segmentsFileName);
- dir.deleteFile("segments.gen");
- lastCommitFiles.clear();
- break;
- }
- }
+ if (infos == null) {
+ infos = lastCommit; // no master to sync from; fall back to last commit
+ } else if (lastCommit != null) {
+ infos.updateGeneration(lastCommit);
+ }
mgr = new InfosSearcherManager(dir, id, infos);
- deleter = new InfosRefCounts(id, dir);
- TreeLogger.log("id=" + id + " incRef lastCommitFiles");
- deleter.incRef(lastCommitFiles);
- TreeLogger.log("id=" + id + ": incRef lastNRTFiles=" + lastNRTFiles);
- deleter.incRef(lastNRTFiles);
deleter.deleteUnknownFiles();
searchThread = new SearchThread(""+id, mgr, versionDocCounts);
searchThread.start();
- pruneChecksums();
+
+ copyThread = new CopyThread(this);
+ copyThread.start();
+
TreeLogger.log("done startup");
TreeLogger.end("startup");
}
- private void pruneChecksums() {
- TreeLogger.log("id=" + id + " now pruneChecksums");
- TreeLogger.start("pruneChecksums");
- Set<String> validFiles = new HashSet<String>();
- validFiles.addAll(lastNRTFiles);
- validFiles.addAll(lastCommitFiles);
- TreeLogger.end("pruneChecksums");
+ /** From the incoming files, determines which ones need
+ * copying on this replica, because they weren't yet
+ * copied, or they were previously copied but the
+ * checksum or file length is different. */
+ public Map<String,FileMetaData> filterFilesToCopy(Map<String,FileMetaData> files) throws IOException {
+ Map<String,FileMetaData> toCopy = new HashMap<>();
+
+ for(Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+ String fileName = ent.getKey();
+ FileMetaData metaData = ent.getValue();
+ long srcChecksum = ent.getValue().checksum;
+
+ Long checksum = dir.getChecksum(fileName);
+
+ long curLength;
+ try {
+ curLength = dir.fileLength(fileName);
+ } catch (FileNotFoundException | NoSuchFileException e) {
+ curLength = -1;
+ }
+ if (curLength != metaData.length) {
+ TreeLogger.log("id=" + id + " " + fileName + " will copy [wrong file length old=" + curLength + " new=" + metaData.length + "]");
+ toCopy.put(fileName, metaData);
+ } else if (checksum == null) {
+ TreeLogger.log("id=" + id + " " + fileName + " will copy [no checksum] length=" + metaData.length + " srcChecksum=" + srcChecksum);
+ toCopy.put(fileName, metaData);
+ } else if (checksum.longValue() != srcChecksum) {
+ TreeLogger.log("id=" + id + " " + fileName + " will copy [wrong checksum: cur=" + checksum.longValue() + " new=" + srcChecksum + "] length=" + metaData.length);
+ toCopy.put(fileName, metaData);
+ } else {
+ TreeLogger.log("id=" + id + " " + fileName + " skip copy checksum=" + srcChecksum + " file.length=" + metaData.length);
+ }
+ }
+
+ // nocommit what about multiple commit points?
+
+ // Invalidate the current commit if we will overwrite
+ // any files from it:
+
+ // nocommit do we need sync'd here?
+ for(String fileName : toCopy.keySet()) {
+ if (lastCommitFiles.contains(fileName)) {
+ TreeLogger.log("id=" + id + " " + fileName + " is being copied but is referenced by last commit; now drop last commit; lastCommitFiles=" + lastCommitFiles);
+ deleter.decRef(lastCommitFiles);
+ dir.deleteFile("segments.gen");
+ lastCommitFiles.clear();
+ break;
+ }
+ }
+
+ return toCopy;
}
- public void copyOneFile(int id, Directory src, String fileName, IOContext context) throws IOException {
+ public void newFlush() {
+ copyThread.lock.lock();
try {
- src.copy(dir, fileName, fileName, context);
- } catch (IOException ioe) {
- TreeLogger.log("id=" + id + " " + fileName + " failed copy1", ioe);
- throw ioe;
+ copyThread.cond.signal();
+ } finally {
+ copyThread.lock.unlock();
}
}
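newFlush() only signals; the pull itself happens in CopyThread, whose body is in
part [2/2] of this commit. Presumably the consumer side is the usual lock/condition
loop, roughly (hypothetical sketch, matching the lock/cond used above):

    lock.lock();
    try {
      while (!stop) {
        cond.await(); // woken by newFlush()
        // pull the current master's CopyState and sync(...) it over
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    } finally {
      lock.unlock();
    }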
- // nocommit move this to a thread so N replicas copy at
- // once:
-
- public synchronized Set<String> sync(SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
- long infosVersion) throws IOException {
-
+ public CopyResult copyMergedFiles(SlowChecksumDirectory src, Map<String,FileMetaData> files, IOContext context) throws IOException {
if (stop) {
throw new AlreadyClosedException("replica closed");
}
+ return fileCopier.add(src, files, true, context);
+ }
+
+ void sync(int curMasterMoveCount, SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData, byte[] infosBytes,
+ long infosVersion) throws IOException {
SegmentInfos currentInfos = mgr.getCurrentInfos();
TreeLogger.log("id=" + id + " sync version=" + infosVersion + " vs current version=" + currentInfos.getVersion());
TreeLogger.start("sync");
+ // nocommit make overall test "modal", ie up front
+ // decide whether any docs are allowed to be lost
+ // (version goes backwards on replicas) or not
+
/*
- if (currentInfos != null && currentInfos.getVersion() >= infosVersion) {
+ if (currentInfos != null && currentInfos.getVersion() >= infosVersion) {
System.out.println(" replica id=" + id + ": skip sync current version=" + currentInfos.getVersion() + " vs new version=" + infosVersion);
return;
- }
+ }
*/
+ // Must incRef before filter in case filter decRefs
+ // the last commit:
+ deleter.incRef(filesMetaData.keySet());
+
+ Map<String,FileMetaData> toCopy = filterFilesToCopy(filesMetaData);
+ long totBytes = 0;
+ for(FileMetaData metaData : toCopy.values()) {
+ totBytes += metaData.length;
+ }
+
// Copy files over to replica:
- Set<String> copiedFiles = copyFiles(master, this, filesMetaData, false);
+ Map<String,FileMetaData> copiedFiles = copyFiles(master, this, toCopy, false, totBytes);
+
+ if (copiedFiles == null) {
+ // At least one file failed to copy; skip cutover
+ TreeLogger.log("top: id=" + id + " replica sync failed; abort");
+ deleter.decRef(filesMetaData.keySet());
+ TreeLogger.end("sync");
+ return;
+ }
+
+ TreeLogger.log("top: id=" + id + " replica sync done file copy");
+
+ // OK all files copied successfully, now rebuild the
+ // infos and cutover searcher mgr
// Turn byte[] back to SegmentInfos:
SegmentInfos infos = new SegmentInfos();
infos.read(dir, new ByteArrayDataInput(infosBytes));
TreeLogger.log("id=" + id + " replica sync version=" + infos.version + " segments=" + infos.toString(dir));
- // Delete now un-referenced files:
- Collection<String> newFiles = infos.files(dir, false);
- deleter.incRef(newFiles);
- deleter.decRef(lastNRTFiles);
- lastNRTFiles.clear();
- lastNRTFiles.addAll(newFiles);
-
- // nocommit factor out & share this invalidation
-
- // nocommit we need to do this invalidation BEFORE
- // actually ovewriting the file? because if we crash
- // in between the two, we've left an invalid commit?
+ masterLock.lock();
+ try {
+ if (curMasterMoveCount != masterCount.get()) {
+ // At least one file failed to copy; skip cutover
+ TreeLogger.log("top: id=" + id + " master moved during sync; abort");
+ deleter.decRef(filesMetaData.keySet());
+ TreeLogger.end("sync");
+ return;
+ }
- // Invalidate the current commit if we overwrote any
- // files from it:
- for(String fileName : copiedFiles) {
- if (lastCommitFiles.contains(fileName)) {
- TreeLogger.log("id=" + id + " delete now corrupt commit and clear lastCommitFiles=" + lastCommitFiles);
- deleter.decRef(lastCommitFiles);
- dir.deleteFile("segments.gen");
- lastCommitFiles.clear();
- break;
+ synchronized (this) {
+ mgr.setCurrentInfos(infos);
+ deleter.decRef(lastNRTFiles);
+ lastNRTFiles.clear();
+ lastNRTFiles.addAll(filesMetaData.keySet());
}
+ } finally {
+ masterLock.unlock();
}
-
+
// Cutover to new searcher
- mgr.refresh(infos);
+ mgr.maybeRefresh();
+ TreeLogger.log("top: id=" + id + " done mgr refresh");
+
TreeLogger.end("sync");
- return copiedFiles;
}
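
The sync above is a snapshot-validate pattern: curMasterMoveCount is read before the (slow, lock-free) copy and re-checked under masterLock before cutover, so a mid-copy master change aborts the install instead of publishing stale infos. A self-contained sketch of just that pattern (class and method names invented):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantLock;

    class MoveCheck {
      final AtomicInteger masterCount = new AtomicInteger();
      final ReentrantLock masterLock = new ReentrantLock();

      // Returns false if the master moved between the snapshot and now.
      boolean cutover(int snapshotMoveCount, Runnable install) {
        masterLock.lock();
        try {
          if (snapshotMoveCount != masterCount.get()) {
            return false;   // master moved during the copy: abort
          }
          install.run();    // safe: still the same master
          return true;
        } finally {
          masterLock.unlock();
        }
      }
    }
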
- public synchronized void warmMerge(String segmentName, SlowChecksumDirectory master, Map<String,FileMetaData> filesMetaData) throws IOException {
+ /** Gracefully close & shutdown this replica. */
+ public void shutdown() throws IOException, InterruptedException {
if (stop) {
- throw new AlreadyClosedException("replica closed");
- }
- TreeLogger.log("id=" + id + " replica warm merge " + segmentName);
- Set<String> copiedFiles = copyFiles(master, this, filesMetaData, true);
-
- // nocommit factor out & share this invalidation
-
- // nocommit we need to do this invalidation BEFORE
- // actually ovewriting the file? because if we crash
- // in between the two, we've left an invalid commit?
-
- // Invalidate the current commit if we overwrote any
- // files from it:
- for(String fileName : copiedFiles) {
- if (lastCommitFiles.contains(fileName)) {
- TreeLogger.log("id=" + id + " delete now corrupt commit and clear lastCommitFiles=" + lastCommitFiles);
- deleter.decRef(lastCommitFiles);
- lastCommitFiles.clear();
- dir.deleteFile("segments.gen");
- break;
- }
+ return;
}
+ stop = true;
- // nocommit we could also pre-warm a SegmentReader
- // here, and add it onto subReader list for next reopen ...?
- }
+ copyThread.finish();
+ fileCopier.close();
- /** Gracefully close & shutdown this replica. */
- public synchronized void shutdown() throws IOException, InterruptedException {
- stop = true;
// Sometimes shutdown w/o committing
TreeLogger.log("id=" + id + " replica shutdown");
TreeLogger.start("shutdown");
@@ -1221,10 +1251,15 @@ public class TestNRTReplication extends
/** Crashes the underlying directory, corrupting any
* un-sync'd files. */
- public synchronized void crash() throws IOException, InterruptedException {
+ public void crash() throws IOException, InterruptedException {
+ if (stop) {
+ return;
+ }
stop = true;
TreeLogger.log("id=" + id + " replica crash; dir.listAll()=" + Arrays.toString(dir.listAll()));
TreeLogger.log("id=" + id + " replica crash; fsdir.listAll()=" + Arrays.toString(((NRTCachingDirectory) dir.getDelegate()).getDelegate().listAll()));
+ copyThread.finish();
+ fileCopier.close();
searchThread.finish();
mgr.close();
((MockDirectoryWrapper) ((NRTCachingDirectory) dir.getDelegate()).getDelegate()).crash();
@@ -1233,24 +1268,45 @@ public class TestNRTReplication extends
/** Commit latest SegmentInfos (fsync'ing all referenced
* files). */
- public synchronized void commit(boolean deleteAll) throws IOException {
- SegmentInfos infos = mgr.getCurrentInfos();
+ public void commit(boolean deleteAll) throws IOException {
+ SegmentInfos infos;
+ Collection<String> newFiles;
+
+ synchronized (this) {
+ infos = mgr.getCurrentInfos();
+ if (infos != null) {
+ newFiles = infos.files(dir, false);
+ deleter.incRef(newFiles);
+ } else {
+ // nocommit is infos ever null?
+ newFiles = null;
+ }
+ }
+
if (infos != null) {
- TreeLogger.log("id=" + id + " commit deleteAll=" + deleteAll + "; infos.version=" + infos.getVersion() + " files=" + infos.files(dir, false) + " segs=" + infos.toString(dir));
+ TreeLogger.log("top: id=" + id + " commit deleteAll=" + deleteAll + "; infos.version=" + infos.getVersion() + " infos.files=" + newFiles + " segs=" + infos.toString(dir));
TreeLogger.start("commit");
- Set<String> fileNames = new HashSet<String>(infos.files(dir, false));
- dir.sync(fileNames);
- // Can only save those files that have been
- // explicitly sync'd; a warmed, but not yet visible,
- // merge cannot be sync'd:
+ dir.sync(newFiles);
infos.commit(dir);
- TreeLogger.log("id=" + id + " " + infos.getSegmentsFileName() + " committed");
- Collection<String> newFiles = infos.files(dir, true);
- deleter.incRef(newFiles);
+ String segmentsFileName = infos.getSegmentsFileName();
+ deleter.incRef(Collections.singletonList(segmentsFileName));
+
+ synchronized (this) {
+ // If a sync happened while we were committing, we
+ // must carry over the commit gen:
+ SegmentInfos curInfos = mgr.getCurrentInfos();
+ if (curInfos != infos) {
+ curInfos.updateGeneration(infos);
+ }
+ }
+
+ TreeLogger.log("top: id=" + id + " " + segmentsFileName + " committed; now decRef lastCommitFiles=" + lastCommitFiles);
+
deleter.decRef(lastCommitFiles);
lastCommitFiles.clear();
lastCommitFiles.addAll(newFiles);
+ lastCommitFiles.add(segmentsFileName);
TreeLogger.log("id=" + id + " " + infos.getSegmentsFileName() + " lastCommitFiles=" + lastCommitFiles);
if (deleteAll) {
@@ -1258,14 +1314,407 @@ public class TestNRTReplication extends
// that just copied over as we closed the writer:
TreeLogger.log("id=" + id + " now deleteUnknownFiles during commit");
deleter.deleteUnknownFiles();
+ TreeLogger.log("id=" + id + " done deleteUnknownFiles during commit");
}
- pruneChecksums();
TreeLogger.end("commit");
}
}
}
+ static class CopyFileJob {
+ public final CopyResult result;
+ public final Directory src;
+ public final FileMetaData metaData;
+ public final String fileName;
+ public final IOContext context;
+
+ public CopyFileJob(CopyResult result, IOContext context, Directory src, String fileName, FileMetaData metaData) {
+ this.result = result;
+ this.context = context;
+ this.src = src;
+ this.fileName = fileName;
+ this.metaData = metaData;
+ }
+ }
+
+ static class CopyOneFile implements Closeable {
+ private long bytesLeft;
+
+ // TODO: reuse...?
+ private final byte[] buffer = new byte[65536];
+
+ private final IndexInput in;
+ private final IndexOutput out;
+ private final SlowChecksumDirectory dest;
+
+ public final CopyFileJob job;
+ public boolean failed;
+
+ public CopyOneFile(CopyFileJob job, SlowChecksumDirectory dest, RateLimiter rateLimiter) throws IOException {
+ this.job = job;
+ this.dest = dest;
+ in = job.src.openInput(job.fileName, job.context);
+ boolean success = false;
+ try {
+ IndexOutput out0 = dest.createOutput(job.fileName, job.context);
+ if (rateLimiter == null) {
+ // No IO rate limiting
+ out = out0;
+ } else {
+ out = new RateLimitedIndexOutput(rateLimiter, out0);
+ }
+ success = true;
+ } finally {
+ if (success == false) {
+ IOUtils.closeWhileHandlingException(in);
+ }
+ }
+
+ bytesLeft = job.metaData.length;
+ }
+
+ public boolean visit() throws IOException {
+ int chunk = bytesLeft > buffer.length ? buffer.length : (int) bytesLeft;
+ try {
+ in.readBytes(buffer, 0, chunk);
+ out.writeBytes(buffer, 0, chunk);
+ } catch (Exception e) {
+ TreeLogger.log("failed to copy " + job.fileName, e);
+ failed = true;
+ }
+ bytesLeft -= chunk;
+ return bytesLeft != 0;
+ }
+
+ /** Abort the copy. */
+ public void abort() {
+ TreeLogger.log("now abort copy file " + job.fileName);
+ try {
+ close();
+ } catch (IOException ioe) {
+ }
+ try {
+ dest.deleteFile(job.fileName);
+ } catch (IOException ioe) {
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.close(in, out);
+ if (failed) {
+ dest.deleteFile(job.fileName);
+ throw new IOException("copy failed");
+ }
+
+ Long actual = dest.getChecksum(job.fileName);
+ assert actual != null;
+
+ if (actual.longValue() != job.metaData.checksum) {
+ // Uh oh, bits flipped during copy:
+ dest.deleteFile(job.fileName);
+ throw new IOException("checksum mismatch");
+ }
+ }
+ }
+
+ static class CopyResult {
+ public final CountDownLatch done;
+ public final AtomicBoolean failed = new AtomicBoolean();
+
+ public CopyResult(int fileCount) {
+ done = new CountDownLatch(fileCount);
+ }
+ }
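
CopyResult is a per-batch completion handle: done counts down once per file (success or failure), and failed is flipped by whichever copy breaks. A hypothetical caller-side wait:

    // Block until every file in the batch is accounted for, then report
    // whether the whole batch succeeded.
    static boolean awaitBatch(CopyResult result) throws InterruptedException {
      result.done.await();                 // one countDown() per file
      return result.failed.get() == false;
    }
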
+
+ // TODO: abstract this, enable swapping in different
+ // low-level tools ... rsync, bittorrent, etc.
+
+ /** Simple class to copy low-priority (merged segment) and
+ * high-priority (flushed segment) files. It has a simplistic
+ * "ionice" implementation: if we are copying a low-pri
+ * (merge) file and a high-pri (flush) job shows up, we pause
+ * the low-pri copy, finish all high-pri copies, and then
+ * resume it. */
+
+ static class SimpleFileCopier extends Thread implements Closeable {
+
+ private final Queue<CopyFileJob> highPriorityJobs = new ConcurrentLinkedQueue<>();
+ private final Queue<CopyFileJob> lowPriorityJobs = new ConcurrentLinkedQueue<>();
+ private final SlowChecksumDirectory dest;
+
+ // nocommit make rate limit (10 MB/sec now) controllable:
+ private final RateLimiter mergeRateLimiter = new RateLimiter.SimpleRateLimiter(10.0);
+ private final int id;
+ private boolean stop;
+
+ /** We always copy files into this dest. */
+ public SimpleFileCopier(SlowChecksumDirectory dest, int id) {
+ this.dest = dest;
+ this.id = id;
+ }
+
+ // nocommit use Future here
+ public synchronized CopyResult add(SlowChecksumDirectory src, Map<String,FileMetaData> files, boolean lowPriority, IOContext context) {
+ if (stop) {
+ throw new AlreadyClosedException("closed");
+ }
+ CopyResult result = new CopyResult(files.size());
+ Queue<CopyFileJob> queue = lowPriority ? lowPriorityJobs : highPriorityJobs;
+ for(Map.Entry<String,FileMetaData> ent : files.entrySet()) {
+ queue.add(new CopyFileJob(result, context, src, ent.getKey(), ent.getValue()));
+ }
+ notify();
+ return result;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("fileCopier id=" + id);
+ TreeLogger.setLogger(new TreeLogger("fileCopier id=" + id));
+
+ CopyOneFile curLowPri = null;
+ CopyOneFile curHighPri = null;
+
+ boolean success = false;
+
+ try {
+ while (stop == false) {
+ if (curHighPri != null) {
+ if (curHighPri.visit() == false) {
+ TreeLogger.log("id=" + id + " " + curHighPri.job.fileName + " high-priority copy done");
+ // Finished copying; now close & verify checksums:
+ try {
+ curHighPri.close();
+ } catch (IOException ioe) {
+ curHighPri.job.result.failed.set(true);
+ TreeLogger.log("WARNING: id=" + id + " " + curHighPri.job.fileName + ": failed to copy high-priority file");
+ } finally {
+ curHighPri.job.result.done.countDown();
+ }
+ curHighPri = null;
+ }
+ } else if (highPriorityJobs.isEmpty() == false) {
+ // Start new high-priority copy:
+ CopyFileJob job = highPriorityJobs.poll();
+ try {
+ curHighPri = new CopyOneFile(job, dest, null);
+ TreeLogger.log("id=" + id + " " + curHighPri.job.fileName + " now start high-priority copy");
+ } catch (AlreadyClosedException ace) {
+ TreeLogger.log("id=" + id + " " + job.fileName + " skip copy: hit AlreadyClosedException");
+ job.result.failed.set(true);
+ job.result.done.countDown();
+ }
+ } else if (curLowPri != null) {
+ if (curLowPri.visit() == false) {
+ TreeLogger.log("id=" + id + " " + curLowPri.job.fileName + " low-priority copy done");
+ // Finished copying; now close & verify checksums:
+ try {
+ curLowPri.close();
+ } catch (IOException ioe) {
+ TreeLogger.log("WARNING: id=" + id + " " + curLowPri.job.fileName + ": failed to copy low-priority file");
+ curLowPri.job.result.failed.set(true);
+ } finally {
+ curLowPri.job.result.done.countDown();
+ }
+ curLowPri = null;
+ }
+ } else if (lowPriorityJobs.isEmpty() == false) {
+ // Start new low-priority copy:
+ CopyFileJob job = lowPriorityJobs.poll();
+ try {
+ curLowPri = new CopyOneFile(job, dest, mergeRateLimiter);
+ TreeLogger.log("id=" + id + " " + curLowPri.job.fileName + " now start low-priority copy");
+ } catch (AlreadyClosedException ace) {
+ TreeLogger.log("id=" + id + " " + job.fileName + " skip copy: hit AlreadyClosedException");
+ job.result.failed.set(true);
+ job.result.done.countDown();
+ }
+ } else {
+ // Wait for another job:
+ synchronized (this) {
+ if (highPriorityJobs.isEmpty() && lowPriorityJobs.isEmpty()) {
+ TreeLogger.log("id=" + id + " copy thread now idle");
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ TreeLogger.log("id=" + id + " copy thread now wake up");
+ }
+ }
+ }
+ }
+ success = true;
+ } catch (IOException ioe) {
+ // nocommit catch each op & retry instead?
+ throw new RuntimeException(ioe);
+ } finally {
+ synchronized(this) {
+ stop = true;
+ if (curLowPri != null) {
+ curLowPri.abort();
+ curLowPri.job.result.failed.set(true);
+ curLowPri.job.result.done.countDown();
+ }
+ if (curHighPri != null) {
+ curHighPri.abort();
+ curHighPri.job.result.failed.set(true);
+ curHighPri.job.result.done.countDown();
+ }
+
+ for(CopyFileJob job : highPriorityJobs) {
+ job.result.failed.set(true);
+ job.result.done.countDown();
+ }
+
+ for(CopyFileJob job : lowPriorityJobs) {
+ job.result.failed.set(true);
+ job.result.done.countDown();
+ }
+ }
+ }
+ }
+
+ // nocommit cutover all other finishes to closeable
+
+ @Override
+ public synchronized void close() throws IOException {
+ stop = true;
+ try {
+ notify();
+ join();
+ } catch (InterruptedException ie) {
+ // nocommit
+ throw new IOException(ie);
+ }
+ }
+ }
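
A hypothetical driver for SimpleFileCopier, illustrating the "ionice" behavior: a flush batch queued second at high priority still completes before a merge batch that was already copying (directory and map names invented; exception handling elided):

    SimpleFileCopier copier = new SimpleFileCopier(replicaDir, 0);
    copier.start();
    // Low-priority, rate-limited merge copy starts first:
    CopyResult merge = copier.add(masterDir, mergedFiles, true, IOContext.DEFAULT);
    // High-priority flush copy pauses the in-progress merge copy:
    CopyResult flush = copier.add(masterDir, flushedFiles, false, IOContext.DEFAULT);
    flush.done.await();   // flush files finish before the merge resumes
    merge.done.await();
    copier.close();
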
+
+ // TODO: we could pre-copy merged files out even before
+ // warmMerge is called? e.g. if we "notice" files being
+ // written to the dir ... could give us a "head start"
+
+ /** Runs in each replica, to handle copying over new
+ * flushes. */
+ static class CopyThread extends Thread {
+ final Lock lock;
+ final Condition cond;
+ private final Replica replica;
+ private volatile boolean stop;
+
+ public CopyThread(Replica replica) {
+ this.lock = new ReentrantLock();
+ this.cond = lock.newCondition();
+ this.stop = false;
+ this.replica = replica;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("replica id=" + replica.id);
+ TreeLogger.setLogger(new TreeLogger("replica id=" + replica.id));
+
+ try {
+ // While loop to keep pulling newly flushed/merged
+ // segments until we shutdown
+ while (stop == false) {
+
+ try {
+
+ // While loop to pull a single new segment:
+ long curVersion = replica.mgr.getCurrentInfos().version;
+
+ SegmentInfos newInfos = null;
+ CopyState copyState = null;
+ Master curMaster = null;
+ int curMasterMoveCount = -1;
+ while (stop == false) {
+ lock.lock();
+ try {
+ curMaster = master;
+ if (curMaster != null) {
+ copyState = curMaster.getCopyState();
+ curMasterMoveCount = masterCount.get();
+ if (copyState != null) {
+ assert copyState.version >= curVersion: "copyState.version=" + copyState.version + " vs curVersion=" + curVersion;
+ if (copyState.version > curVersion) {
+ TreeLogger.log("got new copyState");
+ break;
+ } else {
+ TreeLogger.log("skip newInfos");
+ curMaster.releaseCopyState(copyState);
+ copyState = null;
+ }
+ } else {
+ // Master is closed
+ Thread.sleep(5);
+ }
+ } else {
+ // Master hasn't started yet
+ Thread.sleep(5);
+ }
+ cond.awaitUninterruptibly();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // We hold a ref on the copyState at this point
+
+ // nocommit what if master closes now?
+
+ if (stop) {
+ if (copyState != null) {
+ curMaster.releaseCopyState(copyState);
+ }
+ break;
+ }
+
+ // nocommit need to sometimes crash during
+ // replication; ooh: we could just Thread.interrupt()?
+
+ // nocommit needs to cross the "wire", ie turn into
+ // byte[] and back, and manage the "reservation"
+ // separately on master
+
+ try {
+ // nocommit just pass copyState
+ replica.sync(curMasterMoveCount, copyState.dir, copyState.files, copyState.infosBytes, copyState.version);
+ } finally {
+ curMaster.releaseCopyState(copyState);
+ }
+ } catch (AlreadyClosedException ace) {
+ // OK: master closed while we were replicating;
+ // we will just retry again against the next master:
+ TreeLogger.log("top: id=" + replica.id + " ignore AlreadyClosedException");
+ Thread.sleep(5);
+ continue;
+ }
+ }
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (IOException ioe) {
+ // nocommit how to properly handle...
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ /** Shuts down the thread and only returns once
+ * it's done. */
+ public void finish() throws InterruptedException {
+ stop = true;
+ lock.lock();
+ try {
+ cond.signal();
+ } finally {
+ lock.unlock();
+ }
+ join();
+ }
+ }
+
static class SearchThread extends Thread {
private volatile boolean stop;
private final InfosSearcherManager mgr;
@@ -1292,7 +1741,7 @@ public class TestNRTReplication extends
if (totalHits > 0) {
long version = ((DirectoryReader) s.getIndexReader()).getVersion();
Integer expectedCount = versionDocCounts.get(version);
- assertNotNull("searcher " + s + " is missing expected count", expectedCount);
+ assertNotNull("searcher " + s + " version=" + version + " is missing expected count", expectedCount);
// nocommit since master may roll back in time
// we cannot assert this:
//assertEquals("searcher version=" + version + " replica id=" + id + " searcher=" + s, expectedCount.intValue(), totalHits);
@@ -1342,13 +1791,18 @@ public class TestNRTReplication extends
} else {
refCounts.put(fileName, curCount.intValue() + 1);
}
+
+ // Necessary in case an earlier attempt to delete this
+ // fileName failed and the file was later overwritten:
+ pending.remove(fileName);
}
}
public synchronized void decRef(Collection<String> fileNames) {
for(String fileName : fileNames) {
Integer curCount = refCounts.get(fileName);
- assert curCount != null;
+ assert curCount != null: "fileName=" + fileName;
assert curCount.intValue() > 0;
if (curCount.intValue() == 1) {
refCounts.remove(fileName);
@@ -1390,5 +1844,4 @@ public class TestNRTReplication extends
}
}
}
-
}