You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2015/01/05 15:28:29 UTC

svn commit: r1649532 [1/2] - in /lucene/dev/trunk: lucene/ lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/ lucene/core/src/java/org/apache/lucene/codecs/ lucene/core/src/java/org/apache/lucene/codecs/compressing/ lucene/core/src/java/org/ap...

Author: mikemccand
Date: Mon Jan  5 14:28:28 2015
New Revision: 1649532

URL: http://svn.apache.org/r1649532
Log:
LUCENE-6119: CMS dynamically rate limits IO writes of each merge depending on incoming merge rate

Added:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java   (with props)
Removed:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/store/TestRateLimitedDirectoryWrapper.java
Modified:
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeState.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/FilterDirectory.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/PrintStreamInfoStream.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/StringHelper.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDoc.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexFileDeleter.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java
    lucene/dev/trunk/lucene/sandbox/src/test/org/apache/lucene/codecs/idversion/TestIDVersionPostingsFormat.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/codecs/cranky/CrankyCompoundFormat.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/BaseCompoundFormatTestCase.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockFSDirectoryFactory.java

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Mon Jan  5 14:28:28 2015
@@ -130,6 +130,10 @@ New Features
 * LUCENE-5914: Add an option to Lucene50Codec to support either BEST_SPEED
   or BEST_COMPRESSION for stored fields. (Adrien Grand, Robert Muir)
 
+* LUCENE-6119: Add auto-IO-throttling to ConcurrentMergeScheduler, to
+  rate limit IO writes for each merge depending on incoming merge
+  rate.  (Mike McCandless)
+
 Optimizations
 
 * LUCENE-5960: Use a more efficient bitset, not a Set<Integer>, to

Modified: lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java (original)
+++ lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java Mon Jan  5 14:28:28 2015
@@ -29,7 +29,6 @@ import java.util.Locale;
 
 import org.apache.lucene.codecs.CompoundFormat;
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.MergeState.CheckAbort;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;
@@ -157,7 +156,7 @@ public class SimpleTextCompoundFormat ex
   }
 
   @Override
-  public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException {
+  public void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException {
     String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION);
     
     int numFiles = files.size();
@@ -181,8 +180,6 @@ public class SimpleTextCompoundFormat ex
           out.copyBytes(in, in.length());
         }
         endOffsets[i] = out.getFilePointer();
-        
-        checkAbort.work(endOffsets[i] - startOffsets[i]);
       }
       
       long tocPos = out.getFilePointer();

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java Mon Jan  5 14:28:28 2015
@@ -20,7 +20,6 @@ package org.apache.lucene.codecs;
 import java.io.IOException;
 import java.util.Collection;
 
-import org.apache.lucene.index.MergeState.CheckAbort;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -47,8 +46,7 @@ public abstract class CompoundFormat {
   /**
    * Packs the provided files into a compound format.
    */
-  // TODO: get checkAbort out of here, and everywhere, and have iw do it at a higher level
-  public abstract void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException;
+  public abstract void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException;
 
   /**
    * Returns the compound file names used by this segment.

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java Mon Jan  5 14:28:28 2015
@@ -96,7 +96,6 @@ public abstract class StoredFieldsWriter
         storedFieldsReader.visitDocument(docID, visitor);
         finishDocument();
         docCount++;
-        mergeState.checkAbort.work(300);
       }
     }
     finish(mergeState.mergeFieldInfos, docCount);

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java Mon Jan  5 14:28:28 2015
@@ -196,7 +196,6 @@ public abstract class TermVectorsWriter
         }
         addAllDocVectors(vectors, mergeState);
         docCount++;
-        mergeState.checkAbort.work(300);
       }
     }
     finish(mergeState.mergeFieldInfos, docCount);

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java Mon Jan  5 14:28:28 2015
@@ -506,7 +506,6 @@ public final class CompressingStoredFiel
           storedFieldsReader.visitDocument(docID, visitor);
           finishDocument();
           ++docCount;
-          mergeState.checkAbort.work(300);
         }
       } else {
         // optimized merge, we copy serialized (but decompressed) bytes directly
@@ -522,7 +521,6 @@ public final class CompressingStoredFiel
           numStoredFieldsInDoc = doc.numStoredFields;
           finishDocument();
           ++docCount;
-          mergeState.checkAbort.work(300);
         }
       }
     }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java Mon Jan  5 14:28:28 2015
@@ -751,7 +751,6 @@ public final class CompressingTermVector
           }
           addAllDocVectors(vectors, mergeState);
           ++docCount;
-          mergeState.checkAbort.work(300);
         }
       } else {
         final CompressingStoredFieldsIndexReader index = matchingVectorsReader.getIndex();
@@ -781,7 +780,6 @@ public final class CompressingTermVector
               this.vectorsStream.copyBytes(vectorsStream, chunkLength);
               docCount += chunkDocs;
               this.numDocs += chunkDocs;
-              mergeState.checkAbort.work(300 * chunkDocs);
               i = nextLiveDoc(docBase + chunkDocs, liveDocs, maxDoc);
             } else {
               for (; i < docBase + chunkDocs; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) {
@@ -793,7 +791,6 @@ public final class CompressingTermVector
                 }
                 addAllDocVectors(vectors, mergeState);
                 ++docCount;
-                mergeState.checkAbort.work(300);
               }
             }
           } else {
@@ -805,7 +802,6 @@ public final class CompressingTermVector
             }
             addAllDocVectors(vectors, mergeState);
             ++docCount;
-            mergeState.checkAbort.work(300);
             i = nextLiveDoc(i + 1, liveDocs, maxDoc);
           }
         }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java Mon Jan  5 14:28:28 2015
@@ -23,7 +23,6 @@ import java.util.Collection;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.CompoundFormat;
 import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.index.MergeState.CheckAbort;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
@@ -73,7 +72,7 @@ public final class Lucene50CompoundForma
   }
 
   @Override
-  public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException {
+  public void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException {
     String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION);
     String entriesFile = IndexFileNames.segmentFileName(si.name, "", ENTRIES_EXTENSION);
     
@@ -99,8 +98,6 @@ public final class Lucene50CompoundForma
         entries.writeString(IndexFileNames.stripSegmentName(file));
         entries.writeLong(startOffset);
         entries.writeLong(length);
-        
-        checkAbort.work(length);
       }
       
       CodecUtil.writeFooter(data);

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java Mon Jan  5 14:28:28 2015
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -173,16 +174,19 @@ class BufferedUpdatesStream implements A
     Collections.sort(infos2, sortSegInfoByDelGen);
 
     CoalescedUpdates coalescedDeletes = null;
-    boolean anyNewDeletes = false;
 
     int infosIDX = infos2.size()-1;
     int delIDX = updates.size()-1;
 
+    long totDelCount = 0;
+    long totTermVisitedCount = 0;
+
     List<SegmentCommitInfo> allDeleted = null;
 
     while (infosIDX >= 0) {
       //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
 
+      final long segStartNS = System.nanoTime();
       final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
       final SegmentCommitInfo info = infos2.get(infosIDX);
       final long segGen = info.getBufferedDeletesGen();
@@ -213,12 +217,14 @@ class BufferedUpdatesStream implements A
         final ReadersAndUpdates rld = readerPool.get(info, true);
         final SegmentReader reader = rld.getReader(IOContext.READ);
         int delCount = 0;
+        long termVisitedCount = 0;
         final boolean segAllDeletes;
         try {
           final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
           if (coalescedDeletes != null) {
-            //System.out.println("    del coalesced");
-            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+            TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+            delCount += counts.delCount;
+            termVisitedCount += counts.termVisitedCount;
             delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
             applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
             applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
@@ -239,7 +245,8 @@ class BufferedUpdatesStream implements A
           rld.release(reader);
           readerPool.release(rld);
         }
-        anyNewDeletes |= delCount > 0;
+        totDelCount += delCount;
+        totTermVisitedCount += termVisitedCount;
 
         if (segAllDeletes) {
           if (allDeleted == null) {
@@ -249,7 +256,7 @@ class BufferedUpdatesStream implements A
         }
 
         if (infoStream.isEnabled("BD")) {
-          infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+          infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : ""));
         }
 
         if (coalescedDeletes == null) {
@@ -274,9 +281,12 @@ class BufferedUpdatesStream implements A
           final ReadersAndUpdates rld = readerPool.get(info, true);
           final SegmentReader reader = rld.getReader(IOContext.READ);
           int delCount = 0;
+          long termVisitedCount = 0;
           final boolean segAllDeletes;
           try {
-            delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+            TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+            delCount += counts.delCount;
+            termVisitedCount += counts.termVisitedCount;
             delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
             DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
             applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
@@ -291,7 +301,9 @@ class BufferedUpdatesStream implements A
             rld.release(reader);
             readerPool.release(rld);
           }
-          anyNewDeletes |= delCount > 0;
+
+          totDelCount += delCount;
+          totTermVisitedCount += termVisitedCount;
 
           if (segAllDeletes) {
             if (allDeleted == null) {
@@ -301,7 +313,7 @@ class BufferedUpdatesStream implements A
           }
 
           if (infoStream.isEnabled("BD")) {
-            infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+            infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : ""));
           }
         }
         info.setBufferedDeletesGen(gen);
@@ -312,11 +324,11 @@ class BufferedUpdatesStream implements A
 
     assert checkDeleteStats();
     if (infoStream.isEnabled("BD")) {
-      infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+      infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec for " + infos.size() + " segments, " + totDelCount + " deleted docs, " + totTermVisitedCount + " visited terms");
     }
     // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
 
-    return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted);
+    return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted);
   }
 
   synchronized long getNextGen() {
@@ -374,9 +386,23 @@ class BufferedUpdatesStream implements A
     }
   }
 
+  private static class TermDeleteCounts {
+    /** How many documents were actually deleted. */
+    public final int delCount;
+
+    /** How many terms we checked. */
+    public final long termVisitedCount;
+
+    public TermDeleteCounts(int delCount, long termVisitedCount) {
+      this.delCount = delCount;
+      this.termVisitedCount = termVisitedCount;
+    }
+  }
+
   // Delete by Term
-  private synchronized long applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException {
-    long delCount = 0;
+  private synchronized TermDeleteCounts applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException {
+    int delCount = 0;
+    long termVisitedCount = 0;
     Fields fields = reader.fields();
 
     TermsEnum termsEnum = null;
@@ -388,8 +414,10 @@ class BufferedUpdatesStream implements A
 
     boolean any = false;
 
-    //System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader);
+    long ns = System.nanoTime();
+
     for (Term term : termsIter) {
+      termVisitedCount++;
       // Since we visit terms sorted, we gain performance
       // by re-using the same TermsEnum and seeking only
       // forwards
@@ -440,7 +468,7 @@ class BufferedUpdatesStream implements A
       }
     }
 
-    return delCount;
+    return new TermDeleteCounts(delCount, termVisitedCount);
   }
 
   // DocValues updates

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java Mon Jan  5 14:28:28 2015
@@ -18,15 +18,15 @@ package org.apache.lucene.index;
  */
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.lucene.search.Query;
 import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
 import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
 import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
+import org.apache.lucene.search.Query;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.MergedIterator;
 
@@ -35,16 +35,19 @@ class CoalescedUpdates {
   final List<Iterable<Term>> iterables = new ArrayList<>();
   final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>();
   final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>();
+  int totalTermCount;
   
   @Override
   public String toString() {
     // note: we could add/collect more debugging information
-    return "CoalescedUpdates(termSets=" + iterables.size() + ",queries="
-        + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
-        + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
+    return "CoalescedUpdates(termSets=" + iterables.size()
+      + ",totalTermCount=" + totalTermCount
+      + ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
+      + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
   }
 
   void update(FrozenBufferedUpdates in) {
+    totalTermCount += in.termCount;
     iterables.add(in.termsIterable());
 
     for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Mon Jan  5 14:28:28 2015
@@ -19,9 +19,11 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.List;
+import java.util.Locale;
 
+import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.IOUtils;
@@ -54,18 +56,17 @@ import org.apache.lucene.util.ThreadInte
  *  settings for spinning or solid state disks for such
  *  operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}.
  */ 
+
 public class ConcurrentMergeScheduler extends MergeScheduler {
 
   /** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount},
    *  used to detect whether the index is backed by an SSD or rotational disk and
    *  set {@code maxThreadCount} accordingly.  If it's an SSD,
-   *  {@code maxThreadCount} is set to {@code max(1, min(3, cpuCoreCount/2))},
+   *  {@code maxThreadCount} is set to {@code max(1, min(4, cpuCoreCount/2))},
    *  otherwise 1.  Note that detection only currently works on
    *  Linux; other platforms will assume the index is not on an SSD. */
   public static final int AUTO_DETECT_MERGES_AND_THREADS = -1;
 
-  private int mergeThreadPriority = -1;
-
   /** List of currently active {@link MergeThread}s. */
   protected final List<MergeThread> mergeThreads = new ArrayList<>();
   
@@ -81,16 +82,27 @@ public class ConcurrentMergeScheduler ex
   // throttling the incoming threads
   private int maxMergeCount = AUTO_DETECT_MERGES_AND_THREADS;
 
-  /** {@link Directory} that holds the index. */
-  protected Directory dir;
-
-  /** {@link IndexWriter} that owns this instance. */
-  protected IndexWriter writer;
-
   /** How many {@link MergeThread}s have kicked off (this is used
    *  to name them). */
   protected int mergeThreadCount;
 
+  /** Floor for IO write rate limit (we will never go any lower than this) */
+  private static final double MIN_MERGE_MB_PER_SEC = 5.0;
+
+  /** Initial value for IO write rate limit when doAutoIOThrottle is true */
+  private static final double START_MB_PER_SEC = 20.0;
+
+  /** Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount). */
+  private static final double MIN_BIG_MERGE_MB = 50.0;
+
+  /** Current IO writes throttle rate */
+  protected double targetMBPerSec = START_MB_PER_SEC;
+
+  /** true if we should rate-limit writes for each merge */
+  private boolean doAutoIOThrottle = true;
+
+  private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
+
   /** Sole constructor, with all settings set to default
    *  values. */
   public ConcurrentMergeScheduler() {
@@ -142,10 +154,48 @@ public class ConcurrentMergeScheduler ex
   public synchronized void setDefaultMaxMergesAndThreads(boolean spins) {
     if (spins) {
       maxThreadCount = 1;
-      maxMergeCount = 2;
+      maxMergeCount = 6;
+    } else {
+      maxThreadCount = Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors()/2));
+      maxMergeCount = maxThreadCount+5;
+    }
+  }
+
+  /** Set the per-merge IO throttle rate for forced merges (default: {@code Double.POSITIVE_INFINITY}). */
+  public synchronized void setForceMergeMBPerSec(double v) {
+    forceMergeMBPerSec = v;
+    updateMergeThreads();
+  }
+
+  /** Get the per-merge IO throttle rate for forced merges. */
+  public synchronized double getForceMergeMBPerSec() {
+    return forceMergeMBPerSec;
+  }
+
+  /** Turn on dynamic IO throttling, to adaptively limit the merge write
+   *  rate (bytes/sec) to the minimal rate necessary so merges do not fall behind.
+   *  By default this is enabled. */
+  public synchronized void enableAutoIOThrottle() {
+    doAutoIOThrottle = true;
+    targetMBPerSec = START_MB_PER_SEC;
+    updateMergeThreads();
+  }
+
+  /** Turn off auto IO throttling.
+   *
+   * @see #enableAutoIOThrottle */
+  public synchronized void disableAutoIOThrottle() {
+    doAutoIOThrottle = false;
+    updateMergeThreads();
+  }
+
+  /** Returns the current per-merge IO write rate limit if auto IO throttling is enabled
+   *  (see {@link #enableAutoIOThrottle}; on by default), else {@code Double.POSITIVE_INFINITY}. */
+  public synchronized double getIORateLimitMBPerSec() {
+    if (doAutoIOThrottle) {
+      return targetMBPerSec;
     } else {
-      maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
-      maxMergeCount = maxThreadCount+2;
+      return Double.POSITIVE_INFINITY;
     }
   }
 
@@ -161,48 +211,18 @@ public class ConcurrentMergeScheduler ex
     return maxMergeCount;
   }
 
-  /** Return the priority that merge threads run at.  By
-   *  default the priority is 1 plus the priority of (ie,
-   *  slightly higher priority than) the first thread that
-   *  calls merge. */
-  public synchronized int getMergeThreadPriority() {
-    initMergeThreadPriority();
-    return mergeThreadPriority;
-  }
-
-  /** Set the base priority that merge threads run at.
-   *  Note that CMS may increase priority of some merge
-   *  threads beyond this base priority.  It's best not to
-   *  set this any higher than
-   *  Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
-   *  room to set relative priority among threads.  */
-  public synchronized void setMergeThreadPriority(int pri) {
-    if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
-      throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
-    mergeThreadPriority = pri;
-    updateMergeThreads();
+  synchronized void removeMergeThread(MergeThread thread) {
+    boolean result = mergeThreads.remove(thread);
+    assert result;
   }
 
-  /** Sorts {@link MergeThread}s; larger merges come first. */
-  protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
-    @Override
-    public int compare(MergeThread t1, MergeThread t2) {
-      final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
-      final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
-      
-      final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
-      final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
-
-      return c2 - c1;
-    }
-  };
-
   /**
-   * Called whenever the running merges have changed, to pause and unpause
-   * threads. This method sorts the merge threads by their merge size in
+   * Called whenever the running merges have changed, to set merge IO limits.
+   * This method sorts the merge threads by their merge size in
    * descending order and then pauses/unpauses threads from first to last --
    * that way, smaller merges are guaranteed to run before larger ones.
    */
+
   protected synchronized void updateMergeThreads() {
 
     // Only look at threads that are alive & not in the
@@ -217,93 +237,121 @@ public class ConcurrentMergeScheduler ex
         mergeThreads.remove(threadIdx);
         continue;
       }
-      if (mergeThread.getCurrentMerge() != null) {
-        activeMerges.add(mergeThread);
-      }
+      activeMerges.add(mergeThread);
       threadIdx++;
     }
 
-    // Sort the merge threads in descending order.
-    CollectionUtil.timSort(activeMerges, compareByMergeDocCount);
-    
-    int pri = mergeThreadPriority;
+    // Sort the merge threads, largest first:
+    CollectionUtil.timSort(activeMerges);
+
     final int activeMergeCount = activeMerges.size();
-    for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
-      final MergeThread mergeThread = activeMerges.get(threadIdx);
-      final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
-      if (merge == null) { 
-        continue;
+
+    int bigMergeCount = 0;
+
+    for (threadIdx=activeMergeCount-1;threadIdx>=0;threadIdx--) {
+      MergeThread mergeThread = activeMerges.get(threadIdx);
+      if (mergeThread.merge.estimatedMergeBytes > MIN_BIG_MERGE_MB*1024*1024) {
+        bigMergeCount = 1+threadIdx;
+        break;
       }
+    }
+
+    long now = System.nanoTime();
+
+    StringBuilder message;
+    if (verbose()) {
+      message = new StringBuilder();
+      message.append(String.format(Locale.ROOT, "updateMergeThreads ioThrottle=%s targetMBPerSec=%.1f MB/sec", doAutoIOThrottle, targetMBPerSec));
+    } else {
+      message = null;
+    }
+
+    for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
+      MergeThread mergeThread = activeMerges.get(threadIdx);
+
+      OneMerge merge = mergeThread.merge;
 
       // pause the thread if maxThreadCount is smaller than the number of merge threads.
-      final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;
+      final boolean doPause = threadIdx < bigMergeCount - maxThreadCount;
+
+      double newMBPerSec;
+      if (doPause) {
+        newMBPerSec = 0.0;
+      } else if (merge.maxNumSegments != -1) {
+        newMBPerSec = forceMergeMBPerSec;
+      } else if (doAutoIOThrottle == false) {
+        newMBPerSec = Double.POSITIVE_INFINITY;
+      } else if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB*1024*1024) {
+        // Don't rate limit small merges:
+        newMBPerSec = Double.POSITIVE_INFINITY;
+      } else {
+        newMBPerSec = targetMBPerSec;
+      }
 
+      double curMBPerSec = merge.rateLimiter.getMBPerSec();
+      
       if (verbose()) {
-        if (doPause != merge.getPause()) {
-          if (doPause) {
-            message("pause thread " + mergeThread.getName());
+        long mergeStartNS = merge.mergeStartNS;
+        if (mergeStartNS == -1) {
+          // IndexWriter didn't start the merge yet:
+          mergeStartNS = now;
+        }
+        message.append('\n');
+        message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n",
+                                     mergeThread.getName(),
+                                     bytesToMB(merge.estimatedMergeBytes),
+                                     bytesToMB(merge.rateLimiter.totalBytesWritten),
+                                     nsToSec(now - mergeStartNS),
+                                     nsToSec(merge.rateLimiter.getTotalStoppedNS()),
+                                     nsToSec(merge.rateLimiter.getTotalPausedNS()),
+                                     rateToString(merge.rateLimiter.getMBPerSec())));
+
+        if (newMBPerSec != curMBPerSec) {
+          if (newMBPerSec == 0.0) {
+            message.append("  now stop");
+          } else if (curMBPerSec == 0.0) {
+            if (newMBPerSec == Double.POSITIVE_INFINITY) {
+              message.append("  now resume");
+            } else {
+              message.append(String.format(Locale.ROOT, "  now resume to %.1f MB/sec", newMBPerSec));
+            }
           } else {
-            message("unpause thread " + mergeThread.getName());
+            message.append(String.format(Locale.ROOT, "  now change from %.1f MB/sec to %.1f MB/sec", curMBPerSec, newMBPerSec));
           }
+        } else if (curMBPerSec == 0.0) {
+          message.append("  leave stopped");
+        } else {
+          message.append(String.format(Locale.ROOT, "  leave running at %.1f MB/sec", curMBPerSec));
         }
       }
-      if (doPause != merge.getPause()) {
-        merge.setPause(doPause);
-      }
 
-      if (!doPause) {
-        if (verbose()) {
-          message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
-        }
-        mergeThread.setThreadPriority(pri);
-        pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
-      }
+      merge.rateLimiter.setMBPerSec(newMBPerSec);
     }
-  }
-
-  /**
-   * Returns true if verbosing is enabled. This method is usually used in
-   * conjunction with {@link #message(String)}, like that:
-   * 
-   * <pre class="prettyprint">
-   * if (verbose()) {
-   *   message(&quot;your message&quot;);
-   * }
-   * </pre>
-   */
-  protected boolean verbose() {
-    return writer != null && writer.infoStream.isEnabled("CMS");
-  }
-  
-  /**
-   * Outputs the given message - this method assumes {@link #verbose()} was
-   * called and returned true.
-   */
-  protected void message(String message) {
-    writer.infoStream.message("CMS", message);
-  }
-
-  private synchronized void initMergeThreadPriority() {
-    if (mergeThreadPriority == -1) {
-      // Default to slightly higher priority than our
-      // calling thread
-      mergeThreadPriority = 1+Thread.currentThread().getPriority();
-      if (mergeThreadPriority > Thread.MAX_PRIORITY)
-        mergeThreadPriority = Thread.MAX_PRIORITY;
+    if (verbose()) {
+      message(message.toString());
     }
   }
 
-  private synchronized void initMaxMergesAndThreads() throws IOException {
+  private synchronized void initDynamicDefaults(IndexWriter writer) throws IOException {
     if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) {
-      assert writer != null;
       boolean spins = IOUtils.spins(writer.getDirectory());
       setDefaultMaxMergesAndThreads(spins);
       if (verbose()) {
-        message("initMaxMergesAndThreads spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount);
+        message("initDynamicDefaults spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount);
       }
     }
   }
 
+  private static String rateToString(double mbPerSec) {
+    if (mbPerSec == 0.0) {
+      return "stopped";
+    } else if (mbPerSec == Double.POSITIVE_INFINITY) {
+      return "unlimited";
+    } else {
+      return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec);
+    }
+  }
+
   @Override
   public void close() {
     sync();
@@ -346,12 +394,9 @@ public class ConcurrentMergeScheduler ex
    */
   protected synchronized int mergeThreadCount() {
     int count = 0;
-    for (MergeThread mt : mergeThreads) {
-      if (mt.isAlive()) {
-        MergePolicy.OneMerge merge = mt.getCurrentMerge();
-        if (merge != null && merge.isAborted() == false) {
-          count++;
-        }
+    for (MergeThread mergeThread : mergeThreads) {
+      if (mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
+        count++;
       }
     }
     return count;
@@ -362,12 +407,7 @@ public class ConcurrentMergeScheduler ex
 
     assert !Thread.holdsLock(writer);
 
-    this.writer = writer;
-
-    initMergeThreadPriority();
-    initMaxMergesAndThreads();
-
-    dir = writer.getDirectory();
+    initDynamicDefaults(writer);
 
     // First, quickly run through the newly proposed merges
     // and add any orthogonal merges (ie a merge not
@@ -385,9 +425,9 @@ public class ConcurrentMergeScheduler ex
     // pending merges, until it's empty:
     while (true) {
 
-      maybeStall();
+      maybeStall(writer);
 
-      MergePolicy.OneMerge merge = writer.getNextMerge();
+      OneMerge merge = writer.getNextMerge();
       if (merge == null) {
         if (verbose()) {
           message("  no more merges pending; now return");
@@ -395,6 +435,8 @@ public class ConcurrentMergeScheduler ex
         return;
       }
 
+      updateIOThrottle(merge);
+
       boolean success = false;
       try {
         if (verbose()) {
@@ -405,17 +447,13 @@ public class ConcurrentMergeScheduler ex
         // merge:
         final MergeThread merger = getMergeThread(writer, merge);
         mergeThreads.add(merger);
+
         if (verbose()) {
           message("    launch new thread [" + merger.getName() + "]");
         }
 
         merger.start();
 
-        // Must call this after starting the thread else
-        // the new thread is removed from mergeThreads
-        // (since it's not alive yet):
-        updateMergeThreads();
-
         success = true;
       } finally {
         if (!success) {
@@ -433,7 +471,7 @@ public class ConcurrentMergeScheduler ex
    *  as limiting how many threads are allowed to index, can do nothing
    *  here and throttle elsewhere. */
 
-  protected synchronized void maybeStall() {
+  protected synchronized void maybeStall(IndexWriter writer) {
     long startStallTime = 0;
     while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
       // This means merging has fallen too far behind: we
@@ -465,127 +503,78 @@ public class ConcurrentMergeScheduler ex
   }
 
   /** Does the actual merge, by calling {@link IndexWriter#merge} */
-  protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+  protected void doMerge(IndexWriter writer, OneMerge merge) throws IOException {
     writer.merge(merge);
   }
 
   /** Create and return a new MergeThread */
-  protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
+  protected synchronized MergeThread getMergeThread(IndexWriter writer, OneMerge merge) throws IOException {
     final MergeThread thread = new MergeThread(writer, merge);
-    thread.setThreadPriority(mergeThreadPriority);
     thread.setDaemon(true);
     thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
     return thread;
   }
 
-  /** Runs a merge thread, which may run one or more merges
-   *  in sequence. */
-  protected class MergeThread extends Thread {
-
-    IndexWriter tWriter;
-    MergePolicy.OneMerge startMerge;
-    MergePolicy.OneMerge runningMerge;
-    private volatile boolean done;
-
-    /** Sole constructor. */
-    public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) {
-      this.tWriter = writer;
-      this.startMerge = startMerge;
-    }
-
-    /** Record the currently running merge. */
-    public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
-      runningMerge = merge;
-    }
+  /** Runs a merge thread to execute a single merge, then exits. */
+  protected class MergeThread extends Thread implements Comparable<MergeThread> {
 
-    /** Return the currently running merge. */
-    public synchronized MergePolicy.OneMerge getRunningMerge() {
-      return runningMerge;
-    }
+    final IndexWriter writer;
+    final OneMerge merge;
 
-    /** Return the current merge, or null if this {@code
-     *  MergeThread} is done. */
-    public synchronized MergePolicy.OneMerge getCurrentMerge() {
-      if (done) {
-        return null;
-      } else if (runningMerge != null) {
-        return runningMerge;
-      } else {
-        return startMerge;
-      }
+    /** Sole constructor. */
+    public MergeThread(IndexWriter writer, OneMerge merge) {
+      this.writer = writer;
+      this.merge = merge;
     }
-
-    /** Set the priority of this thread. */
-    public void setThreadPriority(int pri) {
-      try {
-        setPriority(pri);
-      } catch (NullPointerException npe) {
-        // Strangely, Sun's JDK 1.5 on Linux sometimes
-        // throws NPE out of here...
-      } catch (SecurityException se) {
-        // Ignore this because we will still run fine with
-        // normal thread priority
-      }
+    
+    @Override
+    public int compareTo(MergeThread other) {
+      // Larger merges sort first:
+      return Long.compare(other.merge.estimatedMergeBytes, merge.estimatedMergeBytes);
     }
 
     @Override
     public void run() {
-      
-      // First time through the while loop we do the merge
-      // that we were started with:
-      MergePolicy.OneMerge merge = this.startMerge;
-      
+
       try {
 
         if (verbose()) {
           message("  merge thread: start");
         }
 
-        while(true) {
-          setRunningMerge(merge);
-          doMerge(merge);
-
-          // Subsequent times through the loop we do any new
-          // merge that writer says is necessary:
-          merge = tWriter.getNextMerge();
-
-          // Notify here in case any threads were stalled;
-          // they will notice that the pending merge has
-          // been pulled and possibly resume:
-          synchronized(ConcurrentMergeScheduler.this) {
-            ConcurrentMergeScheduler.this.notifyAll();
-          }
-
-          if (merge != null) {
-            updateMergeThreads();
-            if (verbose()) {
-              message("  merge thread: do another merge " + tWriter.segString(merge.segments));
-            }
-          } else {
-            break;
-          }
-        }
+        doMerge(writer, merge);
 
         if (verbose()) {
           message("  merge thread: done");
         }
 
+        removeMergeThread(this);
+
+        // Let CMS run new merges if necessary:
+        try {
+          merge(writer, MergeTrigger.MERGE_FINISHED, true);
+        } catch (AlreadyClosedException ace) {
+          // OK
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe);
+        }
+
       } catch (Throwable exc) {
 
-        // Ignore the exception if it was due to abort:
-        if (!(exc instanceof MergePolicy.MergeAbortedException)) {
-          //System.out.println(Thread.currentThread().getName() + ": CMS: exc");
-          //exc.printStackTrace(System.out);
-          if (!suppressExceptions) {
-            // suppressExceptions is normally only set during
-            // testing.
-            handleMergeException(exc);
-          }
+        if (exc instanceof MergePolicy.MergeAbortedException) {
+          // OK to ignore
+        } else if (suppressExceptions == false) {
+          // suppressExceptions is normally only set during
+          // testing.
+          handleMergeException(writer.getDirectory(), exc);
         }
+
       } finally {
-        done = true;
         synchronized(ConcurrentMergeScheduler.this) {
           updateMergeThreads();
+
+          // In case we had stalled indexing, we can now wake up
+          // and possibly unstall:
           ConcurrentMergeScheduler.this.notifyAll();
         }
       }
@@ -594,7 +583,7 @@ public class ConcurrentMergeScheduler ex
 
   /** Called when an exception is hit in a background merge
    *  thread */
-  protected void handleMergeException(Throwable exc) {
+  protected void handleMergeException(Directory dir, Throwable exc) {
     try {
       // When an exception is hit during merge, IndexWriter
       // removes any partial files and then allows another
@@ -606,6 +595,7 @@ public class ConcurrentMergeScheduler ex
     } catch (InterruptedException ie) {
       throw new ThreadInterruptedException(ie);
     }
+
     throw new MergePolicy.MergeException(exc, dir);
   }
 
@@ -626,7 +616,115 @@ public class ConcurrentMergeScheduler ex
     StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
     sb.append("maxThreadCount=").append(maxThreadCount).append(", ");    
     sb.append("maxMergeCount=").append(maxMergeCount).append(", ");    
-    sb.append("mergeThreadPriority=").append(mergeThreadPriority);
+    sb.append("ioThrottle=").append(doAutoIOThrottle);
     return sb.toString();
   }
+
+  private boolean isBacklog(long now, OneMerge merge) {
+    double mergeMB = bytesToMB(merge.estimatedMergeBytes);
+    for (MergeThread mergeThread : mergeThreads) {
+      long mergeStartNS = mergeThread.merge.mergeStartNS;
+      if (mergeThread.isAlive() && mergeThread.merge != merge &&
+          mergeStartNS != -1 &&
+          mergeThread.merge.estimatedMergeBytes >= MIN_BIG_MERGE_MB*1024*1024 &&
+          nsToSec(now-mergeStartNS) > 3.0) {
+        double otherMergeMB = bytesToMB(mergeThread.merge.estimatedMergeBytes);
+        double ratio = otherMergeMB / mergeMB;
+        if (ratio > 0.3 && ratio < 3.0) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  /** Tunes IO throttle when a new merge starts. */
+  private synchronized void updateIOThrottle(OneMerge merge) throws IOException {
+    if (doAutoIOThrottle == false) {
+      return;
+    }
+
+    double mergeMB = bytesToMB(merge.estimatedMergeBytes);
+    if (mergeMB < MIN_BIG_MERGE_MB) {
+      // Only watch non-trivial merges for throttling; this is safe because the MP will eventually
+      // have to do larger merges:
+      return;
+    }
+
+    long now = System.nanoTime();
+
+    // Simplistic closed-loop feedback control: if we find any other similarly
+    // sized merges running, then we are falling behind, so we raise the
+    // IO rate limit, else we lower it:
+    boolean newBacklog = isBacklog(now, merge);
+
+    boolean curBacklog = false;
+
+    if (newBacklog == false) {
+      if (mergeThreads.size() > maxThreadCount) {
+        // If there are already more than the maximum merge threads allowed, count that as backlog:
+        curBacklog = true;
+      } else {
+        // Now see if any still-running merges are backlog'd:
+        for (MergeThread mergeThread : mergeThreads) {
+          if (isBacklog(now, mergeThread.merge)) {
+            curBacklog = true;
+            break;
+          }
+        }
+      }
+    }
+
+    double curMBPerSec = targetMBPerSec;
+
+    if (newBacklog) {
+      // This new merge adds to the backlog: raise the IO rate limit by 20%
+      targetMBPerSec *= 1.20;
+      if (targetMBPerSec > 10000) {
+        targetMBPerSec = 10000;
+      }
+      if (verbose()) {
+        if (curMBPerSec == targetMBPerSec) {
+          message(String.format(Locale.ROOT, "io throttle: new merge backlog; leave IO rate at ceiling %.1f MB/sec", targetMBPerSec));
+        } else {
+          message(String.format(Locale.ROOT, "io throttle: new merge backlog; increase IO rate to %.1f MB/sec", targetMBPerSec));
+        }
+      }
+    } else if (curBacklog) {
+      // We still have an existing backlog; leave the rate as is:
+      if (verbose()) {
+        message(String.format(Locale.ROOT, "io throttle: current merge backlog; leave IO rate at %.1f MB/sec",
+                              targetMBPerSec));
+      }
+    } else {
+      // We are not falling behind: lower the IO rate limit (divide by 1.10, i.e. ~9%)
+      targetMBPerSec /= 1.10;
+      if (targetMBPerSec < MIN_MERGE_MB_PER_SEC) {
+        targetMBPerSec = MIN_MERGE_MB_PER_SEC;
+      }
+      if (verbose()) {
+        if (curMBPerSec == targetMBPerSec) {
+          message(String.format(Locale.ROOT, "io throttle: no merge backlog; leave IO rate at floor %.1f MB/sec", targetMBPerSec));
+        } else {
+          message(String.format(Locale.ROOT, "io throttle: no merge backlog; decrease IO rate to %.1f MB/sec", targetMBPerSec));
+        }
+      }
+    }
+
+    targetMBPerSecChanged();
+    updateMergeThreads();
+  }
+
+  /** Subclass can override to tweak targetMBPerSec. */
+  protected void targetMBPerSecChanged() {
+  }
+
+  private static double nsToSec(long ns) {
+    return ns / 1000000000.0;
+  }
+
+  private static double bytesToMB(long bytes) {
+    return bytes/1024./1024.;
+  }
 }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan  5 14:28:28 2015
@@ -20,9 +20,8 @@ package org.apache.lucene.index;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.Locale;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -35,7 +34,6 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Accountable;
-import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 
 /**
@@ -553,11 +551,13 @@ final class DocumentsWriter implements C
     final double ramBufferSizeMB = config.getRAMBufferSizeMB();
     if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
         flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
-      if (infoStream.isEnabled("DW")) {
-        infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
-      }
       hasEvents = true;
       if (!this.applyAllDeletes(deleteQueue)) {
+        if (infoStream.isEnabled("DW")) {
+          infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
+                                                 flushControl.getDeleteBytesUsed()/(1024.*1024.),
+                                                 ramBufferSizeMB));
+        }
         putEvent(ApplyDeletesEvent.INSTANCE);
       }
     }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Mon Jan  5 14:28:28 2015
@@ -485,7 +485,7 @@ class DocumentsWriterPerThread {
     try {
       
       if (indexWriterConfig.getUseCompoundFile()) {
-        filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context));
+        filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, newSegment.info, context));
         newSegment.info.setUseCompoundFile(true);
       }
 

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan  5 14:28:28 2015
@@ -47,18 +47,22 @@ import org.apache.lucene.index.DocValues
 import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
 import org.apache.lucene.index.FieldInfos.FieldNumbers;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
-import org.apache.lucene.index.MergeState.CheckAbort;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.FlushInfo;
 import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.store.RateLimitedIndexOutput;
 import org.apache.lucene.store.TrackingDirectoryWrapper;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CloseableThreadLocal;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
@@ -247,6 +251,7 @@ public class IndexWriter implements Clos
   volatile Throwable tragedy;
 
   private final Directory directory;  // where this index resides
+  private final Directory mergeDirectory;  // used for merging
   private final Analyzer analyzer;    // how to analyze text
 
   private volatile long changeCount; // increments every time a change is completed
@@ -319,6 +324,8 @@ public class IndexWriter implements Clos
    *  card to make sure they can later charge you when you check out. */
   final AtomicLong pendingNumDocs = new AtomicLong();
 
+  final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
+
   DirectoryReader getReader() throws IOException {
     return getReader(true);
   }
@@ -741,10 +748,17 @@ public class IndexWriter implements Clos
   public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
     conf.setIndexWriter(this); // prevent reuse by other instances
     config = conf;
+
     directory = d;
+
+    // Directory we use for merging, so we can abort running merges, and so
+    // merge schedulers can optionally rate-limit per-merge IO:
+    mergeDirectory = addMergeRateLimiters(d);
+
     analyzer = config.getAnalyzer();
     infoStream = config.getInfoStream();
     mergeScheduler = config.getMergeScheduler();
+    mergeScheduler.setInfoStream(infoStream);
     codec = config.getCodec();
 
     bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
@@ -1696,7 +1710,7 @@ public class IndexWriter implements Clos
             for(int i=0;i<size;i++) {
               final MergePolicy.OneMerge merge = mergeExceptions.get(i);
               if (merge.maxNumSegments != -1) {
-                throw new IOException("background merge hit exception: " + merge.segString(directory), merge.getException());
+                throw new IOException("background merge hit exception: " + merge.segString(), merge.getException());
               }
             }
           }
@@ -1786,7 +1800,7 @@ public class IndexWriter implements Clos
             }
             Throwable t = merge.getException();
             if (t != null) {
-              throw new IOException("background merge hit exception: " + merge.segString(directory), t);
+              throw new IOException("background merge hit exception: " + merge.segString(), t);
             }
           }
 
@@ -1969,6 +1983,8 @@ public class IndexWriter implements Clos
         stopMerges = true;
       }
 
+      rateLimiters.close();
+
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "rollback: done finish merges");
       }
@@ -2155,7 +2171,7 @@ public class IndexWriter implements Clos
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
       }
-      merge.abort();
+      merge.rateLimiter.setAbort();
       mergeFinish(merge);
     }
     pendingMerges.clear();
@@ -2164,7 +2180,7 @@ public class IndexWriter implements Clos
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "now abort running merge " + segString(merge.segments));
       }
-      merge.abort();
+      merge.rateLimiter.setAbort();
     }
 
     // These merges periodically check whether they have
@@ -2405,7 +2421,7 @@ public class IndexWriter implements Clos
               infoStream.message("IW", "addIndexes: process segment origName=" + info.info.name + " newName=" + newSegName + " info=" + info);
             }
 
-            IOContext context = new IOContext(new MergeInfo(info.info.getDocCount(), info.sizeInBytes(), true, -1));
+            IOContext context = new IOContext(new FlushInfo(info.info.getDocCount(), info.sizeInBytes()));
 
             FieldInfos fis = readFieldInfos(info);
             for(FieldInfo fi : fis) {
@@ -2516,19 +2532,21 @@ public class IndexWriter implements Clos
       // exceed the limit:
       reserveDocs(numDocs);
       
-      final IOContext context = new IOContext(new MergeInfo(numDocs, -1, true, -1));
+      final IOContext context = new IOContext(new MergeInfo(numDocs, -1, false, -1));
 
       // TODO: somehow we should fix this merge so it's
       // abortable so that IW.close(false) is able to stop it
-      TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
+      TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(mergeDirectory);
 
       SegmentInfo info = new SegmentInfo(directory, Version.LATEST, mergedName, -1,
                                          false, codec, null, StringHelper.randomId(), new HashMap<>());
 
       SegmentMerger merger = new SegmentMerger(mergeReaders, info, infoStream, trackingDir,
-                                               MergeState.CheckAbort.NONE, globalFieldNumberMap, 
+                                               globalFieldNumberMap, 
                                                context);
       
+      rateLimiters.set(new MergeRateLimiter(null));
+
       if (!merger.shouldMerge()) {
         return;
       }
@@ -2567,7 +2585,7 @@ public class IndexWriter implements Clos
       if (useCompoundFile) {
         Collection<String> filesToDelete = infoPerCommit.files();
         try {
-          createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, info, context);
+          createCompoundFile(infoStream, mergeDirectory, info, context);
         } finally {
           // delete new non cfs files directly: they were never
           // registered with IFD
@@ -3040,6 +3058,9 @@ public class IndexWriter implements Clos
   final synchronized void applyAllDeletesAndUpdates() throws IOException {
     flushDeletesCount.incrementAndGet();
     final BufferedUpdatesStream.ApplyDeletesResult result;
+    if (infoStream.isEnabled("IW")) {
+      infoStream.message("IW", "now apply all deletes for all segments maxDoc=" + (docWriter.getNumDocs() + segmentInfos.totalDocCount()));
+    }
     result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
     if (result.anyDeletes) {
       checkpoint();
@@ -3354,7 +3375,7 @@ public class IndexWriter implements Clos
     // deleter.refresh() call that will remove any index
     // file that current segments does not reference), we
     // abort this merge
-    if (merge.isAborted()) {
+    if (merge.rateLimiter.getAbort()) {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "commitMerge: skip: it was aborted");
       }
@@ -3513,6 +3534,8 @@ public class IndexWriter implements Clos
 
     boolean success = false;
 
+    rateLimiters.set(merge.rateLimiter);
+
     final long t0 = System.currentTimeMillis();
 
     final MergePolicy mergePolicy = config.getMergePolicy();
@@ -3550,7 +3573,7 @@ public class IndexWriter implements Clos
           // This merge (and, generally, any change to the
           // segments) may now enable new merges, so we call
           // merge policy & update pending merges.
-          if (success && !merge.isAborted() && (merge.maxNumSegments != -1 || (!closed && !closing))) {
+          if (success && merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != -1 || (!closed && !closing))) {
             updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
           }
         }
@@ -3558,7 +3581,7 @@ public class IndexWriter implements Clos
     } catch (OutOfMemoryError oom) {
       tragicEvent(oom, "merge");
     }
-    if (merge.info != null && !merge.isAborted()) {
+    if (merge.info != null && merge.rateLimiter.getAbort() == false) {
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.getDocCount() + " docs");
       }
@@ -3583,7 +3606,7 @@ public class IndexWriter implements Clos
     assert merge.segments.size() > 0;
 
     if (stopMerges) {
-      merge.abort();
+      merge.rateLimiter.setAbort();
       throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
     }
 
@@ -3694,7 +3717,7 @@ public class IndexWriter implements Clos
       return;
     }
 
-    if (merge.isAborted()) {
+    if (merge.rateLimiter.getAbort()) {
       return;
     }
 
@@ -3703,6 +3726,10 @@ public class IndexWriter implements Clos
     // and then open them again for merging.  Maybe  we
     // could pre-pool them somehow in that case...
 
+    if (infoStream.isEnabled("IW")) {
+      infoStream.message("IW", "now apply deletes for " + merge.segments.size() + " merging segments");
+    }
+
     // Lock order: IW -> BD
     final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments);
     
@@ -3839,14 +3866,13 @@ public class IndexWriter implements Clos
    *  instance */
   private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
 
-    merge.checkAborted(directory);
+    merge.rateLimiter.checkAbort();
 
     List<SegmentCommitInfo> sourceSegments = merge.segments;
     
     IOContext context = new IOContext(merge.getMergeInfo());
 
-    final MergeState.CheckAbort checkAbort = new MergeState.CheckAbort(merge, directory);
-    final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(directory);
+    final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
 
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "merging " + segString(merge.segments));
@@ -3926,15 +3952,12 @@ public class IndexWriter implements Clos
       // OneMerge to return a view over the actual segments to merge
       final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(),
                                                      merge.info.info, infoStream, dirWrapper,
-                                                     checkAbort, globalFieldNumberMap, 
+                                                     globalFieldNumberMap, 
                                                      context);
 
-      merge.checkAborted(directory);
+      merge.rateLimiter.checkAbort();
 
-      long mergeStartTime = 0;
-      if (infoStream.isEnabled("IW")) {
-        mergeStartTime = System.nanoTime();
-      }
+      merge.mergeStartNS = System.nanoTime();
 
       // This is where all the work happens:
       boolean success3 = false;
@@ -3954,13 +3977,13 @@ public class IndexWriter implements Clos
       assert mergeState.segmentInfo == merge.info.info;
       merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles()));
 
-      // Record which codec was used to write the segment
-
       if (infoStream.isEnabled("IW")) {
         if (merger.shouldMerge()) {
           long t1 = System.nanoTime();
-          double sec = (t1-mergeStartTime)/1000000000.;
+          double sec = (t1-merge.mergeStartNS)/1000000000.;
           double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
+          double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.;
+          double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.;
           infoStream.message("IW", "merge codec=" + codec + " docCount=" + merge.info.info.getDocCount() + "; merged segment has " +
                              (mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
                              (mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " + 
@@ -3968,8 +3991,10 @@ public class IndexWriter implements Clos
                              (mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " + 
                              (mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
                              String.format(Locale.ROOT,
-                                           "%d msec to merge segment [%.2f MB, %.2f MB/sec]",
-                                           ((t1-mergeStartTime)/1000000),
+                                           "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]",
+                                           sec,
+                                           stoppedSec,
+                                           throttleSec,
                                            segmentMB,
                                            segmentMB / sec));
         } else {
@@ -4002,11 +4027,11 @@ public class IndexWriter implements Clos
         Collection<String> filesToRemove = merge.info.files();
 
         try {
-          filesToRemove = createCompoundFile(infoStream, directory, checkAbort, merge.info.info, context);
+          filesToRemove = createCompoundFile(infoStream, mergeDirectory, merge.info.info, context);
           success = true;
         } catch (IOException ioe) {
           synchronized(this) {
-            if (merge.isAborted()) {
+            if (merge.rateLimiter.getAbort()) {
               // This can happen if rollback or close(false)
               // is called -- fall through to logic below to
               // remove the partially created CFS:
@@ -4042,7 +4067,7 @@ public class IndexWriter implements Clos
           // registered with IFD
           deleter.deleteNewFiles(filesToRemove);
 
-          if (merge.isAborted()) {
+          if (merge.rateLimiter.getAbort()) {
             if (infoStream.isEnabled("IW")) {
               infoStream.message("IW", "abort merge after building CFS");
             }
@@ -4496,7 +4521,7 @@ public class IndexWriter implements Clos
    * deletion files, this SegmentInfo must not reference such files when this
    * method is called, because they are not allowed within a compound file.
    */
-  static final Collection<String> createCompoundFile(InfoStream infoStream, Directory directory, CheckAbort checkAbort, final SegmentInfo info, IOContext context)
+  static final Collection<String> createCompoundFile(InfoStream infoStream, Directory directory, final SegmentInfo info, IOContext context)
           throws IOException {
 
     // TODO: use trackingdirectorywrapper instead of files() to know which files to delete when things fail:
@@ -4510,7 +4535,7 @@ public class IndexWriter implements Clos
     
     boolean success = false;
     try {
-      info.getCodec().compoundFormat().write(directory, info, files, checkAbort, context);
+      info.getCodec().compoundFormat().write(directory, info, files, context);
       success = true;
     } finally {
       if (!success) {
@@ -4643,4 +4668,24 @@ public class IndexWriter implements Clos
       throw new IllegalStateException("number of documents in the index cannot exceed " + actualMaxDocs);
     }
   }
+
+  /** Wraps the incoming {@link Directory} so that we assign a per-thread
+   *  {@link MergeRateLimiter} to all created {@link IndexOutput}s. */
+  private Directory addMergeRateLimiters(Directory in) {
+    return new FilterDirectory(in) {
+      @Override
+      public IndexOutput createOutput(String name, IOContext context) throws IOException {
+        ensureOpen();
+
+        // This Directory is only supposed to be used during merging,
+        // so all writes should have MERGE context, else there is a bug 
+        // somewhere that is failing to pass down the right IOContext:
+        assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
+        IndexOutput output = in.createOutput(name, context);
+        MergeRateLimiter rateLimiter = rateLimiters.get();
+        assert rateLimiter != null;
+        return new RateLimitedIndexOutput(rateLimiter, output);
+      }
+    };
+  }
 }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java Mon Jan  5 14:28:28 2015
@@ -17,8 +17,8 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.index.MultiDocsAndPositionsEnum.EnumWithSlice;
+import org.apache.lucene.util.BytesRef;
 
 import java.io.IOException;
 
@@ -97,13 +97,6 @@ final class MappingMultiDocsAndPositions
 
       int doc = current.nextDoc();
       if (doc != NO_MORE_DOCS) {
-
-        mergeState.checkAbortCount++;
-        if (mergeState.checkAbortCount > 60000) {
-          mergeState.checkAbort.work(mergeState.checkAbortCount/5.0);
-          mergeState.checkAbortCount = 0;
-        }
-
         // compact deletions
         doc = currentMap.get(doc);
         if (doc == -1) {

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java Mon Jan  5 14:28:28 2015
@@ -97,13 +97,6 @@ final class MappingMultiDocsEnum extends
 
       int doc = current.nextDoc();
       if (doc != NO_MORE_DOCS) {
-
-        mergeState.checkAbortCount++;
-        if (mergeState.checkAbortCount > 60000) {
-          mergeState.checkAbort.work(mergeState.checkAbortCount/5.0);
-          mergeState.checkAbortCount = 0;
-        }
-
         // compact deletions
         doc = currentMap.get(doc);
         if (doc == -1) {

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java Mon Jan  5 14:28:28 2015
@@ -17,16 +17,17 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.util.FixedBitSet;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.FixedBitSet;
+
 /**
  * <p>Expert: a MergePolicy determines the sequence of
  * primitive merge operations.</p>
@@ -107,11 +108,14 @@ public abstract class MergePolicy {
     /** Segments to be merged. */
     public final List<SegmentCommitInfo> segments;
 
+    /** A private {@link RateLimiter} for this merge, used to rate limit writes and abort. */
+    public final MergeRateLimiter rateLimiter;
+
+    volatile long mergeStartNS = -1;
+
     /** Total number of documents in segments to be merged, not accounting for deletions. */
     public final int totalDocCount;
-    boolean aborted;
     Throwable error;
-    boolean paused;
 
     /** Sole constructor.
      * @param segments List of {@link SegmentCommitInfo}s
@@ -127,6 +131,8 @@ public abstract class MergePolicy {
         count += info.info.getDocCount();
       }
       totalDocCount = count;
+
+      rateLimiter = new MergeRateLimiter(this);
     }
 
     /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
@@ -186,68 +192,16 @@ public abstract class MergePolicy {
       return error;
     }
 
-    /** Mark this merge as aborted.  If this is called
-     *  before the merge is committed then the merge will
-     *  not be committed. */
-    synchronized void abort() {
-      aborted = true;
-      notifyAll();
-    }
-
-    /** Returns true if this merge was aborted. */
-    synchronized boolean isAborted() {
-      return aborted;
-    }
-
-    /** Called periodically by {@link IndexWriter} while
-     *  merging to see if the merge is aborted. */
-    public synchronized void checkAborted(Directory dir) throws MergeAbortedException {
-      if (aborted) {
-        throw new MergeAbortedException("merge is aborted: " + segString(dir));
-      }
-
-      while (paused) {
-        try {
-          // In theory we could wait() indefinitely, but we
-          // do 250 msec, defensively
-          wait(250);
-        } catch (InterruptedException ie) {
-          throw new RuntimeException(ie);
-        }
-        if (aborted) {
-          throw new MergeAbortedException("merge is aborted: " + segString(dir));
-        }
-      }
-    }
-
-    /** Set or clear whether this merge is paused paused (for example
-     *  {@link ConcurrentMergeScheduler} will pause merges
-     *  if too many are running). */
-    synchronized public void setPause(boolean paused) {
-      this.paused = paused;
-      if (!paused) {
-        // Wakeup merge thread, if it's waiting
-        notifyAll();
-      }
-    }
-
-    /** Returns true if this merge is paused.
-     *
-     *  @see #setPause(boolean) */
-    synchronized public boolean getPause() {
-      return paused;
-    }
-
     /** Returns a readable description of the current merge
      *  state. */
-    public String segString(Directory dir) {
+    public String segString() {
       StringBuilder b = new StringBuilder();
       final int numSegments = segments.size();
       for(int i=0;i<numSegments;i++) {
         if (i > 0) {
           b.append(' ');
         }
-        b.append(segments.get(i).toString(dir, 0));
+        b.append(segments.get(i).toString());
       }
       if (info != null) {
         b.append(" into ").append(info.info.name);
@@ -255,7 +209,7 @@ public abstract class MergePolicy {
       if (maxNumSegments != -1) {
         b.append(" [maxNumSegments=" + maxNumSegments + "]");
       }
-      if (aborted) {
+      if (rateLimiter.getAbort()) {
         b.append(" [ABORTED]");
       }
       return b.toString();
@@ -321,7 +275,7 @@ public abstract class MergePolicy {
       b.append("MergeSpec:\n");
       final int count = merges.size();
       for(int i=0;i<count;i++) {
-        b.append("  ").append(1 + i).append(": ").append(merges.get(i).segString(dir));
+        b.append("  ").append(1 + i).append(": ").append(merges.get(i).segString());
       }
       return b.toString();
     }
@@ -538,5 +492,4 @@ public abstract class MergePolicy {
     v *= 1024 * 1024;
     this.maxCFSSegmentSize = v > Long.MAX_VALUE ? Long.MAX_VALUE : (long) v;
   }
-
 }

Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java?rev=1649532&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java Mon Jan  5 14:28:28 2015
@@ -0,0 +1,194 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
+
+/** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to 
+ *  give {@link MergeScheduler}s ionice-like control.
+ *
+ *  This is similar to {@link SimpleRateLimiter}, except it's merge-private,
+ *  it will wake up if its rate changes while it's paused, it tracks how
+ *  much time it spent stopped and paused, and it supports aborting.
+ *
+ *  @lucene.internal */
+
+public class MergeRateLimiter extends RateLimiter {
+
+  private final static int MIN_PAUSE_CHECK_MSEC = 25;
+  volatile long totalBytesWritten;
+
+  // By default no IO limit:
+  double mbPerSec = Double.POSITIVE_INFINITY;
+  private long lastNS;
+  private long minPauseCheckBytes;
+  private boolean abort;
+  long totalPausedNS;
+  long totalStoppedNS;
+  final MergePolicy.OneMerge merge;
+
+  /** Returned by {@link #maybePause}. */
+  private static enum PauseResult {NO, STOPPED, PAUSED};
+
+  /** Sole constructor. */
+  public MergeRateLimiter(MergePolicy.OneMerge merge) {
+    this.merge = merge;
+  }
+
+  @Override
+  public synchronized void setMBPerSec(double mbPerSec) {
+    // 0.0 is allowed: it means the merge is stopped
+    if (mbPerSec < 0.0) {
+      throw new IllegalArgumentException("mbPerSec must be positive; got: " + mbPerSec);
+    }
+    this.mbPerSec = mbPerSec;
+    // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
+    minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
+    assert minPauseCheckBytes >= 0;
+    notify();
+  }
+
+  @Override
+  public synchronized double getMBPerSec() {
+    return mbPerSec;
+  }
+
+  /** Returns total bytes written by this merge. */
+  public long getTotalBytesWritten() {
+    return totalBytesWritten;
+  }
+
+  @Override
+  public long pause(long bytes) throws MergePolicy.MergeAbortedException {
+
+    totalBytesWritten += bytes;
+
+    long startNS = System.nanoTime();
+    long curNS = startNS;
+
+    // While loop because 1) Object.wait doesn't always sleep long
+    // enough, and 2) we wake up and check again when our rate limit
+    // is changed while we were pausing:
+    long pausedNS = 0;
+    while (true) {
+      PauseResult result = maybePause(bytes, curNS);
+      if (result == PauseResult.NO) {
+        // Set to curNS, not targetNS, to enforce the instant rate, not
+        // the "averaged over all history" rate:
+        lastNS = curNS;
+        break;
+      }
+      curNS = System.nanoTime();
+      long ns = curNS - startNS;
+      startNS = curNS;
+
+      // Separately track when merge was stopped vs rate limited:
+      if (result == PauseResult.STOPPED) {
+        totalStoppedNS += ns;
+      } else {
+        assert result == PauseResult.PAUSED;
+        totalPausedNS += ns;
+      }
+      pausedNS += ns;
+    }
+
+    return pausedNS;
+  }
+
+  /** Total NS merge was stopped. */
+  public synchronized long getTotalStoppedNS() {
+    return totalStoppedNS;
+  } 
+
+  /** Total NS merge was paused to rate limit IO. */
+  public synchronized long getTotalPausedNS() {
+    return totalPausedNS;
+  } 
+
+  /** Returns NO if no pause happened, STOPPED if we paused because the rate was 0.0 (merge is stopped), PAUSED if we paused with a normal rate limit. */
+  private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
+    double secondsToPause = (bytes/1024./1024.) / mbPerSec;
+
+    // Time we should sleep until; this is a purely instantaneous
+    // rate (it just adds seconds onto the last time we had paused to);
+    // maybe we should also offer a decayed recent-history one?
+    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
+
+    long curPauseNS = targetNS - curNS;
+
+    // NOTE: except maybe on real-time JVMs, minimum realistic
+    // wait/sleep time is 1 msec; if you pass just 1 nsec the impl
+    // rounds up to 1 msec, so we don't bother unless it's > 2 msec:
+
+    if (curPauseNS <= 2000000) {
+      return PauseResult.NO;
+    }
+
+    // Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping:
+    if (curPauseNS > 250L*1000000) {
+      curPauseNS = 250L*1000000;
+    }
+
+    int sleepMS = (int) (curPauseNS / 1000000);
+    int sleepNS = (int) (curPauseNS % 1000000);
+
+    // Now is a good time to abort the merge:
+    checkAbort();
+
+    double rate = mbPerSec;
+
+    try {
+      // CMS can wake us up here if it changes our target rate:
+      wait(sleepMS, sleepNS);
+    } catch (InterruptedException ie) {
+      throw new ThreadInterruptedException(ie);
+    }
+
+    if (rate == 0.0) {
+      return PauseResult.STOPPED;
+    } else {
+      return PauseResult.PAUSED;
+    }
+  }
+
+  /** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */
+  public synchronized void checkAbort() throws MergePolicy.MergeAbortedException {
+    if (abort) {
+      throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString());
+    }
+  }
+
+  /** Mark this merge aborted. */
+  public synchronized void setAbort() {
+    abort = true;
+    notify();
+  }
+
+  /** Returns true if this merge was aborted. */
+  public synchronized boolean getAbort() {
+    return abort;
+  }
+
+  @Override
+  public long getMinPauseCheckBytes() {
+    return minPauseCheckBytes;
+  }
+}

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java Mon Jan  5 14:28:28 2015
@@ -20,6 +20,8 @@ package org.apache.lucene.index;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.lucene.util.InfoStream;
+
 /** <p>Expert: {@link IndexWriter} uses an instance
  *  implementing this interface to execute the merges
  *  selected by a {@link MergePolicy}.  The default
@@ -46,4 +48,34 @@ public abstract class MergeScheduler imp
   /** Close this MergeScheduler. */
   @Override
   public abstract void close() throws IOException;
+
+  /** For messages about merge scheduling */
+  protected InfoStream infoStream;
+
+  /** IndexWriter calls this on init. */
+  final void setInfoStream(InfoStream infoStream) {
+    this.infoStream = infoStream;
+  }
+
+  /**
+   * Returns true if infoStream messages are enabled. This method is usually used in
+   * conjunction with {@link #message(String)}:
+   * 
+   * <pre class="prettyprint">
+   * if (verbose()) {
+   *   message(&quot;your message&quot;);
+   * }
+   * </pre>
+   */
+  protected boolean verbose() {
+    return infoStream != null && infoStream.isEnabled("MS");
+  }
+ 
+  /**
+   * Outputs the given message - this method assumes {@link #verbose()} was
+   * called and returned true.
+   */
+  protected void message(String message) {
+    infoStream.message("MS", message);
+  }
 }