You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2017/03/14 06:41:01 UTC
[24/43] lucene-solr:feature/autoscaling: LUCENE-7700: Move throughput
control and merge aborting out of IndexWriter's core.
LUCENE-7700: Move throughput control and merge aborting out of IndexWriter's core.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9540bc37
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9540bc37
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9540bc37
Branch: refs/heads/feature/autoscaling
Commit: 9540bc37583dfd4e995b893154039fcf031dc3c3
Parents: d2bf30d
Author: Dawid Weiss <dw...@apache.org>
Authored: Fri Mar 10 10:23:29 2017 +0100
Committer: Dawid Weiss <dw...@apache.org>
Committed: Fri Mar 10 10:23:29 2017 +0100
----------------------------------------------------------------------
lucene/CHANGES.txt | 6 +
.../lucene/index/ConcurrentMergeScheduler.java | 75 ++++++--
.../org/apache/lucene/index/IndexWriter.java | 94 ++++------
.../org/apache/lucene/index/MergePolicy.java | 184 +++++++++++++++++--
.../apache/lucene/index/MergeRateLimiter.java | 177 +++++++-----------
.../org/apache/lucene/index/MergeScheduler.java | 12 ++
.../apache/lucene/index/NoMergeScheduler.java | 7 +
.../lucene/index/TestMergeRateLimiter.java | 4 +-
8 files changed, 358 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 4040945..b6ee4b8 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -117,6 +117,12 @@ API Changes
instead of once all shard responses are present. (Simon Willnauer,
Mike McCandless)
+* LUCENE-7700: A cleanup of merge throughput control logic. Refactored all the
+ code previously scattered throughout the IndexWriter and
+ ConcurrentMergeScheduler into a more accessible set of public methods (see
+ MergePolicy.OneMergeProgress, MergeScheduler.wrapForMerge and
+ OneMerge.mergeInit). (Dawid Weiss, Mike McCandless).
+
* LUCENE-7734: FieldType's copy constructor was widened to accept any IndexableFieldType.
(David Smiley)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index 0dd0a4d..6e930c4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -25,6 +25,11 @@ import java.util.Locale;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RateLimitedIndexOutput;
+import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
@@ -255,6 +260,36 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
assert false: "merge thread " + currentThread + " was not found";
}
+ @Override
+ public Directory wrapForMerge(OneMerge merge, Directory in) {
+ Thread mergeThread = Thread.currentThread();
+ if (!MergeThread.class.isInstance(mergeThread)) {
+ throw new AssertionError("wrapForMerge should be called from MergeThread. Current thread: "
+ + mergeThread);
+ }
+
+ // Return a wrapped Directory which has rate-limited output.
+ RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter;
+ return new FilterDirectory(in) {
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ ensureOpen();
+
+ // This Directory is only supposed to be used during merging,
+ // so all writes should have MERGE context, else there is a bug
+ // somewhere that is failing to pass down the right IOContext:
+ assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
+
+ // Because rateLimiter is bound to a particular merge thread, this method should
+ // always be called from that context. Verify this.
+ assert mergeThread == Thread.currentThread() : "Not the same merge thread, current="
+ + Thread.currentThread() + ", expected=" + mergeThread;
+
+ return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
+ }
+ };
+ }
+
/**
* Called whenever the running merges have changed, to set merge IO limits.
* This method sorts the merge threads by their merge size in
@@ -327,8 +362,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
newMBPerSec = targetMBPerSec;
}
- double curMBPerSec = merge.rateLimiter.getMBPerSec();
-
+ MergeRateLimiter rateLimiter = mergeThread.rateLimiter;
+ double curMBPerSec = rateLimiter.getMBPerSec();
+
if (verbose()) {
long mergeStartNS = merge.mergeStartNS;
if (mergeStartNS == -1) {
@@ -339,11 +375,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n",
mergeThread.getName(),
bytesToMB(merge.estimatedMergeBytes),
- bytesToMB(merge.rateLimiter.totalBytesWritten),
+ bytesToMB(rateLimiter.getTotalBytesWritten()),
nsToSec(now - mergeStartNS),
- nsToSec(merge.rateLimiter.getTotalStoppedNS()),
- nsToSec(merge.rateLimiter.getTotalPausedNS()),
- rateToString(merge.rateLimiter.getMBPerSec())));
+ nsToSec(rateLimiter.getTotalStoppedNS()),
+ nsToSec(rateLimiter.getTotalPausedNS()),
+ rateToString(rateLimiter.getMBPerSec())));
if (newMBPerSec != curMBPerSec) {
if (newMBPerSec == 0.0) {
@@ -364,7 +400,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
}
- merge.rateLimiter.setMBPerSec(newMBPerSec);
+ rateLimiter.setMBPerSec(newMBPerSec);
}
if (verbose()) {
message(message.toString());
@@ -449,7 +485,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
Thread currentThread = Thread.currentThread();
int count = 0;
for (MergeThread mergeThread : mergeThreads) {
- if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
+ if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.isAborted() == false) {
count++;
}
}
@@ -497,8 +533,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return;
}
- updateIOThrottle(merge);
-
boolean success = false;
try {
if (verbose()) {
@@ -507,14 +541,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// OK to spawn a new merge thread to handle this
// merge:
- final MergeThread merger = getMergeThread(writer, merge);
- mergeThreads.add(merger);
+ final MergeThread newMergeThread = getMergeThread(writer, merge);
+ mergeThreads.add(newMergeThread);
+
+ updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter);
if (verbose()) {
- message(" launch new thread [" + merger.getName() + "]");
+ message(" launch new thread [" + newMergeThread.getName() + "]");
}
- merger.start();
+ newMergeThread.start();
updateMergeThreads();
success = true;
@@ -598,16 +634,17 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
/** Runs a merge thread to execute a single merge, then exits. */
protected class MergeThread extends Thread implements Comparable<MergeThread> {
-
final IndexWriter writer;
final OneMerge merge;
+ final MergeRateLimiter rateLimiter;
/** Sole constructor. */
public MergeThread(IndexWriter writer, OneMerge merge) {
this.writer = writer;
this.merge = merge;
+ this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
}
-
+
@Override
public int compareTo(MergeThread other) {
// Larger merges sort first:
@@ -616,9 +653,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
@Override
public void run() {
-
try {
-
if (verbose()) {
message(" merge thread: start");
}
@@ -715,7 +750,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
}
/** Tunes IO throttle when a new merge starts. */
- private synchronized void updateIOThrottle(OneMerge newMerge) throws IOException {
+ private synchronized void updateIOThrottle(OneMerge newMerge, MergeRateLimiter rateLimiter) throws IOException {
if (doAutoIOThrottle == false) {
return;
}
@@ -794,7 +829,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
} else {
rate = targetMBPerSec;
}
- newMerge.rateLimiter.setMBPerSec(rate);
+ rateLimiter.setMBPerSec(rate);
targetMBPerSecChanged();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index da030ca..aa28d99 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -36,6 +36,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
@@ -51,22 +52,18 @@ import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.LockValidatingDirectoryWrapper;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
@@ -277,7 +274,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private final Directory directoryOrig; // original user directory
private final Directory directory; // wrapped with additional checks
- private final Directory mergeDirectory; // wrapped with throttling: used for merging
private final Analyzer analyzer; // how to analyze text
private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
@@ -353,8 +349,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* card to make sure they can later charge you when you check out. */
final AtomicLong pendingNumDocs = new AtomicLong();
- final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
-
DirectoryReader getReader() throws IOException {
return getReader(true, false);
}
@@ -809,10 +803,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
directoryOrig = d;
directory = new LockValidatingDirectoryWrapper(d, writeLock);
- // Directory we use for merging, so we can abort running merges, and so
- // merge schedulers can optionally rate-limit per-merge IO:
- mergeDirectory = addMergeRateLimiters(directory);
-
analyzer = config.getAnalyzer();
mergeScheduler = config.getMergeScheduler();
mergeScheduler.setInfoStream(infoStream);
@@ -2212,8 +2202,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
abortMerges();
- rateLimiters.close();
-
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback: done finish merges");
}
@@ -2418,7 +2406,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
}
- merge.rateLimiter.setAbort();
+ merge.setAborted();
mergeFinish(merge);
}
pendingMerges.clear();
@@ -2427,7 +2415,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort running merge " + segString(merge.segments));
}
- merge.rateLimiter.setAbort();
+ merge.setAborted();
}
// We wait here to make all merges stop. It should not
@@ -2775,13 +2763,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* index.
*
* <p>
- * <b>NOTE:</b> this method merges all given {@link LeafReader}s in one
+ * <b>NOTE:</b> this merges all given {@link LeafReader}s in one
* merge. If you intend to merge a large number of readers, it may be better
* to call this method multiple times, each time with a small set of readers.
* In principle, if you use a merge policy with a {@code mergeFactor} or
* {@code maxMergeAtOnce} parameter, you should pass that many readers in one
* call.
*
+ * <p>
+ * <b>NOTE:</b> this method does not call or make use of the {@link MergeScheduler},
+ * so any custom bandwidth throttling is at the moment ignored.
+ *
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
@@ -2832,8 +2824,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
SegmentMerger merger = new SegmentMerger(Arrays.asList(readers), info, infoStream, trackingDir,
globalFieldNumberMap,
context);
-
- rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) {
return docWriter.deleteQueue.getNextSequenceNumber();
@@ -2864,7 +2854,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Now create the compound file if needed
if (useCompoundFile) {
Collection<String> filesToDelete = infoPerCommit.files();
- TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
+ TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(directory);
// TODO: unlike merge, on exception we arent sniping any trash cfs files here?
// createCompoundFile tries to cleanup, but it might not always be able to...
try {
@@ -3745,7 +3735,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// deleter.refresh() call that will remove any index
// file that current segments does not reference), we
// abort this merge
- if (merge.rateLimiter.getAbort()) {
+ if (merge.isAborted()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commitMerge: skip: it was aborted");
}
@@ -3905,8 +3895,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
- rateLimiters.set(merge.rateLimiter);
-
final long t0 = System.currentTimeMillis();
final MergePolicy mergePolicy = config.getMergePolicy();
@@ -3937,7 +3925,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during merge");
}
- } else if (merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
+ } else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
// This merge (and, generally, any change to the
// segments) may now enable new merges, so we call
// merge policy & update pending merges.
@@ -3951,7 +3939,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
tragicEvent(t, "merge");
}
- if (merge.info != null && merge.rateLimiter.getAbort() == false) {
+ if (merge.info != null && merge.isAborted() == false) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs");
}
@@ -3976,7 +3964,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
assert merge.segments.size() > 0;
if (stopMerges) {
- merge.rateLimiter.setAbort();
+ merge.setAborted();
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
}
@@ -4087,7 +4075,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return;
}
- if (merge.rateLimiter.getAbort()) {
+ merge.mergeInit();
+
+ if (merge.isAborted()) {
return;
}
@@ -4239,9 +4229,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* but without holding synchronized lock on IndexWriter
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
+ merge.checkAborted();
- merge.rateLimiter.checkAbort();
-
+ Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory);
List<SegmentCommitInfo> sourceSegments = merge.segments;
IOContext context = new IOContext(merge.getStoreMergeInfo());
@@ -4339,7 +4329,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap,
context);
- merge.rateLimiter.checkAbort();
+ merge.checkAborted();
merge.mergeStartNS = System.nanoTime();
@@ -4354,11 +4344,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
if (merger.shouldMerge()) {
+ String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet()
+ .stream()
+ .filter((e) -> e.getValue() > 0)
+ .map((e) -> String.format(Locale.ROOT, "%.1f sec %s",
+ e.getValue() / 1000000000.,
+ e.getKey().name().toLowerCase(Locale.ROOT)))
+ .collect(Collectors.joining(", "));
+ if (!pauseInfo.isEmpty()) {
+ pauseInfo = " (" + pauseInfo + ")";
+ }
+
long t1 = System.nanoTime();
double sec = (t1-merge.mergeStartNS)/1000000000.;
double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
- double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.;
- double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.;
infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " +
(mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
(mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
@@ -4367,10 +4366,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
(mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
(mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points") + "; " +
String.format(Locale.ROOT,
- "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]",
+ "%.1f sec%s to merge segment [%.2f MB, %.2f MB/sec]",
sec,
- stoppedSec,
- throttleSec,
+ pauseInfo,
segmentMB,
segmentMB / sec));
} else {
@@ -4406,7 +4404,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
success = true;
} catch (Throwable t) {
synchronized(this) {
- if (merge.rateLimiter.getAbort()) {
+ if (merge.isAborted()) {
// This can happen if rollback is called while we were building
// our CFS -- fall through to logic below to remove the non-CFS
// merged files:
@@ -4439,7 +4437,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// registered with IFD
deleteNewFiles(filesToRemove);
- if (merge.rateLimiter.getAbort()) {
+ if (merge.isAborted()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "abort merge after building CFS");
}
@@ -5063,30 +5061,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
throw new IllegalArgumentException("number of documents in the index cannot exceed " + actualMaxDocs + " (current document count is " + pendingNumDocs.get() + "; added numDocs is " + addedNumDocs + ")");
}
- /** Wraps the incoming {@link Directory} so that we assign a per-thread
- * {@link MergeRateLimiter} to all created {@link IndexOutput}s. */
- private Directory addMergeRateLimiters(Directory in) {
- return new FilterDirectory(in) {
- @Override
- public IndexOutput createOutput(String name, IOContext context) throws IOException {
- ensureOpen();
-
- // Paranoia defense: if this trips we have a bug somewhere...
- IndexWriter.this.ensureOpen(false);
-
- // This Directory is only supposed to be used during merging,
- // so all writes should have MERGE context, else there is a bug
- // somewhere that is failing to pass down the right IOContext:
- assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
-
- MergeRateLimiter rateLimiter = rateLimiters.get();
- assert rateLimiter != null;
-
- return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
- }
- };
- }
-
/** Returns the highest <a href="#sequence_number">sequence number</a> across
* all completed operations, or 0 if no operations have finished yet. Still
* in-flight operations (in other threads) are not counted until they finish.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index dbf37df..d9a0ab8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -19,12 +19,19 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.store.RateLimiter;
/**
* <p>Expert: a MergePolicy determines the sequence of
@@ -55,6 +62,125 @@ import org.apache.lucene.store.RateLimiter;
* @lucene.experimental
*/
public abstract class MergePolicy {
+ /**
+ * Progress and state for an executing merge. This class
+ * encapsulates the logic to pause and resume the merge thread
+ * or to abort the merge entirely.
+ *
+ * @lucene.experimental */
+ public static class OneMergeProgress {
+ /** Reason for pausing the merge thread. */
+ public static enum PauseReason {
+ /** Stopped (because of throughput rate set to 0, typically). */
+ STOPPED,
+ /** Temporarily paused because of exceeded throughput rate. */
+ PAUSED,
+ /** Other reason. */
+ OTHER
+ };
+
+ private final ReentrantLock pauseLock = new ReentrantLock();
+ private final Condition pausing = pauseLock.newCondition();
+
+ /**
+ * Pause times (in nanoseconds) for each {@link PauseReason}.
+ */
+ private final EnumMap<PauseReason, AtomicLong> pauseTimesNS;
+
+ private volatile boolean aborted;
+
+ /**
+ * This field is for sanity-check purposes only. Only the same thread that invoked
+ * {@link OneMerge#mergeInit()} is permitted to call
+ * {@link #pauseNanos}. This is always verified at runtime.
+ */
+ private Thread owner;
+
+ /** Creates a new merge progress info. */
+ public OneMergeProgress() {
+ // Place all the pause reasons in there immediately so that we can simply update values.
+ pauseTimesNS = new EnumMap<PauseReason,AtomicLong>(PauseReason.class);
+ for (PauseReason p : PauseReason.values()) {
+ pauseTimesNS.put(p, new AtomicLong());
+ }
+ }
+
+ /**
+ * Abort the merge this progress tracks at the next
+ * possible moment.
+ */
+ public void abort() {
+ aborted = true;
+ wakeup(); // wakeup any paused merge thread.
+ }
+
+ /**
+ * Return the aborted state of this merge.
+ */
+ public boolean isAborted() {
+ return aborted;
+ }
+
+ /**
+ * Pauses the calling thread for at least <code>pauseNanos</code> nanoseconds
+ * unless the merge is aborted or the external condition returns <code>false</code>,
+ * in which case control returns immediately.
+ *
+ * The external condition is required so that other threads can terminate the pausing immediately,
+ * before <code>pauseNanos</code> expires. We can't rely on just {@link Condition#awaitNanos(long)} alone
+ * because it can return due to spurious wakeups too.
+ *
+ * @param condition The pause condition that should return false if immediate return from this
+ * method is needed. Other threads can wake up any sleeping thread by calling
+ * {@link #wakeup}, but it will fall asleep again for the remainder of the requested time
+ * if this condition still returns true.
+ */
+ public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition) throws InterruptedException {
+ if (Thread.currentThread() != owner) {
+ throw new RuntimeException("Only the merge owner thread can call pauseNanos(). This thread: "
+ + Thread.currentThread().getName() + ", owner thread: "
+ + owner);
+ }
+
+ long start = System.nanoTime();
+ AtomicLong timeUpdate = pauseTimesNS.get(reason);
+ pauseLock.lock();
+ try {
+ while (pauseNanos > 0 && !aborted && condition.getAsBoolean()) {
+ pauseNanos = pausing.awaitNanos(pauseNanos);
+ }
+ } finally {
+ pauseLock.unlock();
+ timeUpdate.addAndGet(System.nanoTime() - start);
+ }
+ }
+
+ /**
+ * Request a wakeup for any threads stalled in {@link #pauseNanos}.
+ */
+ public void wakeup() {
+ pauseLock.lock();
+ try {
+ pausing.signalAll();
+ } finally {
+ pauseLock.unlock();
+ }
+ }
+
+ /** Returns pause reasons and associated times in nanoseconds. */
+ public Map<PauseReason,Long> getPauseTimes() {
+ Set<Entry<PauseReason,AtomicLong>> entries = pauseTimesNS.entrySet();
+ return entries.stream()
+ .collect(Collectors.toMap(
+ (e) -> e.getKey(),
+ (e) -> e.getValue().get()));
+ }
+
+ final void setMergeThread(Thread owner) {
+ assert this.owner == null;
+ this.owner = owner;
+ }
+ }
/** OneMerge provides the information necessary to perform
* an individual primitive merge operation, resulting in
@@ -64,7 +190,6 @@ public abstract class MergePolicy {
*
* @lucene.experimental */
public static class OneMerge {
-
SegmentCommitInfo info; // used by IndexWriter
boolean registerDone; // used by IndexWriter
long mergeGen; // used by IndexWriter
@@ -82,8 +207,10 @@ public abstract class MergePolicy {
/** Segments to be merged. */
public final List<SegmentCommitInfo> segments;
- /** A private {@link RateLimiter} for this merge, used to rate limit writes and abort. */
- public final MergeRateLimiter rateLimiter;
+ /**
+ * Control used to pause/stop/resume the merge thread.
+ */
+ private final OneMergeProgress mergeProgress;
volatile long mergeStartNS = -1;
@@ -106,9 +233,17 @@ public abstract class MergePolicy {
}
totalMaxDoc = count;
- rateLimiter = new MergeRateLimiter(this);
+ mergeProgress = new OneMergeProgress();
}
+ /**
+ * Called by {@link IndexWriter} after the merge has started, from the
+ * thread that will be executing the merge.
+ */
+ public void mergeInit() throws IOException {
+ mergeProgress.setMergeThread(Thread.currentThread());
+ }
+
/** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
public void mergeFinished() throws IOException {
}
@@ -163,7 +298,7 @@ public abstract class MergePolicy {
if (maxNumSegments != -1) {
b.append(" [maxNumSegments=" + maxNumSegments + "]");
}
- if (rateLimiter.getAbort()) {
+ if (isAborted()) {
b.append(" [ABORTED]");
}
return b.toString();
@@ -194,7 +329,32 @@ public abstract class MergePolicy {
/** Return {@link MergeInfo} describing this merge. */
public MergeInfo getStoreMergeInfo() {
return new MergeInfo(totalMaxDoc, estimatedMergeBytes, isExternal, maxNumSegments);
- }
+ }
+
+ /** Returns true if this merge was or should be aborted. */
+ public boolean isAborted() {
+ return mergeProgress.isAborted();
+ }
+
+ /** Marks this merge as aborted. The merge thread should terminate at the soonest possible moment. */
+ public void setAborted() {
+ this.mergeProgress.abort();
+ }
+
+ /** Checks if merge has been aborted and throws a merge exception if so. */
+ public void checkAborted() throws MergeAbortedException {
+ if (isAborted()) {
+ throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString());
+ }
+ }
+
+ /**
+ * Returns a {@link OneMergeProgress} instance for this merge, which provides
+ * statistics of the merge threads (run time vs. sleep time) if merging is throttled.
+ */
+ public OneMergeProgress getMergeProgress() {
+ return mergeProgress;
+ }
}
/**
@@ -222,8 +382,7 @@ public abstract class MergePolicy {
merges.add(merge);
}
- /** Returns a description of the merges in this
- * specification. */
+ /** Returns a description of the merges in this specification. */
public String segString(Directory dir) {
StringBuilder b = new StringBuilder();
b.append("MergeSpec:\n");
@@ -235,8 +394,7 @@ public abstract class MergePolicy {
}
}
- /** Exception thrown if there are any problems while
- * executing a merge. */
+ /** Exception thrown if there are any problems while executing a merge. */
public static class MergeException extends RuntimeException {
private Directory dir;
@@ -259,9 +417,9 @@ public abstract class MergePolicy {
}
}
- /** Thrown when a merge was explicity aborted because
+ /** Thrown when a merge was explicitly aborted because
* {@link IndexWriter#abortMerges} was called. Normally
- * this exception is privately caught and suppresed by
+ * this exception is privately caught and suppressed by
* {@link IndexWriter}. */
public static class MergeAbortedException extends IOException {
/** Create a {@link MergeAbortedException}. */
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
index d04c2d2..e5361d5 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
@@ -20,118 +20,107 @@ package org.apache.lucene.index;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ThreadInterruptedException;
-import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.index.MergePolicy.OneMergeProgress;
+import org.apache.lucene.index.MergePolicy.OneMergeProgress.PauseReason;
/** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to
* give {@link MergeScheduler}s ionice like control.
*
- * This is similar to {@link SimpleRateLimiter}, except it's merge-private,
- * it will wake up if its rate changes while it's paused, it tracks how
- * much time it spent stopped and paused, and it supports aborting.
- *
* @lucene.internal */
public class MergeRateLimiter extends RateLimiter {
private final static int MIN_PAUSE_CHECK_MSEC = 25;
- volatile long totalBytesWritten;
+
+ private final static long MIN_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(2);
+ private final static long MAX_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(250);
+
+ private volatile double mbPerSec;
+ private volatile long minPauseCheckBytes;
- double mbPerSec;
private long lastNS;
- private long minPauseCheckBytes;
- private boolean abort;
- long totalPausedNS;
- long totalStoppedNS;
- final MergePolicy.OneMerge merge;
- /** Returned by {@link #maybePause}. */
- private static enum PauseResult {NO, STOPPED, PAUSED};
+ private AtomicLong totalBytesWritten = new AtomicLong();
- /** Sole constructor. */
- public MergeRateLimiter(MergePolicy.OneMerge merge) {
- this.merge = merge;
+ private final OneMergeProgress mergeProgress;
+ /** Sole constructor. */
+ public MergeRateLimiter(OneMergeProgress mergeProgress) {
// Initially no IO limit; use setter here so minPauseCheckBytes is set:
+ this.mergeProgress = mergeProgress;
setMBPerSec(Double.POSITIVE_INFINITY);
}
@Override
- public synchronized void setMBPerSec(double mbPerSec) {
- // 0.0 is allowed: it means the merge is paused
- if (mbPerSec < 0.0) {
- throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
+ public void setMBPerSec(double mbPerSec) {
+ // Synchronized to make updates to mbPerSec and minPauseCheckBytes atomic.
+ synchronized (this) {
+ // 0.0 is allowed: it means the merge is paused
+ if (mbPerSec < 0.0) {
+ throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
+ }
+ this.mbPerSec = mbPerSec;
+
+ // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
+ this.minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
+ assert minPauseCheckBytes >= 0;
}
- this.mbPerSec = mbPerSec;
- // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
- minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
- assert minPauseCheckBytes >= 0;
- notify();
+
+ mergeProgress.wakeup();
}
@Override
- public synchronized double getMBPerSec() {
+ public double getMBPerSec() {
return mbPerSec;
}
/** Returns total bytes written by this merge. */
public long getTotalBytesWritten() {
- return totalBytesWritten;
+ return totalBytesWritten.get();
}
@Override
public long pause(long bytes) throws MergePolicy.MergeAbortedException {
+ totalBytesWritten.addAndGet(bytes);
- totalBytesWritten += bytes;
-
- long startNS = System.nanoTime();
- long curNS = startNS;
-
- // While loop because 1) Thread.wait doesn't always sleep long
- // enough, and 2) we wake up and check again when our rate limit
+ // While loop because we may wake up and check again when our rate limit
// is changed while we were pausing:
- long pausedNS = 0;
- while (true) {
- PauseResult result = maybePause(bytes, curNS);
- if (result == PauseResult.NO) {
- // Set to curNS, not targetNS, to enforce the instant rate, not
- // the "averaaged over all history" rate:
- lastNS = curNS;
- break;
- }
- curNS = System.nanoTime();
- long ns = curNS - startNS;
- startNS = curNS;
-
- // Separately track when merge was stopped vs rate limited:
- if (result == PauseResult.STOPPED) {
- totalStoppedNS += ns;
- } else {
- assert result == PauseResult.PAUSED;
- totalPausedNS += ns;
- }
- pausedNS += ns;
+ long paused = 0;
+ long delta;
+ while ((delta = maybePause(bytes, System.nanoTime())) >= 0) {
+ // Keep waiting.
+ paused += delta;
}
- return pausedNS;
+ return paused;
}
/** Total NS merge was stopped. */
- public synchronized long getTotalStoppedNS() {
- return totalStoppedNS;
+ public long getTotalStoppedNS() {
+ return mergeProgress.getPauseTimes().get(PauseReason.STOPPED);
}
/** Total NS merge was paused to rate limit IO. */
- public synchronized long getTotalPausedNS() {
- return totalPausedNS;
+ public long getTotalPausedNS() {
+ return mergeProgress.getPauseTimes().get(PauseReason.PAUSED);
}
- /** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. */
- private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
-
+ /**
+ * Returns the number of nanoseconds spent in a paused state or <code>-1</code>
+ * if no pause was applied. If the thread needs pausing, this method delegates
+ * to the linked {@link OneMergeProgress}.
+ */
+ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
// Now is a good time to abort the merge:
- checkAbort();
+ if (mergeProgress.isAborted()) {
+ throw new MergePolicy.MergeAbortedException("Merge aborted.");
+ }
- double secondsToPause = (bytes/1024./1024.) / mbPerSec;
+ double rate = mbPerSec; // read from volatile rate once.
+ double secondsToPause = (bytes/1024./1024.) / rate;
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
@@ -140,54 +129,30 @@ public class MergeRateLimiter extends RateLimiter {
long curPauseNS = targetNS - curNS;
- // NOTE: except maybe on real-time JVMs, minimum realistic
- // wait/sleep time is 1 msec; if you pass just 1 nsec the impl
- // rounds up to 1 msec, so we don't bother unless it's > 2 msec:
-
- if (curPauseNS <= 2000000) {
- return PauseResult.NO;
+ // We don't bother with thread pausing if the pause is smaller than 2 msec.
+ if (curPauseNS <= MIN_PAUSE_NS) {
+ // Set to curNS, not targetNS, to enforce the instant rate, not
+ // the "averaged over all history" rate:
+ lastNS = curNS;
+ return -1;
}
- // Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping:
- if (curPauseNS > 250L*1000000) {
- curPauseNS = 250L*1000000;
+ // Defensive: don't sleep for too long; the loop above will call us again if
+ // we should keep sleeping and the rate may be adjusted in between.
+ if (curPauseNS > MAX_PAUSE_NS) {
+ curPauseNS = MAX_PAUSE_NS;
}
- int sleepMS = (int) (curPauseNS / 1000000);
- int sleepNS = (int) (curPauseNS % 1000000);
-
- double rate = mbPerSec;
-
+ long start = System.nanoTime();
try {
- // CMS can wake us up here if it changes our target rate:
- wait(sleepMS, sleepNS);
+ mergeProgress.pauseNanos(
+ curPauseNS,
+ rate == 0.0 ? PauseReason.STOPPED : PauseReason.PAUSED,
+ () -> rate == mbPerSec);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
-
- if (rate == 0.0) {
- return PauseResult.STOPPED;
- } else {
- return PauseResult.PAUSED;
- }
- }
-
- /** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */
- public synchronized void checkAbort() throws MergePolicy.MergeAbortedException {
- if (abort) {
- throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString());
- }
- }
-
- /** Mark this merge aborted. */
- public synchronized void setAbort() {
- abort = true;
- notify();
- }
-
- /** Returns true if this merge was aborted. */
- public synchronized boolean getAbort() {
- return abort;
+ return System.nanoTime() - start;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
index 65af45b..66d0870 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
@@ -20,6 +20,9 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.util.InfoStream;
/** <p>Expert: {@link IndexWriter} uses an instance
@@ -42,6 +45,15 @@ public abstract class MergeScheduler implements Closeable {
* */
public abstract void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException;
+ /**
+ * Wraps the incoming {@link Directory} so that we can merge-throttle it
+ * using {@link RateLimitedIndexOutput}.
+ */
+ public Directory wrapForMerge(OneMerge merge, Directory in) {
+ // A no-op by default.
+ return in;
+ }
+
/** Close this MergeScheduler. */
@Override
public abstract void close() throws IOException;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
index 1630653..e4c0136 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
@@ -16,6 +16,8 @@
*/
package org.apache.lucene.index;
+import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.store.Directory;
/**
* A {@link MergeScheduler} which never executes any merges. It is also a
@@ -41,6 +43,11 @@ public final class NoMergeScheduler extends MergeScheduler {
@Override
public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) {}
+
+ @Override
+ public Directory wrapForMerge(OneMerge merge, Directory in) {
+ return in;
+ }
@Override
public MergeScheduler clone() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java b/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
index ef922bb..723cfbc 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
@@ -27,8 +27,8 @@ public class TestMergeRateLimiter extends LuceneTestCase {
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
w.addDocument(new Document());
w.close();
- MergePolicy.OneMerge merge = new MergePolicy.OneMerge(SegmentInfos.readLatestCommit(dir).asList());
- MergeRateLimiter rateLimiter = new MergeRateLimiter(merge);
+
+ MergeRateLimiter rateLimiter = new MergeRateLimiter(new MergePolicy.OneMergeProgress());
assertEquals(Double.POSITIVE_INFINITY, rateLimiter.getMBPerSec(), 0.0);
assertTrue(rateLimiter.getMinPauseCheckBytes() > 0);
dir.close();