You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2017/03/12 00:18:53 UTC

[44/50] [abbrv] lucene-solr:jira/solr-6736: LUCENE-7700: Move throughput control and merge aborting out of IndexWriter's core.

LUCENE-7700: Move throughput control and merge aborting out of IndexWriter's core.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9540bc37
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9540bc37
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9540bc37

Branch: refs/heads/jira/solr-6736
Commit: 9540bc37583dfd4e995b893154039fcf031dc3c3
Parents: d2bf30d
Author: Dawid Weiss <dw...@apache.org>
Authored: Fri Mar 10 10:23:29 2017 +0100
Committer: Dawid Weiss <dw...@apache.org>
Committed: Fri Mar 10 10:23:29 2017 +0100

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |   6 +
 .../lucene/index/ConcurrentMergeScheduler.java  |  75 ++++++--
 .../org/apache/lucene/index/IndexWriter.java    |  94 ++++------
 .../org/apache/lucene/index/MergePolicy.java    | 184 +++++++++++++++++--
 .../apache/lucene/index/MergeRateLimiter.java   | 177 +++++++-----------
 .../org/apache/lucene/index/MergeScheduler.java |  12 ++
 .../apache/lucene/index/NoMergeScheduler.java   |   7 +
 .../lucene/index/TestMergeRateLimiter.java      |   4 +-
 8 files changed, 358 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 4040945..b6ee4b8 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -117,6 +117,12 @@ API Changes
   instead of once all shard responses are present. (Simon Willnauer,
   Mike McCandless)
 
+* LUCENE-7700: A cleanup of merge throughput control logic. Refactored all the
+  code previously scattered throughout the IndexWriter and
+  ConcurrentMergeScheduler into a more accessible set of public methods (see
+  MergePolicy.OneMergeProgress, MergeScheduler.wrapForMerge and
+  OneMerge.mergeInit). (Dawid Weiss, Mike McCandless)
+
 * LUCENE-7734: FieldType's copy constructor was widened to accept any IndexableFieldType.
   (David Smiley)
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index 0dd0a4d..6e930c4 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -25,6 +25,11 @@ import java.util.Locale;
 import org.apache.lucene.index.MergePolicy.OneMerge;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RateLimitedIndexOutput;
+import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -255,6 +260,36 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     assert false: "merge thread " + currentThread + " was not found";
   }
 
+  @Override
+  public Directory wrapForMerge(OneMerge merge, Directory in) {
+    Thread mergeThread = Thread.currentThread();
+    if (!MergeThread.class.isInstance(mergeThread)) {
+      throw new AssertionError("wrapForMerge should be called from MergeThread. Current thread: "
+          + mergeThread);
+    }
+
+    // Return a wrapped Directory which has rate-limited output.
+    RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter;
+    return new FilterDirectory(in) {
+      @Override
+      public IndexOutput createOutput(String name, IOContext context) throws IOException {
+        ensureOpen();
+
+        // This Directory is only supposed to be used during merging,
+        // so all writes should have MERGE context, else there is a bug 
+        // somewhere that is failing to pass down the right IOContext:
+        assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
+        
+        // Because rateLimiter is bound to a particular merge thread, this method should
+        // always be called from that context. Verify this.
+        assert mergeThread == Thread.currentThread() : "Not the same merge thread, current="
+          + Thread.currentThread() + ", expected=" + mergeThread;
+
+        return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
+      }
+    };
+  }
+  
   /**
    * Called whenever the running merges have changed, to set merge IO limits.
    * This method sorts the merge threads by their merge size in
@@ -327,8 +362,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
         newMBPerSec = targetMBPerSec;
       }
 
-      double curMBPerSec = merge.rateLimiter.getMBPerSec();
-      
+      MergeRateLimiter rateLimiter = mergeThread.rateLimiter;
+      double curMBPerSec = rateLimiter.getMBPerSec();
+
       if (verbose()) {
         long mergeStartNS = merge.mergeStartNS;
         if (mergeStartNS == -1) {
@@ -339,11 +375,11 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
         message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n",
                                      mergeThread.getName(),
                                      bytesToMB(merge.estimatedMergeBytes),
-                                     bytesToMB(merge.rateLimiter.totalBytesWritten),
+                                     bytesToMB(rateLimiter.getTotalBytesWritten()),
                                      nsToSec(now - mergeStartNS),
-                                     nsToSec(merge.rateLimiter.getTotalStoppedNS()),
-                                     nsToSec(merge.rateLimiter.getTotalPausedNS()),
-                                     rateToString(merge.rateLimiter.getMBPerSec())));
+                                     nsToSec(rateLimiter.getTotalStoppedNS()),
+                                     nsToSec(rateLimiter.getTotalPausedNS()),
+                                     rateToString(rateLimiter.getMBPerSec())));
 
         if (newMBPerSec != curMBPerSec) {
           if (newMBPerSec == 0.0) {
@@ -364,7 +400,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
         }
       }
 
-      merge.rateLimiter.setMBPerSec(newMBPerSec);
+      rateLimiter.setMBPerSec(newMBPerSec);
     }
     if (verbose()) {
       message(message.toString());
@@ -449,7 +485,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     Thread currentThread = Thread.currentThread();
     int count = 0;
     for (MergeThread mergeThread : mergeThreads) {
-      if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
+      if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.isAborted() == false) {
         count++;
       }
     }
@@ -497,8 +533,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
         return;
       }
 
-      updateIOThrottle(merge);
-
       boolean success = false;
       try {
         if (verbose()) {
@@ -507,14 +541,16 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
         // OK to spawn a new merge thread to handle this
         // merge:
-        final MergeThread merger = getMergeThread(writer, merge);
-        mergeThreads.add(merger);
+        final MergeThread newMergeThread = getMergeThread(writer, merge);
+        mergeThreads.add(newMergeThread);
+
+        updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter);
 
         if (verbose()) {
-          message("    launch new thread [" + merger.getName() + "]");
+          message("    launch new thread [" + newMergeThread.getName() + "]");
         }
 
-        merger.start();
+        newMergeThread.start();
         updateMergeThreads();
 
         success = true;
@@ -598,16 +634,17 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
   /** Runs a merge thread to execute a single merge, then exits. */
   protected class MergeThread extends Thread implements Comparable<MergeThread> {
-
     final IndexWriter writer;
     final OneMerge merge;
+    final MergeRateLimiter rateLimiter;
 
     /** Sole constructor. */
     public MergeThread(IndexWriter writer, OneMerge merge) {
       this.writer = writer;
       this.merge = merge;
+      this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
     }
-    
+
     @Override
     public int compareTo(MergeThread other) {
       // Larger merges sort first:
@@ -616,9 +653,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
 
     @Override
     public void run() {
-
       try {
-
         if (verbose()) {
           message("  merge thread: start");
         }
@@ -715,7 +750,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
   }
 
   /** Tunes IO throttle when a new merge starts. */
-  private synchronized void updateIOThrottle(OneMerge newMerge) throws IOException {
+  private synchronized void updateIOThrottle(OneMerge newMerge, MergeRateLimiter rateLimiter) throws IOException {
     if (doAutoIOThrottle == false) {
       return;
     }
@@ -794,7 +829,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     } else {
       rate = targetMBPerSec;
     }
-    newMerge.rateLimiter.setMBPerSec(rate);
+    rateLimiter.setMBPerSec(rate);
     targetMBPerSecChanged();
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index da030ca..aa28d99 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -36,6 +36,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
@@ -51,22 +52,18 @@ import org.apache.lucene.search.Sort;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.FlushInfo;
 import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.LockValidatingDirectoryWrapper;
 import org.apache.lucene.store.MMapDirectory;
 import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.store.RateLimitedIndexOutput;
 import org.apache.lucene.store.TrackingDirectoryWrapper;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.CloseableThreadLocal;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
@@ -277,7 +274,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
   private final Directory directoryOrig;       // original user directory
   private final Directory directory;           // wrapped with additional checks
-  private final Directory mergeDirectory;      // wrapped with throttling: used for merging
   private final Analyzer analyzer;    // how to analyze text
 
   private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
@@ -353,8 +349,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    *  card to make sure they can later charge you when you check out. */
   final AtomicLong pendingNumDocs = new AtomicLong();
 
-  final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
-
   DirectoryReader getReader() throws IOException {
     return getReader(true, false);
   }
@@ -809,10 +803,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       directoryOrig = d;
       directory = new LockValidatingDirectoryWrapper(d, writeLock);
 
-      // Directory we use for merging, so we can abort running merges, and so
-      // merge schedulers can optionally rate-limit per-merge IO:
-      mergeDirectory = addMergeRateLimiters(directory);
-
       analyzer = config.getAnalyzer();
       mergeScheduler = config.getMergeScheduler();
       mergeScheduler.setInfoStream(infoStream);
@@ -2212,8 +2202,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     try {
       abortMerges();
 
-      rateLimiters.close();
-
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "rollback: done finish merges");
       }
@@ -2418,7 +2406,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
       }
-      merge.rateLimiter.setAbort();
+      merge.setAborted();
       mergeFinish(merge);
     }
     pendingMerges.clear();
@@ -2427,7 +2415,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "now abort running merge " + segString(merge.segments));
       }
-      merge.rateLimiter.setAbort();
+      merge.setAborted();
     }
 
     // We wait here to make all merges stop.  It should not
@@ -2775,13 +2763,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    * index.
    * 
    * <p>
-   * <b>NOTE:</b> this method merges all given {@link LeafReader}s in one
+   * <b>NOTE:</b> this merges all given {@link LeafReader}s in one
    * merge. If you intend to merge a large number of readers, it may be better
    * to call this method multiple times, each time with a small set of readers.
    * In principle, if you use a merge policy with a {@code mergeFactor} or
    * {@code maxMergeAtOnce} parameter, you should pass that many readers in one
    * call.
    * 
+   * <p>
+   * <b>NOTE:</b> this method does not call or make use of the {@link MergeScheduler},
+   * so any custom bandwidth throttling is at the moment ignored.
+   * 
    * @return The <a href="#sequence_number">sequence number</a>
    * for this operation
    *
@@ -2832,8 +2824,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       SegmentMerger merger = new SegmentMerger(Arrays.asList(readers), info, infoStream, trackingDir,
                                                globalFieldNumberMap, 
                                                context);
-      
-      rateLimiters.set(new MergeRateLimiter(null));
 
       if (!merger.shouldMerge()) {
         return docWriter.deleteQueue.getNextSequenceNumber();
@@ -2864,7 +2854,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       // Now create the compound file if needed
       if (useCompoundFile) {
         Collection<String> filesToDelete = infoPerCommit.files();
-        TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(mergeDirectory);
+        TrackingDirectoryWrapper trackingCFSDir = new TrackingDirectoryWrapper(directory);
         // TODO: unlike merge, on exception we arent sniping any trash cfs files here?
         // createCompoundFile tries to cleanup, but it might not always be able to...
         try {
@@ -3745,7 +3735,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     // deleter.refresh() call that will remove any index
     // file that current segments does not reference), we
     // abort this merge
-    if (merge.rateLimiter.getAbort()) {
+    if (merge.isAborted()) {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "commitMerge: skip: it was aborted");
       }
@@ -3905,8 +3895,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
     boolean success = false;
 
-    rateLimiters.set(merge.rateLimiter);
-
     final long t0 = System.currentTimeMillis();
 
     final MergePolicy mergePolicy = config.getMergePolicy();
@@ -3937,7 +3925,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             if (infoStream.isEnabled("IW")) {
               infoStream.message("IW", "hit exception during merge");
             }
-          } else if (merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
+          } else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
             // This merge (and, generally, any change to the
             // segments) may now enable new merges, so we call
             // merge policy & update pending merges.
@@ -3951,7 +3939,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       tragicEvent(t, "merge");
     }
 
-    if (merge.info != null && merge.rateLimiter.getAbort() == false) {
+    if (merge.info != null && merge.isAborted() == false) {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs");
       }
@@ -3976,7 +3964,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     assert merge.segments.size() > 0;
 
     if (stopMerges) {
-      merge.rateLimiter.setAbort();
+      merge.setAborted();
       throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
     }
 
@@ -4087,7 +4075,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       return;
     }
 
-    if (merge.rateLimiter.getAbort()) {
+    merge.mergeInit();
+
+    if (merge.isAborted()) {
       return;
     }
 
@@ -4239,9 +4229,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    *  but without holding synchronized lock on IndexWriter
    *  instance */
   private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
+    merge.checkAborted();
 
-    merge.rateLimiter.checkAbort();
-
+    Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory);
     List<SegmentCommitInfo> sourceSegments = merge.segments;
     
     IOContext context = new IOContext(merge.getStoreMergeInfo());
@@ -4339,7 +4329,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
                                                      globalFieldNumberMap, 
                                                      context);
 
-      merge.rateLimiter.checkAbort();
+      merge.checkAborted();
 
       merge.mergeStartNS = System.nanoTime();
 
@@ -4354,11 +4344,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
 
       if (infoStream.isEnabled("IW")) {
         if (merger.shouldMerge()) {
+          String pauseInfo = merge.getMergeProgress().getPauseTimes().entrySet()
+            .stream()
+            .filter((e) -> e.getValue() > 0)
+            .map((e) -> String.format(Locale.ROOT, "%.1f sec %s", 
+                e.getValue() / 1000000000., 
+                e.getKey().name().toLowerCase(Locale.ROOT)))
+            .collect(Collectors.joining(", "));
+          if (!pauseInfo.isEmpty()) {
+            pauseInfo = " (" + pauseInfo + ")";
+          }
+
           long t1 = System.nanoTime();
           double sec = (t1-merge.mergeStartNS)/1000000000.;
           double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
-          double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.;
-          double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.;
           infoStream.message("IW", "merge codec=" + codec + " maxDoc=" + merge.info.info.maxDoc() + "; merged segment has " +
                              (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
                              (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + 
@@ -4367,10 +4366,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
                              (mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
                              (mergeState.mergeFieldInfos.hasPointValues() ? "points" : "no points") + "; " +
                              String.format(Locale.ROOT,
-                                           "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]",
+                                           "%.1f sec%s to merge segment [%.2f MB, %.2f MB/sec]",
                                            sec,
-                                           stoppedSec,
-                                           throttleSec,
+                                           pauseInfo,
                                            segmentMB,
                                            segmentMB / sec));
         } else {
@@ -4406,7 +4404,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           success = true;
         } catch (Throwable t) {
           synchronized(this) {
-            if (merge.rateLimiter.getAbort()) {
+            if (merge.isAborted()) {
               // This can happen if rollback is called while we were building
               // our CFS -- fall through to logic below to remove the non-CFS
               // merged files:
@@ -4439,7 +4437,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           // registered with IFD
           deleteNewFiles(filesToRemove);
 
-          if (merge.rateLimiter.getAbort()) {
+          if (merge.isAborted()) {
             if (infoStream.isEnabled("IW")) {
               infoStream.message("IW", "abort merge after building CFS");
             }
@@ -5063,30 +5061,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     throw new IllegalArgumentException("number of documents in the index cannot exceed " + actualMaxDocs + " (current document count is " + pendingNumDocs.get() + "; added numDocs is " + addedNumDocs + ")");
   }
 
-  /** Wraps the incoming {@link Directory} so that we assign a per-thread
-   *  {@link MergeRateLimiter} to all created {@link IndexOutput}s. */
-  private Directory addMergeRateLimiters(Directory in) {
-    return new FilterDirectory(in) {
-      @Override
-      public IndexOutput createOutput(String name, IOContext context) throws IOException {
-        ensureOpen();
-
-        // Paranoia defense: if this trips we have a bug somewhere...
-        IndexWriter.this.ensureOpen(false);
-
-        // This Directory is only supposed to be used during merging,
-        // so all writes should have MERGE context, else there is a bug 
-        // somewhere that is failing to pass down the right IOContext:
-        assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
-
-        MergeRateLimiter rateLimiter = rateLimiters.get();
-        assert rateLimiter != null;
-
-        return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
-      }
-    };
-  }
-
   /** Returns the highest <a href="#sequence_number">sequence number</a> across
    *  all completed operations, or 0 if no operations have finished yet.  Still
    *  in-flight operations (in other threads) are not counted until they finish.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index dbf37df..d9a0ab8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -19,12 +19,19 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
 
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.store.RateLimiter;
 
 /**
  * <p>Expert: a MergePolicy determines the sequence of
@@ -55,6 +62,125 @@ import org.apache.lucene.store.RateLimiter;
  * @lucene.experimental
  */
 public abstract class MergePolicy {
+  /**
+   * Progress and state for an executing merge. This class
+   * encapsulates the logic to pause and resume the merge thread
+   * or to abort the merge entirely.
+   * 
+   * @lucene.experimental */
+  public static class OneMergeProgress {
+    /** Reason for pausing the merge thread. */
+    public static enum PauseReason {
+      /** Stopped (because of throughput rate set to 0, typically). */
+      STOPPED,
+      /** Temporarily paused because of exceeded throughput rate. */
+      PAUSED,
+      /** Other reason. */
+      OTHER
+    };
+
+    private final ReentrantLock pauseLock = new ReentrantLock();
+    private final Condition pausing = pauseLock.newCondition();
+
+    /**
+     * Pause times (in nanoseconds) for each {@link PauseReason}.
+     */
+    private final EnumMap<PauseReason, AtomicLong> pauseTimesNS;
+    
+    private volatile boolean aborted;
+
+    /**
+     * This field is for sanity-check purposes only. Only the same thread that invoked
+     * {@link OneMerge#mergeInit()} is permitted to be calling 
+     * {@link #pauseNanos}. This is always verified at runtime. 
+     */
+    private Thread owner;
+
+    /** Creates a new merge progress info. */
+    public OneMergeProgress() {
+      // Place all the pause reasons in there immediately so that we can simply update values.
+      pauseTimesNS = new EnumMap<PauseReason,AtomicLong>(PauseReason.class);
+      for (PauseReason p : PauseReason.values()) {
+        pauseTimesNS.put(p, new AtomicLong());
+      }
+    }
+
+    /**
+     * Abort the merge this progress tracks at the next 
+     * possible moment.
+     */
+    public void abort() {
+      aborted = true;
+      wakeup(); // wakeup any paused merge thread.
+    }
+
+    /**
+     * Return the aborted state of this merge.
+     */
+    public boolean isAborted() {
+      return aborted;
+    }
+
+    /**
+     * Pauses the calling thread for at least <code>pauseNanos</code> nanoseconds
+     * unless the merge is aborted or the external condition returns <code>false</code>,
+     * in which case control returns immediately.
+     * 
+     * <p>The external condition is required so that other threads can terminate the pausing immediately,
+     * before <code>pauseNanos</code> expires. We can't rely on {@link Condition#awaitNanos(long)} alone
+     * because it can also return due to spurious wakeups.
+     * 
+     * @param condition The pause condition that should return false if immediate return from this
+     *      method is needed. Other threads can wake up any sleeping thread by calling
+     *      {@link #wakeup}, but it will go back to sleep for the remainder of the requested time
+     *      if this condition still returns true (and the merge has not been aborted).
+     */
+    public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition) throws InterruptedException {
+      if (Thread.currentThread() != owner) {
+        throw new RuntimeException("Only the merge owner thread can call pauseNanos(). This thread: "
+            + Thread.currentThread().getName() + ", owner thread: "
+            + owner);
+      }
+
+      long start = System.nanoTime();
+      AtomicLong timeUpdate = pauseTimesNS.get(reason);
+      pauseLock.lock();
+      try {
+        while (pauseNanos > 0 && !aborted && condition.getAsBoolean()) {
+          pauseNanos = pausing.awaitNanos(pauseNanos);
+        }
+      } finally {
+        pauseLock.unlock();
+        timeUpdate.addAndGet(System.nanoTime() - start);
+      }
+    }
+
+    /**
+     * Request a wakeup for any threads stalled in {@link #pauseNanos}.
+     */
+    public void wakeup() {
+      pauseLock.lock();
+      try {
+        pausing.signalAll();
+      } finally {
+        pauseLock.unlock();
+      }
+    }
+
+    /** Returns pause reasons and associated times in nanoseconds. */
+    public Map<PauseReason,Long> getPauseTimes() {
+      Set<Entry<PauseReason,AtomicLong>> entries = pauseTimesNS.entrySet();
+      return entries.stream()
+          .collect(Collectors.toMap(
+              (e) -> e.getKey(),
+              (e) -> e.getValue().get()));
+    }
+
+    final void setMergeThread(Thread owner) {
+      assert this.owner == null;
+      this.owner = owner;
+    }
+  }
 
   /** OneMerge provides the information necessary to perform
    *  an individual primitive merge operation, resulting in
@@ -64,7 +190,6 @@ public abstract class MergePolicy {
    *
    * @lucene.experimental */
   public static class OneMerge {
-
     SegmentCommitInfo info;         // used by IndexWriter
     boolean registerDone;           // used by IndexWriter
     long mergeGen;                  // used by IndexWriter
@@ -82,8 +207,10 @@ public abstract class MergePolicy {
     /** Segments to be merged. */
     public final List<SegmentCommitInfo> segments;
 
-    /** A private {@link RateLimiter} for this merge, used to rate limit writes and abort. */
-    public final MergeRateLimiter rateLimiter;
+    /**
+     * Control used to pause/stop/resume the merge thread. 
+     */
+    private final OneMergeProgress mergeProgress;
 
     volatile long mergeStartNS = -1;
 
@@ -106,9 +233,17 @@ public abstract class MergePolicy {
       }
       totalMaxDoc = count;
 
-      rateLimiter = new MergeRateLimiter(this);
+      mergeProgress = new OneMergeProgress();
     }
 
+    /**
+     * Called by {@link IndexWriter} from the thread that will execute the merge; that
+     * thread is recorded as the only one permitted to pause this merge.
+     */
+    public void mergeInit() throws IOException {
+      mergeProgress.setMergeThread(Thread.currentThread());
+    }
+    
     /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
     public void mergeFinished() throws IOException {
     }
@@ -163,7 +298,7 @@ public abstract class MergePolicy {
       if (maxNumSegments != -1) {
         b.append(" [maxNumSegments=" + maxNumSegments + "]");
       }
-      if (rateLimiter.getAbort()) {
+      if (isAborted()) {
         b.append(" [ABORTED]");
       }
       return b.toString();
@@ -194,7 +329,32 @@ public abstract class MergePolicy {
     /** Return {@link MergeInfo} describing this merge. */
     public MergeInfo getStoreMergeInfo() {
       return new MergeInfo(totalMaxDoc, estimatedMergeBytes, isExternal, maxNumSegments);
-    }    
+    }
+
+    /** Returns true if this merge was or should be aborted. */
+    public boolean isAborted() {
+      return mergeProgress.isAborted();
+    }
+
+    /** Marks this merge as aborted. The merge thread should terminate at the soonest possible moment. */
+    public void setAborted() {
+      this.mergeProgress.abort();
+    }
+
+    /** Checks if merge has been aborted and throws a merge exception if so. */
+    public void checkAborted() throws MergeAbortedException {
+      if (isAborted()) {
+        throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString());
+      }
+    }
+
+    /**
+     * Returns a {@link OneMergeProgress} instance for this merge, which provides
+     * statistics of the merge threads (run time vs. sleep time) if merging is throttled.
+     */
+    public OneMergeProgress getMergeProgress() {
+      return mergeProgress;
+    }
   }
 
   /**
@@ -222,8 +382,7 @@ public abstract class MergePolicy {
       merges.add(merge);
     }
 
-    /** Returns a description of the merges in this
-    *  specification. */
+    /** Returns a description of the merges in this specification. */
     public String segString(Directory dir) {
       StringBuilder b = new StringBuilder();
       b.append("MergeSpec:\n");
@@ -235,8 +394,7 @@ public abstract class MergePolicy {
     }
   }
 
-  /** Exception thrown if there are any problems while
-   *  executing a merge. */
+  /** Exception thrown if there are any problems while executing a merge. */
   public static class MergeException extends RuntimeException {
     private Directory dir;
 
@@ -259,9 +417,9 @@ public abstract class MergePolicy {
     }
   }
 
-  /** Thrown when a merge was explicity aborted because
+  /** Thrown when a merge was explicitly aborted because
    *  {@link IndexWriter#abortMerges} was called.  Normally
-   *  this exception is privately caught and suppresed by
+   *  this exception is privately caught and suppressed by
    *  {@link IndexWriter}. */
   public static class MergeAbortedException extends IOException {
     /** Create a {@link MergeAbortedException}. */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
index d04c2d2..e5361d5 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
@@ -20,118 +20,107 @@ package org.apache.lucene.index;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.ThreadInterruptedException;
 
-import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.index.MergePolicy.OneMergeProgress;
+import org.apache.lucene.index.MergePolicy.OneMergeProgress.PauseReason;
 
 /** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to 
  *  give {@link MergeScheduler}s ionice like control.
  *
- *  This is similar to {@link SimpleRateLimiter}, except it's merge-private,
- *  it will wake up if its rate changes while it's paused, it tracks how
- *  much time it spent stopped and paused, and it supports aborting.
- *
  *  @lucene.internal */
 
 public class MergeRateLimiter extends RateLimiter {
 
   private final static int MIN_PAUSE_CHECK_MSEC = 25;
-  volatile long totalBytesWritten;
+  
+  private final static long MIN_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(2);
+  private final static long MAX_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(250);
+
+  private volatile double mbPerSec;
+  private volatile long minPauseCheckBytes;
 
-  double mbPerSec;
   private long lastNS;
-  private long minPauseCheckBytes;
-  private boolean abort;
-  long totalPausedNS;
-  long totalStoppedNS;
-  final MergePolicy.OneMerge merge;
 
-  /** Returned by {@link #maybePause}. */
-  private static enum PauseResult {NO, STOPPED, PAUSED};
+  private AtomicLong totalBytesWritten = new AtomicLong();
 
-  /** Sole constructor. */
-  public MergeRateLimiter(MergePolicy.OneMerge merge) {
-    this.merge = merge;
+  private final OneMergeProgress mergeProgress;
 
+  /** Sole constructor. */
+  public MergeRateLimiter(OneMergeProgress mergeProgress) {
     // Initially no IO limit; use setter here so minPauseCheckBytes is set:
+    this.mergeProgress = mergeProgress;
     setMBPerSec(Double.POSITIVE_INFINITY);
   }
 
   @Override
-  public synchronized void setMBPerSec(double mbPerSec) {
-    // 0.0 is allowed: it means the merge is paused
-    if (mbPerSec < 0.0) {
-      throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
+  public void setMBPerSec(double mbPerSec) {
+    // Synchronized to make updates to mbPerSec and minPauseCheckBytes atomic. 
+    synchronized (this) {
+      // 0.0 is allowed: it means the merge is paused
+      if (mbPerSec < 0.0) {
+        throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
+      }
+      this.mbPerSec = mbPerSec;
+  
+      // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
+      this.minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
+      assert minPauseCheckBytes >= 0;
     }
-    this.mbPerSec = mbPerSec;
-    // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
-    minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
-    assert minPauseCheckBytes >= 0;
-    notify();
+
+    mergeProgress.wakeup();
   }
 
   @Override
-  public synchronized double getMBPerSec() {
+  public double getMBPerSec() {
     return mbPerSec;
   }
 
   /** Returns total bytes written by this merge. */
   public long getTotalBytesWritten() {
-    return totalBytesWritten;
+    return totalBytesWritten.get();
   }
 
   @Override
   public long pause(long bytes) throws MergePolicy.MergeAbortedException {
+    totalBytesWritten.addAndGet(bytes);
 
-    totalBytesWritten += bytes;
-
-    long startNS = System.nanoTime();
-    long curNS = startNS;
-
-    // While loop because 1) Thread.wait doesn't always sleep long
-    // enough, and 2) we wake up and check again when our rate limit
+    // While loop because we may wake up and check again when our rate limit
     // is changed while we were pausing:
-    long pausedNS = 0;
-    while (true) {
-      PauseResult result = maybePause(bytes, curNS);
-      if (result == PauseResult.NO) {
-        // Set to curNS, not targetNS, to enforce the instant rate, not
-        // the "averaaged over all history" rate:
-        lastNS = curNS;
-        break;
-      }
-      curNS = System.nanoTime();
-      long ns = curNS - startNS;
-      startNS = curNS;
-
-      // Separately track when merge was stopped vs rate limited:
-      if (result == PauseResult.STOPPED) {
-        totalStoppedNS += ns;
-      } else {
-        assert result == PauseResult.PAUSED;
-        totalPausedNS += ns;
-      }
-      pausedNS += ns;
+    long paused = 0;
+    long delta;
+    while ((delta = maybePause(bytes, System.nanoTime())) >= 0) {
+      // Keep waiting.
+      paused += delta;
     }
 
-    return pausedNS;
+    return paused;
   }
 
   /** Total NS merge was stopped. */
-  public synchronized long getTotalStoppedNS() {
-    return totalStoppedNS;
+  public long getTotalStoppedNS() {
+    return mergeProgress.getPauseTimes().get(PauseReason.STOPPED);
   } 
 
   /** Total NS merge was paused to rate limit IO. */
-  public synchronized long getTotalPausedNS() {
-    return totalPausedNS;
+  public long getTotalPausedNS() {
+    return mergeProgress.getPauseTimes().get(PauseReason.PAUSED);
   } 
 
-  /** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. */
-  private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
-
+  /** 
+   * Returns the number of nanoseconds spent in a paused state or <code>-1</code>
+   * if no pause was applied. If the thread needs pausing, this method delegates 
+   * to the linked {@link OneMergeProgress}. 
+   */
+  private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
     // Now is a good time to abort the merge:
-    checkAbort();
+    if (mergeProgress.isAborted()) {
+      throw new MergePolicy.MergeAbortedException("Merge aborted.");
+    }
 
-    double secondsToPause = (bytes/1024./1024.) / mbPerSec;
+    double rate = mbPerSec; // read from volatile rate once.
+    double secondsToPause = (bytes/1024./1024.) / rate;
 
     // Time we should sleep until; this is purely instantaneous
     // rate (just adds seconds onto the last time we had paused to);
@@ -140,54 +129,30 @@ public class MergeRateLimiter extends RateLimiter {
 
     long curPauseNS = targetNS - curNS;
 
-    // NOTE: except maybe on real-time JVMs, minimum realistic
-    // wait/sleep time is 1 msec; if you pass just 1 nsec the impl
-    // rounds up to 1 msec, so we don't bother unless it's > 2 msec:
-
-    if (curPauseNS <= 2000000) {
-      return PauseResult.NO;
+    // We don't bother with thread pausing if the pause is smaller than 2 msec.
+    if (curPauseNS <= MIN_PAUSE_NS) {
+      // Set to curNS, not targetNS, to enforce the instant rate, not
+      // the "averaged over all history" rate:
+      lastNS = curNS;
+      return -1;
     }
 
-    // Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping:
-    if (curPauseNS > 250L*1000000) {
-      curPauseNS = 250L*1000000;
+    // Defensive: don't sleep for too long; the loop above will call us again if
+    // we should keep sleeping and the rate may be adjusted in between.
+    if (curPauseNS > MAX_PAUSE_NS) {
+      curPauseNS = MAX_PAUSE_NS;
     }
 
-    int sleepMS = (int) (curPauseNS / 1000000);
-    int sleepNS = (int) (curPauseNS % 1000000);
-
-    double rate = mbPerSec;
-
+    long start = System.nanoTime();
     try {
-      // CMS can wake us up here if it changes our target rate:
-      wait(sleepMS, sleepNS);
+      mergeProgress.pauseNanos(
+          curPauseNS, 
+          rate == 0.0 ? PauseReason.STOPPED : PauseReason.PAUSED,
+          () -> rate == mbPerSec);
     } catch (InterruptedException ie) {
       throw new ThreadInterruptedException(ie);
     }
-
-    if (rate == 0.0) {
-      return PauseResult.STOPPED;
-    } else {
-      return PauseResult.PAUSED;
-    }
-  }
-
-  /** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */
-  public synchronized void checkAbort() throws MergePolicy.MergeAbortedException {
-    if (abort) {
-      throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString());
-    }
-  }
-
-  /** Mark this merge aborted. */
-  public synchronized void setAbort() {
-    abort = true;
-    notify();
-  }
-
-  /** Returns true if this merge was aborted. */
-  public synchronized boolean getAbort() {
-    return abort;
+    return System.nanoTime() - start;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
index 65af45b..66d0870 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
@@ -20,6 +20,9 @@ package org.apache.lucene.index;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RateLimitedIndexOutput;
 import org.apache.lucene.util.InfoStream;
 
 /** <p>Expert: {@link IndexWriter} uses an instance
@@ -42,6 +45,15 @@ public abstract class MergeScheduler implements Closeable {
    * */
   public abstract void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException;
 
+  /** 
+   * Wraps the incoming {@link Directory} so that we can merge-throttle it
+   * using {@link RateLimitedIndexOutput}. 
+   */
+  public Directory wrapForMerge(OneMerge merge, Directory in) {
+    // A no-op by default.
+    return in;
+  }
+
   /** Close this MergeScheduler. */
   @Override
   public abstract void close() throws IOException;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
index 1630653..e4c0136 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
@@ -16,6 +16,8 @@
  */
 package org.apache.lucene.index;
 
+import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.store.Directory;
 
 /**
  * A {@link MergeScheduler} which never executes any merges. It is also a
@@ -41,6 +43,11 @@ public final class NoMergeScheduler extends MergeScheduler {
 
   @Override
   public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) {}
+  
+  @Override
+  public Directory wrapForMerge(OneMerge merge, Directory in) {
+    return in;
+  }
 
   @Override
   public MergeScheduler clone() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9540bc37/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java b/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
index ef922bb..723cfbc 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestMergeRateLimiter.java
@@ -27,8 +27,8 @@ public class TestMergeRateLimiter extends LuceneTestCase {
     RandomIndexWriter w = new RandomIndexWriter(random(), dir);
     w.addDocument(new Document());
     w.close();
-    MergePolicy.OneMerge merge = new MergePolicy.OneMerge(SegmentInfos.readLatestCommit(dir).asList());
-    MergeRateLimiter rateLimiter = new MergeRateLimiter(merge);
+
+    MergeRateLimiter rateLimiter = new MergeRateLimiter(new MergePolicy.OneMergeProgress());
     assertEquals(Double.POSITIVE_INFINITY, rateLimiter.getMBPerSec(), 0.0);
     assertTrue(rateLimiter.getMinPauseCheckBytes() > 0);
     dir.close();