Posted to commits@lucene.apache.org by mi...@apache.org on 2015/01/05 15:28:29 UTC
svn commit: r1649532 [1/2] - in /lucene/dev/trunk: lucene/
lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/
lucene/core/src/java/org/apache/lucene/codecs/
lucene/core/src/java/org/apache/lucene/codecs/compressing/
lucene/core/src/java/org/ap...
Author: mikemccand
Date: Mon Jan 5 14:28:28 2015
New Revision: 1649532
URL: http://svn.apache.org/r1649532
Log:
LUCENE-6119: CMS dynamically rate limits IO writes of each merge depending on incoming merge rate
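(A minimal usage sketch of the new ConcurrentMergeScheduler IO-throttle controls introduced by this change; the method names come from the diff below, while the surrounding setup and the printed output are illustrative assumptions only.)

    import org.apache.lucene.index.ConcurrentMergeScheduler;

    public class ConfigureMergeIOThrottle {
      public static void main(String[] args) {
        ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();

        // Adaptive IO throttling is the default after this change; it can be
        // toggled explicitly:
        cms.enableAutoIOThrottle();
        // cms.disableAutoIOThrottle();   // write merges at full speed

        // Forced merges get a separate, fixed rate limit (default: unlimited):
        cms.setForceMergeMBPerSec(Double.POSITIVE_INFINITY);

        // Current write rate applied to big merges, in MB/sec, or
        // Double.POSITIVE_INFINITY when auto throttling is disabled:
        System.out.println("merge IO rate: " + cms.getIORateLimitMBPerSec() + " MB/sec");

        // The scheduler is then installed on IndexWriterConfig via
        // setMergeScheduler(cms), as before (not shown here).
      }
    }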
Added:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java (with props)
Removed:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedDirectoryWrapper.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/store/TestRateLimitedDirectoryWrapper.java
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeState.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/FilterDirectory.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/store/RateLimiter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/PrintStreamInfoStream.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/StringHelper.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/TestMergeSchedulerExternal.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDoc.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexFileDeleter.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java
lucene/dev/trunk/lucene/sandbox/src/test/org/apache/lucene/codecs/idversion/TestIDVersionPostingsFormat.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/codecs/cranky/CrankyCompoundFormat.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/BaseCompoundFormatTestCase.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/StandardDirectoryFactory.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/BasicFunctionalityTest.java
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/core/MockFSDirectoryFactory.java
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Mon Jan 5 14:28:28 2015
@@ -130,6 +130,10 @@ New Features
* LUCENE-5914: Add an option to Lucene50Codec to support either BEST_SPEED
or BEST_COMPRESSION for stored fields. (Adrien Grand, Robert Muir)
+* LUCENE-6119: Add auto-IO-throttling to ConcurrentMergeScheduler, to
+ rate limit IO writes for each merge depending on incoming merge
+ rate. (Mike McCandless)
+
Optimizations
* LUCENE-5960: Use a more efficient bitset, not a Set<Integer>, to
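(Following up on the LUCENE-6119 entry above: the new throttle also exposes a protected hook, targetMBPerSecChanged(), added further down in this diff. A minimal sketch of a hypothetical subclass that uses it to observe rate changes -- the class name and the logging are assumptions:)

    import org.apache.lucene.index.ConcurrentMergeScheduler;

    class LoggingMergeScheduler extends ConcurrentMergeScheduler {
      @Override
      protected void targetMBPerSecChanged() {
        // getIORateLimitMBPerSec() reports the current adaptive rate, or
        // Double.POSITIVE_INFINITY when auto IO throttling is disabled:
        System.out.println("merge IO rate changed to " + getIORateLimitMBPerSec() + " MB/sec");
      }
    }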
Modified: lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java (original)
+++ lucene/dev/trunk/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextCompoundFormat.java Mon Jan 5 14:28:28 2015
@@ -29,7 +29,6 @@ import java.util.Locale;
import org.apache.lucene.codecs.CompoundFormat;
import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
@@ -157,7 +156,7 @@ public class SimpleTextCompoundFormat ex
}
@Override
- public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException {
+ public void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException {
String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION);
int numFiles = files.size();
@@ -181,8 +180,6 @@ public class SimpleTextCompoundFormat ex
out.copyBytes(in, in.length());
}
endOffsets[i] = out.getFilePointer();
-
- checkAbort.work(endOffsets[i] - startOffsets[i]);
}
long tocPos = out.getFilePointer();
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/CompoundFormat.java Mon Jan 5 14:28:28 2015
@@ -20,7 +20,6 @@ package org.apache.lucene.codecs;
import java.io.IOException;
import java.util.Collection;
-import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@@ -47,8 +46,7 @@ public abstract class CompoundFormat {
/**
* Packs the provided files into a compound format.
*/
- // TODO: get checkAbort out of here, and everywhere, and have iw do it at a higher level
- public abstract void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException;
+ public abstract void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException;
/**
* Returns the compound file names used by this segment.
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/StoredFieldsWriter.java Mon Jan 5 14:28:28 2015
@@ -96,7 +96,6 @@ public abstract class StoredFieldsWriter
storedFieldsReader.visitDocument(docID, visitor);
finishDocument();
docCount++;
- mergeState.checkAbort.work(300);
}
}
finish(mergeState.mergeFieldInfos, docCount);
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/TermVectorsWriter.java Mon Jan 5 14:28:28 2015
@@ -196,7 +196,6 @@ public abstract class TermVectorsWriter
}
addAllDocVectors(vectors, mergeState);
docCount++;
- mergeState.checkAbort.work(300);
}
}
finish(mergeState.mergeFieldInfos, docCount);
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java Mon Jan 5 14:28:28 2015
@@ -506,7 +506,6 @@ public final class CompressingStoredFiel
storedFieldsReader.visitDocument(docID, visitor);
finishDocument();
++docCount;
- mergeState.checkAbort.work(300);
}
} else {
// optimized merge, we copy serialized (but decompressed) bytes directly
@@ -522,7 +521,6 @@ public final class CompressingStoredFiel
numStoredFieldsInDoc = doc.numStoredFields;
finishDocument();
++docCount;
- mergeState.checkAbort.work(300);
}
}
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingTermVectorsWriter.java Mon Jan 5 14:28:28 2015
@@ -751,7 +751,6 @@ public final class CompressingTermVector
}
addAllDocVectors(vectors, mergeState);
++docCount;
- mergeState.checkAbort.work(300);
}
} else {
final CompressingStoredFieldsIndexReader index = matchingVectorsReader.getIndex();
@@ -781,7 +780,6 @@ public final class CompressingTermVector
this.vectorsStream.copyBytes(vectorsStream, chunkLength);
docCount += chunkDocs;
this.numDocs += chunkDocs;
- mergeState.checkAbort.work(300 * chunkDocs);
i = nextLiveDoc(docBase + chunkDocs, liveDocs, maxDoc);
} else {
for (; i < docBase + chunkDocs; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) {
@@ -793,7 +791,6 @@ public final class CompressingTermVector
}
addAllDocVectors(vectors, mergeState);
++docCount;
- mergeState.checkAbort.work(300);
}
}
} else {
@@ -805,7 +802,6 @@ public final class CompressingTermVector
}
addAllDocVectors(vectors, mergeState);
++docCount;
- mergeState.checkAbort.work(300);
i = nextLiveDoc(i + 1, liveDocs, maxDoc);
}
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene50/Lucene50CompoundFormat.java Mon Jan 5 14:28:28 2015
@@ -23,7 +23,6 @@ import java.util.Collection;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.CompoundFormat;
import org.apache.lucene.index.IndexFileNames;
-import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
@@ -73,7 +72,7 @@ public final class Lucene50CompoundForma
}
@Override
- public void write(Directory dir, SegmentInfo si, Collection<String> files, CheckAbort checkAbort, IOContext context) throws IOException {
+ public void write(Directory dir, SegmentInfo si, Collection<String> files, IOContext context) throws IOException {
String dataFile = IndexFileNames.segmentFileName(si.name, "", DATA_EXTENSION);
String entriesFile = IndexFileNames.segmentFileName(si.name, "", ENTRIES_EXTENSION);
@@ -99,8 +98,6 @@ public final class Lucene50CompoundForma
entries.writeString(IndexFileNames.stripSegmentName(file));
entries.writeLong(startOffset);
entries.writeLong(length);
-
- checkAbort.work(length);
}
CodecUtil.writeFooter(data);
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java Mon Jan 5 14:28:28 2015
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -173,16 +174,19 @@ class BufferedUpdatesStream implements A
Collections.sort(infos2, sortSegInfoByDelGen);
CoalescedUpdates coalescedDeletes = null;
- boolean anyNewDeletes = false;
int infosIDX = infos2.size()-1;
int delIDX = updates.size()-1;
+ long totDelCount = 0;
+ long totTermVisitedCount = 0;
+
List<SegmentCommitInfo> allDeleted = null;
while (infosIDX >= 0) {
//System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
+ final long segStartNS = System.nanoTime();
final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
final SegmentCommitInfo info = infos2.get(infosIDX);
final long segGen = info.getBufferedDeletesGen();
@@ -213,12 +217,14 @@ class BufferedUpdatesStream implements A
final ReadersAndUpdates rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0;
+ long termVisitedCount = 0;
final boolean segAllDeletes;
try {
final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
if (coalescedDeletes != null) {
- //System.out.println(" del coalesced");
- delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+ TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+ delCount += counts.delCount;
+ termVisitedCount += counts.termVisitedCount;
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
@@ -239,7 +245,8 @@ class BufferedUpdatesStream implements A
rld.release(reader);
readerPool.release(rld);
}
- anyNewDeletes |= delCount > 0;
+ totDelCount += delCount;
+ totTermVisitedCount += termVisitedCount;
if (segAllDeletes) {
if (allDeleted == null) {
@@ -249,7 +256,7 @@ class BufferedUpdatesStream implements A
}
if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+ infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : ""));
}
if (coalescedDeletes == null) {
@@ -274,9 +281,12 @@ class BufferedUpdatesStream implements A
final ReadersAndUpdates rld = readerPool.get(info, true);
final SegmentReader reader = rld.getReader(IOContext.READ);
int delCount = 0;
+ long termVisitedCount = 0;
final boolean segAllDeletes;
try {
- delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+ TermDeleteCounts counts = applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
+ delCount += counts.delCount;
+ termVisitedCount += counts.termVisitedCount;
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
@@ -291,7 +301,9 @@ class BufferedUpdatesStream implements A
rld.release(reader);
readerPool.release(rld);
}
- anyNewDeletes |= delCount > 0;
+
+ totDelCount += delCount;
+ totTermVisitedCount += termVisitedCount;
if (segAllDeletes) {
if (allDeleted == null) {
@@ -301,7 +313,7 @@ class BufferedUpdatesStream implements A
}
if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+ infoStream.message("BD", String.format(Locale.ROOT, "%.3fs", ((System.nanoTime() - segStartNS)/1000000000.0)) + " seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + coalescedDeletes + "] newDelCount=" + delCount + " termVisitedCount=" + termVisitedCount + (segAllDeletes ? " 100% deleted" : ""));
}
}
info.setBufferedDeletesGen(gen);
@@ -312,11 +324,11 @@ class BufferedUpdatesStream implements A
assert checkDeleteStats();
if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+ infoStream.message("BD", "applyDeletes took " + (System.currentTimeMillis()-t0) + " msec for " + infos.size() + " segments, " + totDelCount + " deleted docs, " + totTermVisitedCount + " visited terms");
}
// assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
- return new ApplyDeletesResult(anyNewDeletes, gen, allDeleted);
+ return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted);
}
synchronized long getNextGen() {
@@ -374,9 +386,23 @@ class BufferedUpdatesStream implements A
}
}
+ private static class TermDeleteCounts {
+ /** How many documents were actually deleted. */
+ public final int delCount;
+
+ /** How many terms we checked. */
+ public final long termVisitedCount;
+
+ public TermDeleteCounts(int delCount, long termVisitedCount) {
+ this.delCount = delCount;
+ this.termVisitedCount = termVisitedCount;
+ }
+ }
+
// Delete by Term
- private synchronized long applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException {
- long delCount = 0;
+ private synchronized TermDeleteCounts applyTermDeletes(Iterable<Term> termsIter, ReadersAndUpdates rld, SegmentReader reader) throws IOException {
+ int delCount = 0;
+ long termVisitedCount = 0;
Fields fields = reader.fields();
TermsEnum termsEnum = null;
@@ -388,8 +414,10 @@ class BufferedUpdatesStream implements A
boolean any = false;
- //System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader);
+ long ns = System.nanoTime();
+
for (Term term : termsIter) {
+ termVisitedCount++;
// Since we visit terms sorted, we gain performance
// by re-using the same TermsEnum and seeking only
// forwards
@@ -440,7 +468,7 @@ class BufferedUpdatesStream implements A
}
}
- return delCount;
+ return new TermDeleteCounts(delCount, termVisitedCount);
}
// DocValues updates
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java Mon Jan 5 14:28:28 2015
@@ -18,15 +18,15 @@ package org.apache.lucene.index;
*/
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.lucene.search.Query;
import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
+import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.MergedIterator;
@@ -35,16 +35,19 @@ class CoalescedUpdates {
final List<Iterable<Term>> iterables = new ArrayList<>();
final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>();
final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>();
+ int totalTermCount;
@Override
public String toString() {
// note: we could add/collect more debugging information
- return "CoalescedUpdates(termSets=" + iterables.size() + ",queries="
- + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
- + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
+ return "CoalescedUpdates(termSets=" + iterables.size()
+ + ",totalTermCount=" + totalTermCount
+ + ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
+ + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
}
void update(FrozenBufferedUpdates in) {
+ totalTermCount += in.termCount;
iterables.add(in.termsIterable());
for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Mon Jan 5 14:28:28 2015
@@ -19,9 +19,11 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
+import java.util.Locale;
+import org.apache.lucene.index.MergePolicy.OneMerge;
+import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
@@ -54,18 +56,17 @@ import org.apache.lucene.util.ThreadInte
* settings for spinning or solid state disks for such
* operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}.
*/
+
public class ConcurrentMergeScheduler extends MergeScheduler {
/** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount},
* used to detect whether the index is backed by an SSD or rotational disk and
* set {@code maxThreadCount} accordingly. If it's an SSD,
- * {@code maxThreadCount} is set to {@code max(1, min(3, cpuCoreCount/2))},
+ * {@code maxThreadCount} is set to {@code max(1, min(4, cpuCoreCount/2))},
* otherwise 1. Note that detection only currently works on
* Linux; other platforms will assume the index is not on an SSD. */
public static final int AUTO_DETECT_MERGES_AND_THREADS = -1;
- private int mergeThreadPriority = -1;
-
/** List of currently active {@link MergeThread}s. */
protected final List<MergeThread> mergeThreads = new ArrayList<>();
@@ -81,16 +82,27 @@ public class ConcurrentMergeScheduler ex
// throttling the incoming threads
private int maxMergeCount = AUTO_DETECT_MERGES_AND_THREADS;
- /** {@link Directory} that holds the index. */
- protected Directory dir;
-
- /** {@link IndexWriter} that owns this instance. */
- protected IndexWriter writer;
-
/** How many {@link MergeThread}s have kicked off (this is use
* to name them). */
protected int mergeThreadCount;
+ /** Floor for IO write rate limit (we will never go any lower than this) */
+ private static final double MIN_MERGE_MB_PER_SEC = 5.0;
+
+ /** Initial value for IO write rate limit when doAutoIOThrottle is true */
+ private static final double START_MB_PER_SEC = 20.0;
+
+ /** Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount). */
+ private static final double MIN_BIG_MERGE_MB = 50.0;
+
+ /** Current IO writes throttle rate */
+ protected double targetMBPerSec = START_MB_PER_SEC;
+
+ /** true if we should rate-limit writes for each merge */
+ private boolean doAutoIOThrottle = true;
+
+ private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
+
/** Sole constructor, with all settings set to default
* values. */
public ConcurrentMergeScheduler() {
@@ -142,10 +154,48 @@ public class ConcurrentMergeScheduler ex
public synchronized void setDefaultMaxMergesAndThreads(boolean spins) {
if (spins) {
maxThreadCount = 1;
- maxMergeCount = 2;
+ maxMergeCount = 6;
+ } else {
+ maxThreadCount = Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors()/2));
+ maxMergeCount = maxThreadCount+5;
+ }
+ }
+
+ /** Set the per-merge IO throttle rate for forced merges (default: {@code Double.POSITIVE_INFINITY}). */
+ public synchronized void setForceMergeMBPerSec(double v) {
+ forceMergeMBPerSec = v;
+ updateMergeThreads();
+ }
+
+ /** Get the per-merge IO throttle rate for forced merges. */
+ public synchronized double getForceMergeMBPerSec() {
+ return forceMergeMBPerSec;
+ }
+
+ /** Turn on dynamic IO throttling, to adaptively rate limit writes
+ * bytes/sec to the minimal rate necessary so merges do not fall behind.
+ * By default this is enabled. */
+ public synchronized void enableAutoIOThrottle() {
+ doAutoIOThrottle = true;
+ targetMBPerSec = START_MB_PER_SEC;
+ updateMergeThreads();
+ }
+
+ /** Turn off auto IO throttling.
+ *
+ * @see #enableAutoIOThrottle */
+ public synchronized void disableAutoIOThrottle() {
+ doAutoIOThrottle = false;
+ updateMergeThreads();
+ }
+
+ /** Returns the currently set per-merge IO writes rate limit, if {@link #enableAutoIOThrottle}
+ * was called, else {@code Double.POSITIVE_INFINITY}. */
+ public synchronized double getIORateLimitMBPerSec() {
+ if (doAutoIOThrottle) {
+ return targetMBPerSec;
} else {
- maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));
- maxMergeCount = maxThreadCount+2;
+ return Double.POSITIVE_INFINITY;
}
}
@@ -161,48 +211,18 @@ public class ConcurrentMergeScheduler ex
return maxMergeCount;
}
- /** Return the priority that merge threads run at. By
- * default the priority is 1 plus the priority of (ie,
- * slightly higher priority than) the first thread that
- * calls merge. */
- public synchronized int getMergeThreadPriority() {
- initMergeThreadPriority();
- return mergeThreadPriority;
- }
-
- /** Set the base priority that merge threads run at.
- * Note that CMS may increase priority of some merge
- * threads beyond this base priority. It's best not to
- * set this any higher than
- * Thread.MAX_PRIORITY-maxThreadCount, so that CMS has
- * room to set relative priority among threads. */
- public synchronized void setMergeThreadPriority(int pri) {
- if (pri > Thread.MAX_PRIORITY || pri < Thread.MIN_PRIORITY)
- throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
- mergeThreadPriority = pri;
- updateMergeThreads();
+ synchronized void removeMergeThread(MergeThread thread) {
+ boolean result = mergeThreads.remove(thread);
+ assert result;
}
- /** Sorts {@link MergeThread}s; larger merges come first. */
- protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
- @Override
- public int compare(MergeThread t1, MergeThread t2) {
- final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
- final MergePolicy.OneMerge m2 = t2.getCurrentMerge();
-
- final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
- final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;
-
- return c2 - c1;
- }
- };
-
/**
- * Called whenever the running merges have changed, to pause and unpause
- * threads. This method sorts the merge threads by their merge size in
+ * Called whenever the running merges have changed, to set merge IO limits.
+ * This method sorts the merge threads by their merge size in
* descending order and then pauses/unpauses threads from first to last --
* that way, smaller merges are guaranteed to run before larger ones.
*/
+
protected synchronized void updateMergeThreads() {
// Only look at threads that are alive & not in the
@@ -217,93 +237,121 @@ public class ConcurrentMergeScheduler ex
mergeThreads.remove(threadIdx);
continue;
}
- if (mergeThread.getCurrentMerge() != null) {
- activeMerges.add(mergeThread);
- }
+ activeMerges.add(mergeThread);
threadIdx++;
}
- // Sort the merge threads in descending order.
- CollectionUtil.timSort(activeMerges, compareByMergeDocCount);
-
- int pri = mergeThreadPriority;
+ // Sort the merge threads, largest first:
+ CollectionUtil.timSort(activeMerges);
+
final int activeMergeCount = activeMerges.size();
- for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
- final MergeThread mergeThread = activeMerges.get(threadIdx);
- final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
- if (merge == null) {
- continue;
+
+ int bigMergeCount = 0;
+
+ for (threadIdx=activeMergeCount-1;threadIdx>=0;threadIdx--) {
+ MergeThread mergeThread = activeMerges.get(threadIdx);
+ if (mergeThread.merge.estimatedMergeBytes > MIN_BIG_MERGE_MB*1024*1024) {
+ bigMergeCount = 1+threadIdx;
+ break;
}
+ }
+
+ long now = System.nanoTime();
+
+ StringBuilder message;
+ if (verbose()) {
+ message = new StringBuilder();
+ message.append(String.format(Locale.ROOT, "updateMergeThreads ioThrottle=%s targetMBPerSec=%.1f MB/sec", doAutoIOThrottle, targetMBPerSec));
+ } else {
+ message = null;
+ }
+
+ for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
+ MergeThread mergeThread = activeMerges.get(threadIdx);
+
+ OneMerge merge = mergeThread.merge;
// pause the thread if maxThreadCount is smaller than the number of merge threads.
- final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;
+ final boolean doPause = threadIdx < bigMergeCount - maxThreadCount;
+
+ double newMBPerSec;
+ if (doPause) {
+ newMBPerSec = 0.0;
+ } else if (merge.maxNumSegments != -1) {
+ newMBPerSec = forceMergeMBPerSec;
+ } else if (doAutoIOThrottle == false) {
+ newMBPerSec = Double.POSITIVE_INFINITY;
+ } else if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB*1024*1024) {
+ // Don't rate limit small merges:
+ newMBPerSec = Double.POSITIVE_INFINITY;
+ } else {
+ newMBPerSec = targetMBPerSec;
+ }
+ double curMBPerSec = merge.rateLimiter.getMBPerSec();
+
if (verbose()) {
- if (doPause != merge.getPause()) {
- if (doPause) {
- message("pause thread " + mergeThread.getName());
+ long mergeStartNS = merge.mergeStartNS;
+ if (mergeStartNS == -1) {
+ // IndexWriter didn't start the merge yet:
+ mergeStartNS = now;
+ }
+ message.append('\n');
+ message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n",
+ mergeThread.getName(),
+ bytesToMB(merge.estimatedMergeBytes),
+ bytesToMB(merge.rateLimiter.totalBytesWritten),
+ nsToSec(now - mergeStartNS),
+ nsToSec(merge.rateLimiter.getTotalStoppedNS()),
+ nsToSec(merge.rateLimiter.getTotalPausedNS()),
+ rateToString(merge.rateLimiter.getMBPerSec())));
+
+ if (newMBPerSec != curMBPerSec) {
+ if (newMBPerSec == 0.0) {
+ message.append(" now stop");
+ } else if (curMBPerSec == 0.0) {
+ if (newMBPerSec == Double.POSITIVE_INFINITY) {
+ message.append(" now resume");
+ } else {
+ message.append(String.format(Locale.ROOT, " now resume to %.1f MB/sec", newMBPerSec));
+ }
} else {
- message("unpause thread " + mergeThread.getName());
+ message.append(String.format(Locale.ROOT, " now change from %.1f MB/sec to %.1f MB/sec", curMBPerSec, newMBPerSec));
}
+ } else if (curMBPerSec == 0.0) {
+ message.append(" leave stopped");
+ } else {
+ message.append(String.format(Locale.ROOT, " leave running at %.1f MB/sec", curMBPerSec));
}
}
- if (doPause != merge.getPause()) {
- merge.setPause(doPause);
- }
- if (!doPause) {
- if (verbose()) {
- message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
- }
- mergeThread.setThreadPriority(pri);
- pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
- }
+ merge.rateLimiter.setMBPerSec(newMBPerSec);
}
- }
-
- /**
- * Returns true if verbosing is enabled. This method is usually used in
- * conjunction with {@link #message(String)}, like that:
- *
- * <pre class="prettyprint">
- * if (verbose()) {
- * message("your message");
- * }
- * </pre>
- */
- protected boolean verbose() {
- return writer != null && writer.infoStream.isEnabled("CMS");
- }
-
- /**
- * Outputs the given message - this method assumes {@link #verbose()} was
- * called and returned true.
- */
- protected void message(String message) {
- writer.infoStream.message("CMS", message);
- }
-
- private synchronized void initMergeThreadPriority() {
- if (mergeThreadPriority == -1) {
- // Default to slightly higher priority than our
- // calling thread
- mergeThreadPriority = 1+Thread.currentThread().getPriority();
- if (mergeThreadPriority > Thread.MAX_PRIORITY)
- mergeThreadPriority = Thread.MAX_PRIORITY;
+ if (verbose()) {
+ message(message.toString());
}
}
- private synchronized void initMaxMergesAndThreads() throws IOException {
+ private synchronized void initDynamicDefaults(IndexWriter writer) throws IOException {
if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) {
- assert writer != null;
boolean spins = IOUtils.spins(writer.getDirectory());
setDefaultMaxMergesAndThreads(spins);
if (verbose()) {
- message("initMaxMergesAndThreads spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount);
+ message("initDynamicDefaults spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount);
}
}
}
+ private static String rateToString(double mbPerSec) {
+ if (mbPerSec == 0.0) {
+ return "stopped";
+ } else if (mbPerSec == Double.POSITIVE_INFINITY) {
+ return "unlimited";
+ } else {
+ return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec);
+ }
+ }
+
@Override
public void close() {
sync();
@@ -346,12 +394,9 @@ public class ConcurrentMergeScheduler ex
*/
protected synchronized int mergeThreadCount() {
int count = 0;
- for (MergeThread mt : mergeThreads) {
- if (mt.isAlive()) {
- MergePolicy.OneMerge merge = mt.getCurrentMerge();
- if (merge != null && merge.isAborted() == false) {
- count++;
- }
+ for (MergeThread mergeThread : mergeThreads) {
+ if (mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
+ count++;
}
}
return count;
@@ -362,12 +407,7 @@ public class ConcurrentMergeScheduler ex
assert !Thread.holdsLock(writer);
- this.writer = writer;
-
- initMergeThreadPriority();
- initMaxMergesAndThreads();
-
- dir = writer.getDirectory();
+ initDynamicDefaults(writer);
// First, quickly run through the newly proposed merges
// and add any orthogonal merges (ie a merge not
@@ -385,9 +425,9 @@ public class ConcurrentMergeScheduler ex
// pending merges, until it's empty:
while (true) {
- maybeStall();
+ maybeStall(writer);
- MergePolicy.OneMerge merge = writer.getNextMerge();
+ OneMerge merge = writer.getNextMerge();
if (merge == null) {
if (verbose()) {
message(" no more merges pending; now return");
@@ -395,6 +435,8 @@ public class ConcurrentMergeScheduler ex
return;
}
+ updateIOThrottle(merge);
+
boolean success = false;
try {
if (verbose()) {
@@ -405,17 +447,13 @@ public class ConcurrentMergeScheduler ex
// merge:
final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
+
if (verbose()) {
message(" launch new thread [" + merger.getName() + "]");
}
merger.start();
- // Must call this after starting the thread else
- // the new thread is removed from mergeThreads
- // (since it's not alive yet):
- updateMergeThreads();
-
success = true;
} finally {
if (!success) {
@@ -433,7 +471,7 @@ public class ConcurrentMergeScheduler ex
* as limiting how many threads are allowed to index, can do nothing
* here and throttle elsewhere. */
- protected synchronized void maybeStall() {
+ protected synchronized void maybeStall(IndexWriter writer) {
long startStallTime = 0;
while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
// This means merging has fallen too far behind: we
@@ -465,127 +503,78 @@ public class ConcurrentMergeScheduler ex
}
/** Does the actual merge, by calling {@link IndexWriter#merge} */
- protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
+ protected void doMerge(IndexWriter writer, OneMerge merge) throws IOException {
writer.merge(merge);
}
/** Create and return a new MergeThread */
- protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
+ protected synchronized MergeThread getMergeThread(IndexWriter writer, OneMerge merge) throws IOException {
final MergeThread thread = new MergeThread(writer, merge);
- thread.setThreadPriority(mergeThreadPriority);
thread.setDaemon(true);
thread.setName("Lucene Merge Thread #" + mergeThreadCount++);
return thread;
}
- /** Runs a merge thread, which may run one or more merges
- * in sequence. */
- protected class MergeThread extends Thread {
-
- IndexWriter tWriter;
- MergePolicy.OneMerge startMerge;
- MergePolicy.OneMerge runningMerge;
- private volatile boolean done;
-
- /** Sole constructor. */
- public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) {
- this.tWriter = writer;
- this.startMerge = startMerge;
- }
-
- /** Record the currently running merge. */
- public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
- runningMerge = merge;
- }
+ /** Runs a merge thread to execute a single merge, then exits. */
+ protected class MergeThread extends Thread implements Comparable<MergeThread> {
- /** Return the currently running merge. */
- public synchronized MergePolicy.OneMerge getRunningMerge() {
- return runningMerge;
- }
+ final IndexWriter writer;
+ final OneMerge merge;
- /** Return the current merge, or null if this {@code
- * MergeThread} is done. */
- public synchronized MergePolicy.OneMerge getCurrentMerge() {
- if (done) {
- return null;
- } else if (runningMerge != null) {
- return runningMerge;
- } else {
- return startMerge;
- }
+ /** Sole constructor. */
+ public MergeThread(IndexWriter writer, OneMerge merge) {
+ this.writer = writer;
+ this.merge = merge;
}
-
- /** Set the priority of this thread. */
- public void setThreadPriority(int pri) {
- try {
- setPriority(pri);
- } catch (NullPointerException npe) {
- // Strangely, Sun's JDK 1.5 on Linux sometimes
- // throws NPE out of here...
- } catch (SecurityException se) {
- // Ignore this because we will still run fine with
- // normal thread priority
- }
+
+ @Override
+ public int compareTo(MergeThread other) {
+ // Larger merges sort first:
+ return Long.compare(other.merge.estimatedMergeBytes, merge.estimatedMergeBytes);
}
@Override
public void run() {
-
- // First time through the while loop we do the merge
- // that we were started with:
- MergePolicy.OneMerge merge = this.startMerge;
-
+
try {
if (verbose()) {
message(" merge thread: start");
}
- while(true) {
- setRunningMerge(merge);
- doMerge(merge);
-
- // Subsequent times through the loop we do any new
- // merge that writer says is necessary:
- merge = tWriter.getNextMerge();
-
- // Notify here in case any threads were stalled;
- // they will notice that the pending merge has
- // been pulled and possibly resume:
- synchronized(ConcurrentMergeScheduler.this) {
- ConcurrentMergeScheduler.this.notifyAll();
- }
-
- if (merge != null) {
- updateMergeThreads();
- if (verbose()) {
- message(" merge thread: do another merge " + tWriter.segString(merge.segments));
- }
- } else {
- break;
- }
- }
+ doMerge(writer, merge);
if (verbose()) {
message(" merge thread: done");
}
+ removeMergeThread(this);
+
+ // Let CMS run new merges if necessary:
+ try {
+ merge(writer, MergeTrigger.MERGE_FINISHED, true);
+ } catch (AlreadyClosedException ace) {
+ // OK
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+
} catch (Throwable exc) {
- // Ignore the exception if it was due to abort:
- if (!(exc instanceof MergePolicy.MergeAbortedException)) {
- //System.out.println(Thread.currentThread().getName() + ": CMS: exc");
- //exc.printStackTrace(System.out);
- if (!suppressExceptions) {
- // suppressExceptions is normally only set during
- // testing.
- handleMergeException(exc);
- }
+ if (exc instanceof MergePolicy.MergeAbortedException) {
+ // OK to ignore
+ } else if (suppressExceptions == false) {
+ // suppressExceptions is normally only set during
+ // testing.
+ handleMergeException(writer.getDirectory(), exc);
}
+
} finally {
- done = true;
synchronized(ConcurrentMergeScheduler.this) {
updateMergeThreads();
+
+ // In case we had stalled indexing, we can now wake up
+ // and possibly unstall:
ConcurrentMergeScheduler.this.notifyAll();
}
}
@@ -594,7 +583,7 @@ public class ConcurrentMergeScheduler ex
/** Called when an exception is hit in a background merge
* thread */
- protected void handleMergeException(Throwable exc) {
+ protected void handleMergeException(Directory dir, Throwable exc) {
try {
// When an exception is hit during merge, IndexWriter
// removes any partial files and then allows another
@@ -606,6 +595,7 @@ public class ConcurrentMergeScheduler ex
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
+
throw new MergePolicy.MergeException(exc, dir);
}
@@ -626,7 +616,115 @@ public class ConcurrentMergeScheduler ex
StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
- sb.append("mergeThreadPriority=").append(mergeThreadPriority);
+ sb.append("ioThrottle=").append(doAutoIOThrottle);
return sb.toString();
}
+
+ private boolean isBacklog(long now, OneMerge merge) {
+ double mergeMB = bytesToMB(merge.estimatedMergeBytes);
+ for (MergeThread mergeThread : mergeThreads) {
+ long mergeStartNS = mergeThread.merge.mergeStartNS;
+ if (mergeThread.isAlive() && mergeThread.merge != merge &&
+ mergeStartNS != -1 &&
+ mergeThread.merge.estimatedMergeBytes >= MIN_BIG_MERGE_MB*1024*1024 &&
+ nsToSec(now-mergeStartNS) > 3.0) {
+ double otherMergeMB = bytesToMB(mergeThread.merge.estimatedMergeBytes);
+ double ratio = otherMergeMB / mergeMB;
+ if (ratio > 0.3 && ratio < 3.0) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ /** Tunes IO throttle when a new merge starts. */
+ private synchronized void updateIOThrottle(OneMerge merge) throws IOException {
+ if (doAutoIOThrottle == false) {
+ return;
+ }
+
+ double mergeMB = bytesToMB(merge.estimatedMergeBytes);
+ if (mergeMB < MIN_BIG_MERGE_MB) {
+ // Only watch non-trivial merges for throttling; this is safe because the MP must eventually
+ // have to do larger merges:
+ return;
+ }
+
+ long now = System.nanoTime();
+
+ // Simplistic closed-loop feedback control: if we find any other similarly
+ // sized merges running, then we are falling behind, so we bump up the
+ // IO throttle, else we lower it:
+ boolean newBacklog = isBacklog(now, merge);
+
+ boolean curBacklog = false;
+
+ if (newBacklog == false) {
+ if (mergeThreads.size() > maxThreadCount) {
+ // If there are already more than the maximum merge threads allowed, count that as backlog:
+ curBacklog = true;
+ } else {
+ // Now see if any still-running merges are backlog'd:
+ for (MergeThread mergeThread : mergeThreads) {
+ if (isBacklog(now, mergeThread.merge)) {
+ curBacklog = true;
+ break;
+ }
+ }
+ }
+ }
+
+ double curMBPerSec = targetMBPerSec;
+
+ if (newBacklog) {
+ // This new merge adds to the backlog: increase IO throttle by 20%
+ targetMBPerSec *= 1.20;
+ if (targetMBPerSec > 10000) {
+ targetMBPerSec = 10000;
+ }
+ if (verbose()) {
+ if (curMBPerSec == targetMBPerSec) {
+ message(String.format(Locale.ROOT, "io throttle: new merge backlog; leave IO rate at ceiling %.1f MB/sec", targetMBPerSec));
+ } else {
+ message(String.format(Locale.ROOT, "io throttle: new merge backlog; increase IO rate to %.1f MB/sec", targetMBPerSec));
+ }
+ }
+ } else if (curBacklog) {
+ // We still have an existing backlog; leave the rate as is:
+ if (verbose()) {
+ message(String.format(Locale.ROOT, "io throttle: current merge backlog; leave IO rate at %.1f MB/sec",
+ targetMBPerSec));
+ }
+ } else {
+ // We are not falling behind: decrease IO throttle by 10%
+ targetMBPerSec /= 1.10;
+ if (targetMBPerSec < MIN_MERGE_MB_PER_SEC) {
+ targetMBPerSec = MIN_MERGE_MB_PER_SEC;
+ }
+ if (verbose()) {
+ if (curMBPerSec == targetMBPerSec) {
+ message(String.format(Locale.ROOT, "io throttle: no merge backlog; leave IO rate at floor %.1f MB/sec", targetMBPerSec));
+ } else {
+ message(String.format(Locale.ROOT, "io throttle: no merge backlog; decrease IO rate to %.1f MB/sec", targetMBPerSec));
+ }
+ }
+ }
+
+ targetMBPerSecChanged();
+ updateMergeThreads();
+ }
+
+ /** Subclass can override to tweak targetMBPerSec. */
+ protected void targetMBPerSecChanged() {
+ }
+
+ private static double nsToSec(long ns) {
+ return ns / 1000000000.0;
+ }
+
+ private static double bytesToMB(long bytes) {
+ return bytes/1024./1024.;
+ }
}
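(For reference, the closed-loop control in updateIOThrottle above reduces to a multiplicative increase/decrease rule on targetMBPerSec. A standalone sketch with the constants taken from the diff -- the class and method names here are hypothetical, not Lucene code:)

    public class ThrottleFeedbackDemo {
      static double targetMBPerSec = 20.0;            // START_MB_PER_SEC

      static void onMergeStart(boolean newBacklog, boolean curBacklog) {
        if (newBacklog) {
          // falling behind: raise the rate by 20%, capped at 10000 MB/sec
          targetMBPerSec = Math.min(10000.0, targetMBPerSec * 1.20);
        } else if (curBacklog == false) {
          // keeping up: lower the rate by 10%, floored at MIN_MERGE_MB_PER_SEC
          targetMBPerSec = Math.max(5.0, targetMBPerSec / 1.10);
        }
        // else: an existing backlog remains, leave the rate where it is
      }

      public static void main(String[] args) {
        onMergeStart(true, false);    // -> 24.0 MB/sec
        onMergeStart(true, false);    // -> 28.8 MB/sec
        onMergeStart(false, false);   // -> ~26.2 MB/sec
        System.out.println(targetMBPerSec + " MB/sec");
      }
    }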
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan 5 14:28:28 2015
@@ -20,9 +20,8 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.Locale;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +34,6 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
-import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
/**
@@ -553,11 +551,13 @@ final class DocumentsWriter implements C
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
- if (infoStream.isEnabled("DW")) {
- infoStream.message("DW", "force apply deletes bytesUsed=" + flushControl.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*ramBufferSizeMB));
- }
hasEvents = true;
if (!this.applyAllDeletes(deleteQueue)) {
+ if (infoStream.isEnabled("DW")) {
+ infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
+ flushControl.getDeleteBytesUsed()/(1024.*1024.),
+ ramBufferSizeMB));
+ }
putEvent(ApplyDeletesEvent.INSTANCE);
}
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Mon Jan 5 14:28:28 2015
@@ -485,7 +485,7 @@ class DocumentsWriterPerThread {
try {
if (indexWriterConfig.getUseCompoundFile()) {
- filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, newSegment.info, context));
+ filesToDelete.addAll(IndexWriter.createCompoundFile(infoStream, directory, newSegment.info, context));
newSegment.info.setUseCompoundFile(true);
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan 5 14:28:28 2015
@@ -47,18 +47,22 @@ import org.apache.lucene.index.DocValues
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.index.FieldInfos.FieldNumbers;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
-import org.apache.lucene.index.MergeState.CheckAbort;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
@@ -247,6 +251,7 @@ public class IndexWriter implements Clos
volatile Throwable tragedy;
private final Directory directory; // where this index resides
+ private final Directory mergeDirectory; // used for merging
private final Analyzer analyzer; // how to analyze text
private volatile long changeCount; // increments every time a change is completed
@@ -319,6 +324,8 @@ public class IndexWriter implements Clos
* card to make sure they can later charge you when you check out. */
final AtomicLong pendingNumDocs = new AtomicLong();
+ final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
+
DirectoryReader getReader() throws IOException {
return getReader(true);
}
@@ -741,10 +748,17 @@ public class IndexWriter implements Clos
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
conf.setIndexWriter(this); // prevent reuse by other instances
config = conf;
+
directory = d;
+
+ // Directory we use for merging, so we can abort running merges, and so
+ // merge schedulers can optionally rate-limit per-merge IO:
+ mergeDirectory = addMergeRateLimiters(d);
+
analyzer = config.getAnalyzer();
infoStream = config.getInfoStream();
mergeScheduler = config.getMergeScheduler();
+ mergeScheduler.setInfoStream(infoStream);
codec = config.getCodec();
bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
@@ -1696,7 +1710,7 @@ public class IndexWriter implements Clos
for(int i=0;i<size;i++) {
final MergePolicy.OneMerge merge = mergeExceptions.get(i);
if (merge.maxNumSegments != -1) {
- throw new IOException("background merge hit exception: " + merge.segString(directory), merge.getException());
+ throw new IOException("background merge hit exception: " + merge.segString(), merge.getException());
}
}
}
@@ -1786,7 +1800,7 @@ public class IndexWriter implements Clos
}
Throwable t = merge.getException();
if (t != null) {
- throw new IOException("background merge hit exception: " + merge.segString(directory), t);
+ throw new IOException("background merge hit exception: " + merge.segString(), t);
}
}
@@ -1969,6 +1983,8 @@ public class IndexWriter implements Clos
stopMerges = true;
}
+ rateLimiters.close();
+
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback: done finish merges");
}
@@ -2155,7 +2171,7 @@ public class IndexWriter implements Clos
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort pending merge " + segString(merge.segments));
}
- merge.abort();
+ merge.rateLimiter.setAbort();
mergeFinish(merge);
}
pendingMerges.clear();
@@ -2164,7 +2180,7 @@ public class IndexWriter implements Clos
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now abort running merge " + segString(merge.segments));
}
- merge.abort();
+ merge.rateLimiter.setAbort();
}
// These merges periodically check whether they have
@@ -2405,7 +2421,7 @@ public class IndexWriter implements Clos
infoStream.message("IW", "addIndexes: process segment origName=" + info.info.name + " newName=" + newSegName + " info=" + info);
}
- IOContext context = new IOContext(new MergeInfo(info.info.getDocCount(), info.sizeInBytes(), true, -1));
+ IOContext context = new IOContext(new FlushInfo(info.info.getDocCount(), info.sizeInBytes()));
FieldInfos fis = readFieldInfos(info);
for(FieldInfo fi : fis) {
@@ -2516,19 +2532,21 @@ public class IndexWriter implements Clos
// exceed the limit:
reserveDocs(numDocs);
- final IOContext context = new IOContext(new MergeInfo(numDocs, -1, true, -1));
+ final IOContext context = new IOContext(new MergeInfo(numDocs, -1, false, -1));
// TODO: somehow we should fix this merge so it's
// abortable so that IW.close(false) is able to stop it
- TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
+ TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(mergeDirectory);
SegmentInfo info = new SegmentInfo(directory, Version.LATEST, mergedName, -1,
false, codec, null, StringHelper.randomId(), new HashMap<>());
SegmentMerger merger = new SegmentMerger(mergeReaders, info, infoStream, trackingDir,
- MergeState.CheckAbort.NONE, globalFieldNumberMap,
+ globalFieldNumberMap,
context);
+ rateLimiters.set(new MergeRateLimiter(null));
+
if (!merger.shouldMerge()) {
return;
}
@@ -2567,7 +2585,7 @@ public class IndexWriter implements Clos
if (useCompoundFile) {
Collection<String> filesToDelete = infoPerCommit.files();
try {
- createCompoundFile(infoStream, directory, MergeState.CheckAbort.NONE, info, context);
+ createCompoundFile(infoStream, mergeDirectory, info, context);
} finally {
// delete new non cfs files directly: they were never
// registered with IFD
@@ -3040,6 +3058,9 @@ public class IndexWriter implements Clos
final synchronized void applyAllDeletesAndUpdates() throws IOException {
flushDeletesCount.incrementAndGet();
final BufferedUpdatesStream.ApplyDeletesResult result;
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "now apply all deletes for all segments maxDoc=" + (docWriter.getNumDocs() + segmentInfos.totalDocCount()));
+ }
result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
if (result.anyDeletes) {
checkpoint();
@@ -3354,7 +3375,7 @@ public class IndexWriter implements Clos
// deleter.refresh() call that will remove any index
// file that current segments does not reference), we
// abort this merge
- if (merge.isAborted()) {
+ if (merge.rateLimiter.getAbort()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commitMerge: skip: it was aborted");
}
@@ -3513,6 +3534,8 @@ public class IndexWriter implements Clos
boolean success = false;
+ rateLimiters.set(merge.rateLimiter);
+
final long t0 = System.currentTimeMillis();
final MergePolicy mergePolicy = config.getMergePolicy();
@@ -3550,7 +3573,7 @@ public class IndexWriter implements Clos
// This merge (and, generally, any change to the
// segments) may now enable new merges, so we call
// merge policy & update pending merges.
- if (success && !merge.isAborted() && (merge.maxNumSegments != -1 || (!closed && !closing))) {
+ if (success && merge.rateLimiter.getAbort() == false && (merge.maxNumSegments != -1 || (!closed && !closing))) {
updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
}
}
@@ -3558,7 +3581,7 @@ public class IndexWriter implements Clos
} catch (OutOfMemoryError oom) {
tragicEvent(oom, "merge");
}
- if (merge.info != null && !merge.isAborted()) {
+ if (merge.info != null && merge.rateLimiter.getAbort() == false) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.getDocCount() + " docs");
}
@@ -3583,7 +3606,7 @@ public class IndexWriter implements Clos
assert merge.segments.size() > 0;
if (stopMerges) {
- merge.abort();
+ merge.rateLimiter.setAbort();
throw new MergePolicy.MergeAbortedException("merge is aborted: " + segString(merge.segments));
}
@@ -3694,7 +3717,7 @@ public class IndexWriter implements Clos
return;
}
- if (merge.isAborted()) {
+ if (merge.rateLimiter.getAbort()) {
return;
}
@@ -3703,6 +3726,10 @@ public class IndexWriter implements Clos
// and then open them again for merging. Maybe we
// could pre-pool them somehow in that case...
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "now apply deletes for " + merge.segments.size() + " merging segments");
+ }
+
// Lock order: IW -> BD
final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments);
@@ -3839,14 +3866,13 @@ public class IndexWriter implements Clos
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
- merge.checkAborted(directory);
+ merge.rateLimiter.checkAbort();
List<SegmentCommitInfo> sourceSegments = merge.segments;
IOContext context = new IOContext(merge.getMergeInfo());
- final MergeState.CheckAbort checkAbort = new MergeState.CheckAbort(merge, directory);
- final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(directory);
+ final TrackingDirectoryWrapper dirWrapper = new TrackingDirectoryWrapper(mergeDirectory);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merging " + segString(merge.segments));
@@ -3926,15 +3952,12 @@ public class IndexWriter implements Clos
// OneMerge to return a view over the actual segments to merge
final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(),
merge.info.info, infoStream, dirWrapper,
- checkAbort, globalFieldNumberMap,
+ globalFieldNumberMap,
context);
- merge.checkAborted(directory);
+ merge.rateLimiter.checkAbort();
- long mergeStartTime = 0;
- if (infoStream.isEnabled("IW")) {
- mergeStartTime = System.nanoTime();
- }
+ merge.mergeStartNS = System.nanoTime();
// This is where all the work happens:
boolean success3 = false;
@@ -3954,13 +3977,13 @@ public class IndexWriter implements Clos
assert mergeState.segmentInfo == merge.info.info;
merge.info.info.setFiles(new HashSet<>(dirWrapper.getCreatedFiles()));
- // Record which codec was used to write the segment
-
if (infoStream.isEnabled("IW")) {
if (merger.shouldMerge()) {
long t1 = System.nanoTime();
- double sec = (t1-mergeStartTime)/1000000000.;
+ double sec = (t1-merge.mergeStartNS)/1000000000.;
double segmentMB = (merge.info.sizeInBytes()/1024./1024.);
+ double stoppedSec = merge.rateLimiter.getTotalStoppedNS()/1000000000.;
+ double throttleSec = merge.rateLimiter.getTotalPausedNS()/1000000000.;
infoStream.message("IW", "merge codec=" + codec + " docCount=" + merge.info.info.getDocCount() + "; merged segment has " +
(mergeState.mergeFieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
(mergeState.mergeFieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
@@ -3968,8 +3991,10 @@ public class IndexWriter implements Clos
(mergeState.mergeFieldInfos.hasProx() ? "prox" : "no prox") + "; " +
(mergeState.mergeFieldInfos.hasProx() ? "freqs" : "no freqs") + "; " +
String.format(Locale.ROOT,
- "%d msec to merge segment [%.2f MB, %.2f MB/sec]",
- ((t1-mergeStartTime)/1000000),
+ "%.1f sec (%.1f sec stopped, %.1f sec paused) to merge segment [%.2f MB, %.2f MB/sec]",
+ sec,
+ stoppedSec,
+ throttleSec,
segmentMB,
segmentMB / sec));
} else {
@@ -4002,11 +4027,11 @@ public class IndexWriter implements Clos
Collection<String> filesToRemove = merge.info.files();
try {
- filesToRemove = createCompoundFile(infoStream, directory, checkAbort, merge.info.info, context);
+ filesToRemove = createCompoundFile(infoStream, mergeDirectory, merge.info.info, context);
success = true;
} catch (IOException ioe) {
synchronized(this) {
- if (merge.isAborted()) {
+ if (merge.rateLimiter.getAbort()) {
// This can happen if rollback or close(false)
// is called -- fall through to logic below to
// remove the partially created CFS:
@@ -4042,7 +4067,7 @@ public class IndexWriter implements Clos
// registered with IFD
deleter.deleteNewFiles(filesToRemove);
- if (merge.isAborted()) {
+ if (merge.rateLimiter.getAbort()) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "abort merge after building CFS");
}
@@ -4496,7 +4521,7 @@ public class IndexWriter implements Clos
* deletion files, this SegmentInfo must not reference such files when this
* method is called, because they are not allowed within a compound file.
*/
- static final Collection<String> createCompoundFile(InfoStream infoStream, Directory directory, CheckAbort checkAbort, final SegmentInfo info, IOContext context)
+ static final Collection<String> createCompoundFile(InfoStream infoStream, Directory directory, final SegmentInfo info, IOContext context)
throws IOException {
// TODO: use trackingdirectorywrapper instead of files() to know which files to delete when things fail:
@@ -4510,7 +4535,7 @@ public class IndexWriter implements Clos
boolean success = false;
try {
- info.getCodec().compoundFormat().write(directory, info, files, checkAbort, context);
+ info.getCodec().compoundFormat().write(directory, info, files, context);
success = true;
} finally {
if (!success) {
@@ -4643,4 +4668,24 @@ public class IndexWriter implements Clos
throw new IllegalStateException("number of documents in the index cannot exceed " + actualMaxDocs);
}
}
+
+ /** Wraps the incoming {@link Directory} so that we assign a per-thread
+ * {@link MergeRateLimiter} to all created {@link IndexOutput}s. */
+ private Directory addMergeRateLimiters(Directory in) {
+ return new FilterDirectory(in) {
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ ensureOpen();
+
+ // This Directory is only supposed to be used during merging,
+ // so all writes should have MERGE context, else there is a bug
+ // somewhere that is failing to pass down the right IOContext:
+ assert context.context == IOContext.Context.MERGE: "got context=" + context.context;
+ IndexOutput output = in.createOutput(name, context);
+ MergeRateLimiter rateLimiter = rateLimiters.get();
+ assert rateLimiter != null;
+ return new RateLimitedIndexOutput(rateLimiter, output);
+ }
+ };
+ }
}
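The addMergeRateLimiters wrapper above gives each merge thread its own MergeRateLimiter: the per-thread rateLimiters holder is set when a merge starts, and every IndexOutput created through mergeDirectory is wrapped with that thread's limiter. Below is a minimal, Lucene-free sketch of the same per-thread throttling pattern; all names in it are hypothetical and chosen only for illustration.

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class PerThreadThrottleSketch {

  /** Tiny stand-in for a rate limiter: sleeps long enough to keep writes under mbPerSec. */
  static final class SimpleLimiter {
    private final double mbPerSec;
    SimpleLimiter(double mbPerSec) { this.mbPerSec = mbPerSec; }
    void pause(long bytes) {
      long millis = (long) (1000.0 * bytes / (mbPerSec * 1024 * 1024));
      if (millis > 0) {
        try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      }
    }
  }

  /** One limiter per merge thread, analogous to the rateLimiters holder used above. */
  static final ThreadLocal<SimpleLimiter> LIMITERS = new ThreadLocal<>();

  /** Wraps an output so bulk writes are charged against the current thread's limiter. */
  static final class ThrottledOutput extends FilterOutputStream {
    ThrottledOutput(OutputStream out) { super(out); }
    @Override public void write(byte[] b, int off, int len) throws IOException {
      SimpleLimiter limiter = LIMITERS.get();
      if (limiter != null) {
        limiter.pause(len);  // throttle this thread's writes before passing them through
      }
      out.write(b, off, len);
    }
  }
}

A merge thread would call LIMITERS.set(new SimpleLimiter(5.0)) before writing, mirroring the rateLimiters.set(merge.rateLimiter) call in mergeMiddle above.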
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsAndPositionsEnum.java Mon Jan 5 14:28:28 2015
@@ -17,8 +17,8 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import org.apache.lucene.util.BytesRef;
import org.apache.lucene.index.MultiDocsAndPositionsEnum.EnumWithSlice;
+import org.apache.lucene.util.BytesRef;
import java.io.IOException;
@@ -97,13 +97,6 @@ final class MappingMultiDocsAndPositions
int doc = current.nextDoc();
if (doc != NO_MORE_DOCS) {
-
- mergeState.checkAbortCount++;
- if (mergeState.checkAbortCount > 60000) {
- mergeState.checkAbort.work(mergeState.checkAbortCount/5.0);
- mergeState.checkAbortCount = 0;
- }
-
// compact deletions
doc = currentMap.get(doc);
if (doc == -1) {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MappingMultiDocsEnum.java Mon Jan 5 14:28:28 2015
@@ -97,13 +97,6 @@ final class MappingMultiDocsEnum extends
int doc = current.nextDoc();
if (doc != NO_MORE_DOCS) {
-
- mergeState.checkAbortCount++;
- if (mergeState.checkAbortCount > 60000) {
- mergeState.checkAbort.work(mergeState.checkAbortCount/5.0);
- mergeState.checkAbortCount = 0;
- }
-
// compact deletions
doc = currentMap.get(doc);
if (doc == -1) {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java Mon Jan 5 14:28:28 2015
@@ -17,16 +17,17 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.util.FixedBitSet;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.FixedBitSet;
+
/**
* <p>Expert: a MergePolicy determines the sequence of
* primitive merge operations.</p>
@@ -107,11 +108,14 @@ public abstract class MergePolicy {
/** Segments to be merged. */
public final List<SegmentCommitInfo> segments;
+ /** A private {@link RateLimiter} for this merge, used to rate-limit writes and to abort the merge. */
+ public final MergeRateLimiter rateLimiter;
+
+ volatile long mergeStartNS = -1;
+
/** Total number of documents in segments to be merged, not accounting for deletions. */
public final int totalDocCount;
- boolean aborted;
Throwable error;
- boolean paused;
/** Sole constructor.
* @param segments List of {@link SegmentCommitInfo}s
@@ -127,6 +131,8 @@ public abstract class MergePolicy {
count += info.info.getDocCount();
}
totalDocCount = count;
+
+ rateLimiter = new MergeRateLimiter(this);
}
/** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
@@ -186,68 +192,16 @@ public abstract class MergePolicy {
return error;
}
- /** Mark this merge as aborted. If this is called
- * before the merge is committed then the merge will
- * not be committed. */
- synchronized void abort() {
- aborted = true;
- notifyAll();
- }
-
- /** Returns true if this merge was aborted. */
- synchronized boolean isAborted() {
- return aborted;
- }
-
- /** Called periodically by {@link IndexWriter} while
- * merging to see if the merge is aborted. */
- public synchronized void checkAborted(Directory dir) throws MergeAbortedException {
- if (aborted) {
- throw new MergeAbortedException("merge is aborted: " + segString(dir));
- }
-
- while (paused) {
- try {
- // In theory we could wait() indefinitely, but we
- // do 250 msec, defensively
- wait(250);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- if (aborted) {
- throw new MergeAbortedException("merge is aborted: " + segString(dir));
- }
- }
- }
-
- /** Set or clear whether this merge is paused paused (for example
- * {@link ConcurrentMergeScheduler} will pause merges
- * if too many are running). */
- synchronized public void setPause(boolean paused) {
- this.paused = paused;
- if (!paused) {
- // Wakeup merge thread, if it's waiting
- notifyAll();
- }
- }
-
- /** Returns true if this merge is paused.
- *
- * @see #setPause(boolean) */
- synchronized public boolean getPause() {
- return paused;
- }
-
/** Returns a readable description of the current merge
* state. */
- public String segString(Directory dir) {
+ public String segString() {
StringBuilder b = new StringBuilder();
final int numSegments = segments.size();
for(int i=0;i<numSegments;i++) {
if (i > 0) {
b.append(' ');
}
- b.append(segments.get(i).toString(dir, 0));
+ b.append(segments.get(i).toString());
}
if (info != null) {
b.append(" into ").append(info.info.name);
@@ -255,7 +209,7 @@ public abstract class MergePolicy {
if (maxNumSegments != -1) {
b.append(" [maxNumSegments=" + maxNumSegments + "]");
}
- if (aborted) {
+ if (rateLimiter.getAbort()) {
b.append(" [ABORTED]");
}
return b.toString();
@@ -321,7 +275,7 @@ public abstract class MergePolicy {
b.append("MergeSpec:\n");
final int count = merges.size();
for(int i=0;i<count;i++) {
- b.append(" ").append(1 + i).append(": ").append(merges.get(i).segString(dir));
+ b.append(" ").append(1 + i).append(": ").append(merges.get(i).segString());
}
return b.toString();
}
@@ -538,5 +492,4 @@ public abstract class MergePolicy {
v *= 1024 * 1024;
this.maxCFSSegmentSize = v > Long.MAX_VALUE ? Long.MAX_VALUE : (long) v;
}
-
}
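Because OneMerge's aborted/paused flags and its abort()/setPause() methods are gone, scheduler-side code now drives a merge entirely through its public rateLimiter. The following is a hedged sketch, not part of this commit: the helper class and the 20 MB/sec figure are illustrative assumptions, while setMBPerSec, setAbort, and the rate-0.0 "stopped" semantics come from the code above.

import org.apache.lucene.index.MergePolicy;

final class MergeThrottleHelper {

  /** Too many concurrent merges: stop this one (rate 0.0) until told otherwise. */
  static void stop(MergePolicy.OneMerge merge) {
    merge.rateLimiter.setMBPerSec(0.0);
  }

  /** Let the merge run again, but cap its write rate. */
  static void resume(MergePolicy.OneMerge merge) {
    merge.rateLimiter.setMBPerSec(20.0);
  }

  /** Replacement for the removed OneMerge.abort(): the merge will hit
   *  MergeAbortedException at its next checkAbort() or pause() call. */
  static void abort(MergePolicy.OneMerge merge) {
    merge.rateLimiter.setAbort();
  }
}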
Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java?rev=1649532&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java Mon Jan 5 14:28:28 2015
@@ -0,0 +1,194 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+import static org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
+
+/** This is the {@link RateLimiter} that {@link IndexWriter} assigns to each running merge, to
+ * give {@link MergeScheduler}s ionice-like control.
+ *
+ * This is similar to {@link SimpleRateLimiter}, except it's merge-private,
+ * it will wake up if its rate changes while it's paused, it tracks how
+ * much time it spent stopped and paused, and it supports aborting.
+ *
+ * @lucene.internal */
+
+public class MergeRateLimiter extends RateLimiter {
+
+ private final static int MIN_PAUSE_CHECK_MSEC = 25;
+ volatile long totalBytesWritten;
+
+ // By default no IO limit:
+ double mbPerSec = Double.POSITIVE_INFINITY;
+ private long lastNS;
+ private long minPauseCheckBytes;
+ private boolean abort;
+ long totalPausedNS;
+ long totalStoppedNS;
+ final MergePolicy.OneMerge merge;
+
+ /** Returned by {@link #maybePause}. */
+ private static enum PauseResult {NO, STOPPED, PAUSED};
+
+ /** Sole constructor. */
+ public MergeRateLimiter(MergePolicy.OneMerge merge) {
+ this.merge = merge;
+ }
+
+ @Override
+ public synchronized void setMBPerSec(double mbPerSec) {
+ // 0.0 is allowed: it means the merge is stopped
+ if (mbPerSec < 0.0) {
+ throw new IllegalArgumentException("mbPerSec must be non-negative; got: " + mbPerSec);
+ }
+ this.mbPerSec = mbPerSec;
+ // NOTE: Double.POSITIVE_INFINITY casts to Long.MAX_VALUE
+ minPauseCheckBytes = Math.min(1024*1024, (long) ((MIN_PAUSE_CHECK_MSEC / 1000.0) * mbPerSec * 1024 * 1024));
+ assert minPauseCheckBytes >= 0;
+ notify();
+ }
+
+ @Override
+ public synchronized double getMBPerSec() {
+ return mbPerSec;
+ }
+
+ /** Returns total bytes written by this merge. */
+ public long getTotalBytesWritten() {
+ return totalBytesWritten;
+ }
+
+ @Override
+ public long pause(long bytes) throws MergePolicy.MergeAbortedException {
+
+ totalBytesWritten += bytes;
+
+ long startNS = System.nanoTime();
+ long curNS = startNS;
+
+ // While loop because 1) Thread.wait doesn't always sleep long
+ // enough, and 2) we wake up and check again when our rate limit
+ // is changed while we were pausing:
+ long pausedNS = 0;
+ while (true) {
+ PauseResult result = maybePause(bytes, curNS);
+ if (result == PauseResult.NO) {
+ // Set to curNS, not targetNS, to enforce the instant rate, not
+ // the "averaged over all history" rate:
+ lastNS = curNS;
+ break;
+ }
+ curNS = System.nanoTime();
+ long ns = curNS - startNS;
+ startNS = curNS;
+
+ // Separately track when merge was stopped vs rate limited:
+ if (result == PauseResult.STOPPED) {
+ totalStoppedNS += ns;
+ } else {
+ assert result == PauseResult.PAUSED;
+ totalPausedNS += ns;
+ }
+ pausedNS += ns;
+ }
+
+ return pausedNS;
+ }
+
+ /** Total NS merge was stopped. */
+ public synchronized long getTotalStoppedNS() {
+ return totalStoppedNS;
+ }
+
+ /** Total NS merge was paused to rate limit IO. */
+ public synchronized long getTotalPausedNS() {
+ return totalPausedNS;
+ }
+
+ /** Returns NO if no pause happened, STOPPED if pause because rate was 0.0 (merge is stopped), PAUSED if paused with a normal rate limit. */
+ private synchronized PauseResult maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
+ double secondsToPause = (bytes/1024./1024.) / mbPerSec;
+
+ // Time we should sleep until; this is purely instantaneous
+ // rate (just adds seconds onto the last time we had paused to);
+ // maybe we should also offer a decayed recent-history rate?
+ long targetNS = lastNS + (long) (1000000000 * secondsToPause);
+
+ long curPauseNS = targetNS - curNS;
+
+ // NOTE: except maybe on real-time JVMs, minimum realistic
+ // wait/sleep time is 1 msec; if you pass just 1 nsec the impl
+ // rounds up to 1 msec, so we don't bother unless it's > 2 msec:
+
+ if (curPauseNS <= 2000000) {
+ return PauseResult.NO;
+ }
+
+ // Defensive: sleep for at most 250 msec; the loop above will call us again if we should keep sleeping:
+ if (curPauseNS > 250L*1000000) {
+ curPauseNS = 250L*1000000;
+ }
+
+ int sleepMS = (int) (curPauseNS / 1000000);
+ int sleepNS = (int) (curPauseNS % 1000000);
+
+ // Now is a good time to abort the merge:
+ checkAbort();
+
+ double rate = mbPerSec;
+
+ try {
+ // CMS can wake us up here if it changes our target rate:
+ wait(sleepMS, sleepNS);
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+
+ if (rate == 0.0) {
+ return PauseResult.STOPPED;
+ } else {
+ return PauseResult.PAUSED;
+ }
+ }
+
+ /** Throws {@link MergePolicy.MergeAbortedException} if this merge was aborted. */
+ public synchronized void checkAbort() throws MergePolicy.MergeAbortedException {
+ if (abort) {
+ throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString());
+ }
+ }
+
+ /** Mark this merge aborted. */
+ public synchronized void setAbort() {
+ abort = true;
+ notify();
+ }
+
+ /** Returns true if this merge was aborted. */
+ public synchronized boolean getAbort() {
+ return abort;
+ }
+
+ @Override
+ public long getMinPauseCheckBytes() {
+ return minPauseCheckBytes;
+ }
+}
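To see the new limiter's accounting in isolation, here is a hedged, standalone driver that is not part of the commit; it assumes lucene-core from this revision is on the classpath, and the 4 MB/sec cap and 1 MB chunk size are arbitrary.

import org.apache.lucene.index.MergeRateLimiter;

public class MergeRateLimiterDemo {
  public static void main(String[] args) throws Exception {
    // IndexWriter itself passes null for the addIndexes(CodecReader...) case:
    MergeRateLimiter limiter = new MergeRateLimiter(null);
    limiter.setMBPerSec(4.0);   // cap writes at 4 MB/sec

    long pausedNS = 0;
    for (int i = 0; i < 8; i++) {
      // RateLimitedIndexOutput calls pause(bytes) as a merge writes; simulate 1 MB chunks:
      pausedNS += limiter.pause(1024 * 1024);
    }

    System.out.println("bytes written: " + limiter.getTotalBytesWritten());
    System.out.println("paused: " + limiter.getTotalPausedNS() / 1_000_000 + " ms");
    System.out.println("stopped: " + limiter.getTotalStoppedNS() / 1_000_000 + " ms");
    System.out.println("reported by pause(): " + pausedNS / 1_000_000 + " ms");
  }
}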
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java?rev=1649532&r1=1649531&r2=1649532&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java Mon Jan 5 14:28:28 2015
@@ -20,6 +20,8 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.lucene.util.InfoStream;
+
/** <p>Expert: {@link IndexWriter} uses an instance
* implementing this interface to execute the merges
* selected by a {@link MergePolicy}. The default
@@ -46,4 +48,34 @@ public abstract class MergeScheduler imp
/** Close this MergeScheduler. */
@Override
public abstract void close() throws IOException;
+
+ /** For messages about merge scheduling */
+ protected InfoStream infoStream;
+
+ /** IndexWriter calls this on init. */
+ final void setInfoStream(InfoStream infoStream) {
+ this.infoStream = infoStream;
+ }
+
+ /**
+ * Returns true if infoStream messages are enabled. This method is usually used in
+ * conjunction with {@link #message(String)}:
+ *
+ * <pre class="prettyprint">
+ * if (verbose()) {
+ * message("your message");
+ * }
+ * </pre>
+ */
+ protected boolean verbose() {
+ return infoStream != null && infoStream.isEnabled("MS");
+ }
+
+ /**
+ * Outputs the given message - this method assumes {@link #verbose()} was
+ * called and returned true.
+ */
+ protected void message(String message) {
+ infoStream.message("MS", message);
+ }
}
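With the infoStream plumbing now living in MergeScheduler itself, a custom scheduler only needs verbose() and message(). The sketch below is hedged and not part of this commit; in particular the merge(...) signature is an assumption taken from the surrounding 5.0 sources, since this hunk does not show it.

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.MergeTrigger;

public class LoggingMergeScheduler extends MergeScheduler {

  @Override
  public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
    // Report what we were asked to do, using the new MergeScheduler hooks:
    if (verbose()) {
      message("merge requested: trigger=" + trigger + " newMergesFound=" + newMergesFound);
    }
    // A real scheduler would now pull pending merges from the writer and run them.
  }

  @Override
  public void close() throws IOException {
  }
}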