Posted to commits@lucene.apache.org by sh...@apache.org on 2017/06/25 02:06:31 UTC
[09/47] lucene-solr:feature/autoscaling: LUCENE-7868: use multiple
threads to concurrently resolve deletes and DV updates
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 14fbbae..4870282 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -29,10 +29,11 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
-import java.util.Map.Entry;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -71,6 +72,8 @@ import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.UnicodeUtil;
import org.apache.lucene.util.Version;
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
/**
An <code>IndexWriter</code> creates and maintains an index.
@@ -110,9 +113,7 @@ import org.apache.lucene.util.Version;
The default is to flush when RAM usage hits
{@link IndexWriterConfig#DEFAULT_RAM_BUFFER_SIZE_MB} MB. For
best indexing speed you should flush by RAM usage with a
- large RAM buffer. Additionally, if IndexWriter reaches the configured number of
- buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
- the deleted terms and queries are flushed and applied to existing segments.
+ large RAM buffer.
In contrast to the other flush options {@link IndexWriterConfig#setRAMBufferSizeMB} and
{@link IndexWriterConfig#setMaxBufferedDocs(int)}, deleted terms
won't trigger a segment flush. Note that flushing just moves the
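
[With setMaxBufferedDeleteTerms gone, flushing is driven purely by RAM usage (or document count), as the updated javadoc above describes. A minimal sketch of the recommended flush-by-RAM configuration; StandardAnalyzer and the 256 MB figure are just for illustration:]

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriterConfig;

    static IndexWriterConfig flushByRamConfig() {
      IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
      iwc.setRAMBufferSizeMB(256.0); // flush by RAM usage for best indexing speed
      iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); // no doc-count flushing
      return iwc;
    }
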
@@ -237,7 +238,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
/** Used only for testing. */
boolean enableTestPoints = false;
- private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
+ static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
/**
* Name of the write lock in the index.
@@ -272,7 +273,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
volatile Throwable tragedy;
private final Directory directoryOrig; // original user directory
- private final Directory directory; // wrapped with additional checks
+ final Directory directory; // wrapped with additional checks
private final Analyzer analyzer; // how to analyze text
private final AtomicLong changeCount = new AtomicLong(); // increments every time a change is completed
@@ -289,7 +290,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final SegmentInfos segmentInfos; // the segments
final FieldNumbers globalFieldNumberMap;
- private final DocumentsWriter docWriter;
+ final DocumentsWriter docWriter;
private final Queue<Event> eventQueue;
final IndexFileDeleter deleter;
@@ -302,11 +303,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private volatile boolean closed;
private volatile boolean closing;
+ final AtomicBoolean maybeMerge = new AtomicBoolean();
+
private Iterable<Map.Entry<String,String>> commitUserData;
// Holds all SegmentInfo instances currently involved in
// merges
- private HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
+ HashSet<SegmentCommitInfo> mergingSegments = new HashSet<>();
private final MergeScheduler mergeScheduler;
private LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<>();
@@ -317,11 +320,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private boolean didMessageState;
final AtomicInteger flushCount = new AtomicInteger();
+
final AtomicInteger flushDeletesCount = new AtomicInteger();
final ReaderPool readerPool = new ReaderPool();
final BufferedUpdatesStream bufferedUpdatesStream;
+ /** Counts how many merges have completed; this is used by {@link FrozenBufferedUpdates#apply}
+ * to handle concurrently applying deletes/updates while merges are completing. */
+ final AtomicLong mergeFinishedGen = new AtomicLong();
+
// This is a "write once" variable (like the organic dye
// on a DVD-R that may or may not be heated by a laser and
// then cooled to permanently record the event): it's
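
[The new mergeFinishedGen counter is the heart of the concurrency handshake: a packet resolves its deletes/updates, then re-checks the counter, and if a merge completed in the meantime it re-resolves against the newly merged segments. A self-contained sketch of that check-then-retry pattern; the helper names are illustrative, not Lucene's internals:]

    import java.util.concurrent.atomic.AtomicLong;

    class MergeGenRetry {
      final AtomicLong mergeFinishedGen = new AtomicLong();

      // Merge thread: bump the generation when a merge commits.
      void onMergeFinished() { mergeFinishedGen.incrementAndGet(); }

      // Update-resolving thread: retry until no merge completed mid-resolve.
      void resolve(Runnable applyToCurrentSegments) {
        long gen;
        do {
          gen = mergeFinishedGen.get();
          applyToCurrentSegments.run();
        } while (mergeFinishedGen.get() != gen);
      }
    }
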
@@ -449,21 +457,36 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} else {
anyChanges = false;
}
- if (!anyChanges) {
+ if (anyChanges == false) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
- // Prevent segmentInfos from changing while opening the
- // reader; in theory we could instead do similar retry logic,
- // just like we do when loading segments_N
+
+ processEvents(false, true);
+
+ if (applyAllDeletes) {
+ applyAllDeletesAndUpdates();
+ }
+
synchronized(this) {
- anyChanges |= maybeApplyDeletes(applyAllDeletes);
+
+ // NOTE: we cannot carry doc values updates in memory yet, so we always must write them through to disk and re-open each
+ // SegmentReader:
+
+ // TODO: we could instead just clone SIS and pull/incref readers in sync'd block, and then do this w/o IW's lock?
+ // Must do this sync'd on IW to prevent a merge from completing at the last second and failing to write its DV updates:
+ readerPool.writeAllDocValuesUpdates();
+
if (writeAllDeletes) {
// Must move the deletes to disk:
readerPool.commit(segmentInfos);
}
+ // Prevent segmentInfos from changing while opening the
+ // reader; in theory we could instead do similar retry logic,
+ // just like we do when loading segments_N
+
r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes, writeAllDeletes);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
@@ -483,6 +506,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
}
+ anyChanges |= maybeMerge.getAndSet(false);
if (anyChanges) {
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
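
[Note how the new maybeMerge flag replaces the old boolean[] plumbing: it is set while locks are held and consumed exactly once via getAndSet(false) after they are released, so the merge kickoff never runs under the commit or IW lock. A hedged sketch of that handoff in isolation:]

    import java.util.concurrent.atomic.AtomicBoolean;

    class DeferredMerge {
      final AtomicBoolean maybeMerge = new AtomicBoolean();

      void duringLockedFlush() {
        maybeMerge.set(true);              // cheap while synchronized
      }

      void afterLocksReleased(Runnable kickOffMerge) {
        if (maybeMerge.getAndSet(false)) { // consume the flag exactly once
          kickOffMerge.run();
        }
      }
    }
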
@@ -509,7 +533,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
/** Holds shared SegmentReader instances. IndexWriter uses
- * SegmentReaders for 1) applying deletes, 2) doing
+ * SegmentReaders for 1) applying deletes/DV updates, 2) doing
* merges, 3) handing out a real-time reader. This pool
* reuses instances of the SegmentReaders in all these
* places if it is in "near real-time mode" (getReader()
@@ -519,8 +543,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private final Map<SegmentCommitInfo,ReadersAndUpdates> readerMap = new HashMap<>();
- // used only by asserts
- public synchronized boolean infoIsLive(SegmentCommitInfo info) {
+ /** Asserts this info still exists in IW's segment infos */
+ public synchronized boolean assertInfoIsLive(SegmentCommitInfo info) {
int idx = segmentInfos.indexOf(info);
assert idx != -1: "info=" + info + " isn't live";
assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos";
@@ -531,12 +555,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final ReadersAndUpdates rld = readerMap.get(info);
if (rld != null) {
assert info == rld.info;
-// System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.drop: " + info);
readerMap.remove(info);
rld.dropReaders();
}
}
+ public synchronized long ramBytesUsed() {
+ long bytes = 0;
+ for (ReadersAndUpdates rld : readerMap.values()) {
+ bytes += rld.ramBytesUsed.get();
+ }
+ return bytes;
+ }
+
public synchronized boolean anyPendingDeletes() {
for(ReadersAndUpdates rld : readerMap.values()) {
if (rld.getPendingDeleteCount() != 0) {
@@ -556,30 +587,39 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Matches incRef in get:
rld.decRef();
- // Pool still holds a ref:
- assert rld.refCount() >= 1;
-
- if (!poolReaders && rld.refCount() == 1) {
- // This is the last ref to this RLD, and we're not
- // pooling, so remove it:
-// System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: " + rld.info);
- if (rld.writeLiveDocs(directory)) {
- // Make sure we only write del docs for a live segment:
- assert assertInfoLive == false || infoIsLive(rld.info);
- // Must checkpoint because we just
- // created new _X_N.del and field updates files;
- // don't call IW.checkpoint because that also
- // increments SIS.version, which we do not want to
- // do here: it was done previously (after we
- // invoked BDS.applyDeletes), whereas here all we
- // did was move the state to disk:
- checkpointNoSIS();
- }
- //System.out.println("IW: done writeLiveDocs for info=" + rld.info);
+ if (rld.refCount() == 0) {
+ // This happens if the segment was just merged away, while a buffered deletes packet was still applying deletes/updates to it.
+ assert readerMap.containsKey(rld.info) == false: "seg=" + rld.info + " has refCount 0 but still unexpectedly exists in the reader pool";
+ } else {
-// System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: drop readers " + rld.info);
- rld.dropReaders();
- readerMap.remove(rld.info);
+ // Pool still holds a ref:
+ assert rld.refCount() > 0: "refCount=" + rld.refCount() + " reader=" + rld.info;
+
+ if (!poolReaders && rld.refCount() == 1 && readerMap.containsKey(rld.info)) {
+ // This is the last ref to this RLD, and we're not
+ // pooling, so remove it:
+ if (rld.writeLiveDocs(directory)) {
+ // Make sure we only write del docs for a live segment:
+ assert assertInfoLive == false || assertInfoIsLive(rld.info);
+ // Must checkpoint because we just
+ // created new _X_N.del and field updates files;
+ // don't call IW.checkpoint because that also
+ // increments SIS.version, which we do not want to
+ // do here: it was done previously (after we
+ // invoked BDS.applyDeletes), whereas here all we
+ // did was move the state to disk:
+ checkpointNoSIS();
+ }
+
+ rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+
+ if (rld.getNumDVUpdates() == 0) {
+ rld.dropReaders();
+ readerMap.remove(rld.info);
+ } else {
+ // We are forced to pool this segment until its deletes fully apply (no delGen gaps)
+ }
+ }
}
}
@@ -588,6 +628,96 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
dropAll(false);
}
+ void writeAllDocValuesUpdates() throws IOException {
+ Collection<ReadersAndUpdates> copy;
+ synchronized (this) {
+ copy = new HashSet<>(readerMap.values());
+ }
+ boolean any = false;
+ for (ReadersAndUpdates rld : copy) {
+ any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+ }
+ if (any) {
+ checkpoint();
+ }
+ }
+
+ void writeDocValuesUpdates(List<SegmentCommitInfo> infos) throws IOException {
+ boolean any = false;
+ for (SegmentCommitInfo info : infos) {
+ ReadersAndUpdates rld = get(info, false);
+ if (rld != null) {
+ any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+ }
+ }
+ if (any) {
+ checkpoint();
+ }
+ }
+
+ private final AtomicBoolean writeDocValuesLock = new AtomicBoolean();
+
+ void writeSomeDocValuesUpdates() throws IOException {
+
+ assert Thread.holdsLock(IndexWriter.this) == false;
+
+ if (writeDocValuesLock.compareAndSet(false, true)) {
+ try {
+
+ LiveIndexWriterConfig config = getConfig();
+ double mb = config.getRAMBufferSizeMB();
+ // If the reader pool is > 50% of our IW buffer, then write the updates:
+ if (mb != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+ long startNS = System.nanoTime();
+
+ long ramBytesUsed = ramBytesUsed();
+ if (ramBytesUsed > 0.5 * mb * 1024 * 1024) {
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD", String.format(Locale.ROOT, "now write some pending DV updates: %.2f MB used vs IWC Buffer %.2f MB",
+ ramBytesUsed/1024./1024., mb));
+ }
+
+ // Sort by largest ramBytesUsed:
+ PriorityQueue<ReadersAndUpdates> queue = new PriorityQueue<>(readerMap.size(), (a, b) -> Long.compare(b.ramBytesUsed.get(), a.ramBytesUsed.get()));
+ synchronized (this) {
+ for (ReadersAndUpdates rld : readerMap.values()) {
+ queue.add(rld);
+ }
+ }
+
+ int count = 0;
+ while (ramBytesUsed > 0.5 * mb * 1024 * 1024) {
+ ReadersAndUpdates rld = queue.poll();
+ if (rld == null) {
+ break;
+ }
+
+ // We need to do before/after because not all RAM in this RAU is used by DV updates, and
+ // not all of those bytes can be written here:
+ long bytesUsedBefore = rld.ramBytesUsed.get();
+
+ // Only acquire the IW lock on each write, since this is a time-consuming operation. This way
+ // other threads get a chance to run in between our writes.
+ synchronized (IndexWriter.this) {
+ rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+ }
+ long bytesUsedAfter = rld.ramBytesUsed.get();
+ ramBytesUsed -= bytesUsedBefore - bytesUsedAfter;
+ count++;
+ }
+
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD", String.format(Locale.ROOT, "done write some DV updates for %d segments: now %.2f MB used vs IWC Buffer %.2f MB; took %.2f sec",
+ count, ramBytesUsed()/1024./1024., mb, ((System.nanoTime() - startNS)/1000000000.)));
+ }
+ }
+ }
+ } finally {
+ writeDocValuesLock.set(false);
+ }
+ }
+ }
+
/** Remove all our references to readers, and commits
* any pending changes. */
synchronized void dropAll(boolean doSave) throws IOException {
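
[writeSomeDocValuesUpdates above combines two idioms worth calling out: a non-blocking try-lock so only one thread writes DV updates at a time (others simply skip rather than block), and a largest-first priority queue so the biggest RAM consumers are flushed first. A minimal sketch of the try-lock idiom on its own:]

    import java.util.concurrent.atomic.AtomicBoolean;

    class TryLock {
      private final AtomicBoolean busy = new AtomicBoolean();

      void maybeDoWork(Runnable work) {
        if (busy.compareAndSet(false, true)) { // one winner; losers return
          try {
            work.run();
          } finally {
            busy.set(false);                   // always release
          }
        }
      }
    }
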
@@ -599,7 +729,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
if (doSave && rld.writeLiveDocs(directory)) {
// Make sure we only write del docs and field updates for a live segment:
- assert infoIsLive(rld.info);
+ assert assertInfoIsLive(rld.info);
// Must checkpoint because we just
// created new _X_N.del and field updates files;
// don't call IW.checkpoint because that also
@@ -654,9 +784,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final ReadersAndUpdates rld = readerMap.get(info);
if (rld != null) {
assert rld.info == info;
- if (rld.writeLiveDocs(directory)) {
+ boolean changed = rld.writeLiveDocs(directory);
+
+ changed |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+
+ if (changed) {
// Make sure we only write del docs for a live segment:
- assert infoIsLive(info);
+ assert assertInfoIsLive(info);
// Must checkpoint because we just
// created new _X_N.del and field updates files;
@@ -667,10 +801,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// did was move the state to disk:
checkpointNoSIS();
}
+
}
}
}
+ public synchronized boolean anyChanges() {
+ for (ReadersAndUpdates rld : readerMap.values()) {
+ // NOTE: we don't check for pending deletes because deletes carry over in RAM to NRT readers
+ if (rld.getNumDVUpdates() != 0) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
* Obtain a ReadersAndLiveDocs instance from the
* readerPool. If create is true, you must later call
@@ -685,14 +831,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
ReadersAndUpdates rld = readerMap.get(info);
if (rld == null) {
- if (!create) {
+ if (create == false) {
return null;
}
rld = new ReadersAndUpdates(IndexWriter.this, info);
// Steal initial reference:
readerMap.put(info, rld);
} else {
- assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + infoIsLive(rld.info) + " vs " + infoIsLive(info);
+ assert rld.info == info: "rld.info=" + rld.info + " info=" + info + " isLive?=" + assertInfoIsLive(rld.info) + " vs " + assertInfoIsLive(info);
}
if (create) {
@@ -809,7 +955,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
mergeScheduler.setInfoStream(infoStream);
codec = config.getCodec();
- bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
+ bufferedUpdatesStream = new BufferedUpdatesStream(this);
poolReaders = config.getReaderPooling();
OpenMode mode = config.getOpenMode();
@@ -1446,38 +1592,38 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
ReadersAndUpdates rld = readerPool.get(info, false);
if (rld != null) {
synchronized(bufferedUpdatesStream) {
- rld.initWritableLiveDocs();
if (rld.delete(docID)) {
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
if (fullDelCount == rld.info.info.maxDoc()) {
- // If a merge has already registered for this
- // segment, we leave it in the readerPool; the
- // merge will skip merging it and will then drop
- // it once it's done:
- if (!mergingSegments.contains(rld.info)) {
- segmentInfos.remove(rld.info);
- readerPool.drop(rld.info);
- checkpoint();
- }
+ dropDeletedSegment(rld.info);
+ checkpoint();
}
// Must bump changeCount so if no other changes
// happened, we still commit this change:
changed();
}
- //System.out.println(" yes " + info.info.name + " " + docID);
return docWriter.deleteQueue.getNextSequenceNumber();
}
- } else {
- //System.out.println(" no rld " + info.info.name + " " + docID);
}
- } else {
- //System.out.println(" no seg " + info.info.name + " " + docID);
}
return -1;
}
+ /** Drops a segment that has 100% deleted documents. */
+ synchronized void dropDeletedSegment(SegmentCommitInfo info) throws IOException {
+ // If a merge has already registered for this
+ // segment, we leave it in the readerPool; the
+ // merge will skip merging it and will then drop
+ // it once it's done:
+ if (mergingSegments.contains(info) == false) {
+ segmentInfos.remove(info);
+ pendingNumDocs.addAndGet(-info.info.maxDoc());
+ readerPool.drop(info);
+ }
+ }
+
/**
* Deletes the document(s) containing any of the
* terms. All given deletes are applied and flushed atomically
@@ -1881,20 +2027,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
public void forceMerge(int maxNumSegments, boolean doWait) throws IOException {
ensureOpen();
- if (maxNumSegments < 1)
+ if (maxNumSegments < 1) {
throw new IllegalArgumentException("maxNumSegments must be >= 1; got " + maxNumSegments);
+ }
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "forceMerge: index now " + segString());
infoStream.message("IW", "now flush at forceMerge");
}
-
flush(true, true);
-
synchronized(this) {
resetMergeExceptions();
segmentsToMerge.clear();
for(SegmentCommitInfo info : segmentInfos) {
+ assert info != null;
segmentsToMerge.put(info, Boolean.TRUE);
}
mergeMaxNumSegments = maxNumSegments;
@@ -1903,12 +2049,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// merge:
for(final MergePolicy.OneMerge merge : pendingMerges) {
merge.maxNumSegments = maxNumSegments;
- segmentsToMerge.put(merge.info, Boolean.TRUE);
+ if (merge.info != null) {
+ // TODO: explain why this is sometimes still null
+ segmentsToMerge.put(merge.info, Boolean.TRUE);
+ }
}
for (final MergePolicy.OneMerge merge: runningMerges) {
merge.maxNumSegments = maxNumSegments;
- segmentsToMerge.put(merge.info, Boolean.TRUE);
+ if (merge.info != null) {
+ // TODO: explain why this is sometimes still null
+ segmentsToMerge.put(merge.info, Boolean.TRUE);
+ }
}
}
@@ -2076,7 +2228,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
- private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
+ final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false);
boolean newMergesFound = updatePendingMerges(mergePolicy, trigger, maxNumSegments);
mergeScheduler.merge(this, trigger, newMergesFound);
@@ -2103,7 +2255,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final MergePolicy.MergeSpecification spec;
if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
- "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
+ "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
+
spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
newMergesFound = spec != null;
if (newMergesFound) {
@@ -2212,6 +2365,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
mergeScheduler.close();
bufferedUpdatesStream.clear();
+
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
docWriter.abort(this); // don't sync on IW here
synchronized(this) {
@@ -2496,49 +2650,63 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
segmentInfos.changed();
}
- synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) {
+ synchronized void publishFrozenUpdates(FrozenBufferedUpdates packet) throws IOException {
assert packet != null && packet.any();
- synchronized (bufferedUpdatesStream) {
- bufferedUpdatesStream.push(packet);
- }
+ bufferedUpdatesStream.push(packet);
+ docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
}
-
+
/**
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer.
*/
- void publishFlushedSegment(SegmentCommitInfo newSegment,
- FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket) throws IOException {
+ synchronized void publishFlushedSegment(SegmentCommitInfo newSegment,
+ FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket,
+ Sorter.DocMap sortMap) throws IOException {
try {
- synchronized (this) {
- // Lock order IW -> BDS
- ensureOpen(false);
- synchronized (bufferedUpdatesStream) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "publishFlushedSegment");
- }
+ // Lock order IW -> BDS
+ ensureOpen(false);
+
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "publishFlushedSegment " + newSegment);
+ }
+
+ if (globalPacket != null && globalPacket.any()) {
+ // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
+ bufferedUpdatesStream.push(globalPacket);
+ docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(globalPacket));
+ }
+
+ // Publishing the segment must be sync'd on IW -> BDS to make sure
+ // that no merge prunes away the segment-private delete packet
+ final long nextGen;
+ if (packet != null && packet.any()) {
+ nextGen = bufferedUpdatesStream.push(packet);
+
+ // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
+ docWriter.putEvent(new DocumentsWriter.ResolveUpdatesEvent(packet));
- if (globalPacket != null && globalPacket.any()) {
- bufferedUpdatesStream.push(globalPacket);
- }
- // Publishing the segment must be synched on IW -> BDS to make the sure
- // that no merge prunes away the seg. private delete packet
- final long nextGen;
- if (packet != null && packet.any()) {
- nextGen = bufferedUpdatesStream.push(packet);
- } else {
- // Since we don't have a delete packet to apply we can get a new
- // generation right away
- nextGen = bufferedUpdatesStream.getNextGen();
- }
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
- }
- newSegment.setBufferedDeletesGen(nextGen);
- segmentInfos.add(newSegment);
- checkpoint();
- }
+ } else {
+ // Since we don't have a delete packet to apply we can get a new
+ // generation right away
+ nextGen = bufferedUpdatesStream.getNextGen();
+ // No deletes/updates here, so mark it finished immediately:
+ bufferedUpdatesStream.finishedSegment(nextGen);
+ }
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
}
+ newSegment.setBufferedDeletesGen(nextGen);
+ segmentInfos.add(newSegment);
+ checkpoint();
+
+ if (packet != null && packet.any() && sortMap != null) {
+ // TODO: not great we do this heavyish op while holding IW's monitor lock,
+ // but it only applies if you are using sorted indices and updating doc values:
+ ReadersAndUpdates rld = readerPool.get(newSegment, true);
+ rld.sortMap = sortMap;
+ }
+
} finally {
flushCount.incrementAndGet();
doAfterFlush();
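
[The repeated "Do this as an event so it applies higher in the stack" comments all describe the same pattern: while DocumentsWriterFlushQueue.purgeLock is held, publishing only enqueues a ResolveUpdatesEvent; the packet is actually resolved later, after the stack has unwound past the lock. A self-contained sketch of that shape, mirroring the processEvents loop further down:]

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class EventQueueSketch {
      interface Event { void process(); }
      private final Queue<Event> events = new ConcurrentLinkedQueue<>();

      void publishWhileLocked(Event resolveUpdates) {
        events.add(resolveUpdates);   // O(1) while holding the lock
      }

      void processEvents() {          // called after the lock is released
        Event e;
        while ((e = events.poll()) != null) {
          e.process();
        }
      }
    }
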
@@ -2924,7 +3092,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
/** Copies the segment files as-is into the IndexWriter's directory. */
private SegmentCommitInfo copySegmentAsIs(SegmentCommitInfo info, String segName, IOContext context) throws IOException {
- //System.out.println("copy seg=" + info.info.name + " version=" + info.info.getVersion());
// Same SI as before but we change directory and name
SegmentInfo newInfo = new SegmentInfo(directoryOrig, info.info.getVersion(), info.info.getMinVersion(), segName, info.info.maxDoc(),
info.info.getUseCompoundFile(), info.info.getCodec(),
@@ -2991,16 +3158,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
@Override
public final long prepareCommit() throws IOException {
ensureOpen();
- boolean[] doMaybeMerge = new boolean[1];
- pendingSeqNo = prepareCommitInternal(doMaybeMerge);
+ pendingSeqNo = prepareCommitInternal();
// we must do this outside of the commitLock else we can deadlock:
- if (doMaybeMerge[0]) {
+ if (maybeMerge.getAndSet(false)) {
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
return pendingSeqNo;
}
- private long prepareCommitInternal(boolean[] doMaybeMerge) throws IOException {
+ private long prepareCommitInternal() throws IOException {
startCommitTime = System.nanoTime();
synchronized(commitLock) {
ensureOpen(false);
@@ -3020,7 +3186,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
doBeforeFlush();
testPoint("startDoFlush");
SegmentInfos toCommit = null;
- boolean anySegmentsFlushed = false;
+ boolean anyChanges = false;
long seqNo;
// This is copied from doFlush, except it's modified to
@@ -3035,19 +3201,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
- anySegmentsFlushed = true;
+ anyChanges = true;
seqNo = -seqNo;
}
- if (!anySegmentsFlushed) {
+ if (anyChanges == false) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
- processEvents(false, true);
+
+ // cannot pass triggerMerges=true here else it can lead to deadlock:
+ processEvents(false, false);
+
flushSuccess = true;
+ applyAllDeletesAndUpdates();
+
synchronized(this) {
- maybeApplyDeletes(true);
readerPool.commit(segmentInfos);
@@ -3106,8 +3276,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
try {
- if (anySegmentsFlushed) {
- doMaybeMerge[0] = true;
+ if (anyChanges) {
+ maybeMerge.set(true);
}
startCommit(toCommit);
success = true;
@@ -3228,8 +3398,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "commit: start");
}
- boolean[] doMaybeMerge = new boolean[1];
-
long seqNo;
synchronized(commitLock) {
@@ -3243,7 +3411,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: now prepare");
}
- seqNo = prepareCommitInternal(doMaybeMerge);
+ seqNo = prepareCommitInternal();
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: already prepared");
@@ -3255,7 +3423,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
// we must do this outside of the commitLock else we can deadlock:
- if (doMaybeMerge[0]) {
+ if (maybeMerge.getAndSet(false)) {
maybeMerge(mergePolicy, MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
@@ -3417,8 +3585,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
processEvents(false, true);
}
}
+
+ if (applyAllDeletes) {
+ applyAllDeletesAndUpdates();
+ }
+
+ anyChanges |= maybeMerge.getAndSet(false);
+
synchronized(this) {
- anyChanges |= maybeApplyDeletes(applyAllDeletes);
doAfterFlush();
success = true;
return anyChanges;
@@ -3436,48 +3610,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
- final synchronized boolean maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
- if (applyAllDeletes) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "apply all deletes during flush");
- }
- return applyAllDeletesAndUpdates();
- } else if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "don't apply deletes now delTermCount=" + bufferedUpdatesStream.numTerms() + " bytesUsed=" + bufferedUpdatesStream.ramBytesUsed());
- }
-
- return false;
- }
-
- final synchronized boolean applyAllDeletesAndUpdates() throws IOException {
+ final void applyAllDeletesAndUpdates() throws IOException {
+ assert Thread.holdsLock(this) == false;
flushDeletesCount.incrementAndGet();
- final BufferedUpdatesStream.ApplyDeletesResult result;
if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "now apply all deletes for all segments maxDoc=" + (docWriter.getNumDocs() + segmentInfos.totalMaxDoc()));
- }
- result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
- if (result.anyDeletes) {
- checkpoint();
+ infoStream.message("IW", "now apply all deletes for all segments buffered updates bytesUsed=" + bufferedUpdatesStream.ramBytesUsed() + " reader pool bytesUsed=" + readerPool.ramBytesUsed());
}
- if (!keepFullyDeletedSegments && result.allDeleted != null) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "drop 100% deleted segments: " + segString(result.allDeleted));
- }
- for (SegmentCommitInfo info : result.allDeleted) {
- // If a merge has already registered for this
- // segment, we leave it in the readerPool; the
- // merge will skip merging it and will then drop
- // it once it's done:
- if (!mergingSegments.contains(info)) {
- segmentInfos.remove(info);
- pendingNumDocs.addAndGet(-info.info.maxDoc());
- readerPool.drop(info);
- }
- }
- checkpoint();
- }
- bufferedUpdatesStream.prune(segmentInfos);
- return result.anyDeletes;
+ bufferedUpdatesStream.waitApplyAll();
}
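
[applyAllDeletesAndUpdates no longer applies anything itself: per the new assert it runs outside IW's monitor and simply waits for the concurrent resolver threads to drain. A hedged sketch of the waiting shape behind bufferedUpdatesStream.waitApplyAll(); names are illustrative, not the actual BufferedUpdatesStream API:]

    class ApplyBarrier {
      private int pending;

      synchronized void packetQueued()  { pending++; }
      synchronized void packetApplied() { pending--; notifyAll(); }

      // Block until every queued update packet has finished applying.
      synchronized void waitApplyAll() throws InterruptedException {
        while (pending > 0) {
          wait();
        }
      }
    }
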
// for testing only
@@ -3514,41 +3653,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private static class MergedDeletesAndUpdates {
ReadersAndUpdates mergedDeletesAndUpdates = null;
- boolean initializedWritableLiveDocs = false;
MergedDeletesAndUpdates() {}
- final void init(ReaderPool readerPool, MergePolicy.OneMerge merge, boolean initWritableLiveDocs) throws IOException {
+ final void init(ReaderPool readerPool, MergePolicy.OneMerge merge) throws IOException {
if (mergedDeletesAndUpdates == null) {
mergedDeletesAndUpdates = readerPool.get(merge.info, true);
}
- if (initWritableLiveDocs && !initializedWritableLiveDocs) {
- mergedDeletesAndUpdates.initWritableLiveDocs();
- this.initializedWritableLiveDocs = true;
- }
- }
-
- }
-
- private void maybeApplyMergedDVUpdates(MergePolicy.OneMerge merge, MergeState mergeState,
- MergedDeletesAndUpdates holder, String[] mergingFields, DocValuesFieldUpdates[] dvFieldUpdates,
- DocValuesFieldUpdates.Iterator[] updatesIters, int segment, int curDoc) throws IOException {
- int newDoc = -1;
- for (int idx = 0; idx < mergingFields.length; idx++) {
- DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx];
- if (updatesIter.doc() == curDoc) { // document has an update
- if (holder.mergedDeletesAndUpdates == null) {
- holder.init(readerPool, merge, false);
- }
- if (newDoc == -1) { // map once per all field updates, but only if there are any updates
- newDoc = mergeState.docMaps[segment].get(curDoc);
- }
- DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx];
- dvUpdates.add(newDoc, updatesIter.value());
- updatesIter.nextDoc(); // advance to next document
- } else {
- assert updatesIter.doc() > curDoc : "field=" + mergingFields[idx] + " updateDoc=" + updatesIter.doc() + " curDoc=" + curDoc;
- }
}
}
@@ -3564,6 +3675,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
synchronized private ReadersAndUpdates commitMergedDeletesAndUpdates(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
+ mergeFinishedGen.incrementAndGet();
+
testPoint("startCommitMergeDeletes");
final List<SegmentCommitInfo> sourceSegments = merge.segments;
@@ -3576,9 +3689,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// started merging:
long minGen = Long.MAX_VALUE;
- // Lazy init (only when we find a delete to carry over):
- final MergedDeletesAndUpdates holder = new MergedDeletesAndUpdates();
- final DocValuesFieldUpdates.Container mergedDVUpdates = new DocValuesFieldUpdates.Container();
+ // Lazy init (only when we find a delete or update to carry over):
+ final ReadersAndUpdates mergedDeletesAndUpdates = readerPool.get(merge.info, true);
+
+ // field -> delGen -> dv field updates
+ Map<String,Map<Long,DocValuesFieldUpdates>> mappedDVUpdates = new HashMap<>();
+
+ boolean anyDVUpdates = false;
assert sourceSegments.size() == mergeState.docMaps.length;
for (int i = 0; i < sourceSegments.size(); i++) {
@@ -3587,36 +3704,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final int maxDoc = info.info.maxDoc();
final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs();
final ReadersAndUpdates rld = readerPool.get(info, false);
- // We hold a ref so it should still be in the pool:
+ // We hold a ref, from when we opened the readers during mergeInit, so it better still be in the pool:
assert rld != null: "seg=" + info.info.name;
final Bits currentLiveDocs = rld.getLiveDocs();
- final Map<String,DocValuesFieldUpdates> mergingFieldUpdates = rld.getMergingFieldUpdates();
- final String[] mergingFields;
- final DocValuesFieldUpdates[] dvFieldUpdates;
- final DocValuesFieldUpdates.Iterator[] updatesIters;
- if (mergingFieldUpdates.isEmpty()) {
- mergingFields = null;
- updatesIters = null;
- dvFieldUpdates = null;
- } else {
- mergingFields = new String[mergingFieldUpdates.size()];
- dvFieldUpdates = new DocValuesFieldUpdates[mergingFieldUpdates.size()];
- updatesIters = new DocValuesFieldUpdates.Iterator[mergingFieldUpdates.size()];
- int idx = 0;
- for (Entry<String,DocValuesFieldUpdates> e : mergingFieldUpdates.entrySet()) {
- String field = e.getKey();
- DocValuesFieldUpdates updates = e.getValue();
- mergingFields[idx] = field;
- dvFieldUpdates[idx] = mergedDVUpdates.getUpdates(field, updates.type);
- if (dvFieldUpdates[idx] == null) {
- dvFieldUpdates[idx] = mergedDVUpdates.newUpdates(field, updates.type, mergeState.segmentInfo.maxDoc());
- }
- updatesIters[idx] = updates.iterator();
- updatesIters[idx].nextDoc(); // advance to first update doc
- ++idx;
- }
- }
-// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: info=" + info + ", mergingUpdates=" + mergingUpdates);
+
+ MergeState.DocMap segDocMap = mergeState.docMaps[i];
+ MergeState.DocMap segLeafDocMap = mergeState.leafDocMaps[i];
if (prevLiveDocs != null) {
@@ -3648,26 +3741,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
assert currentLiveDocs.get(j) == false;
} else if (currentLiveDocs.get(j) == false) {
// the document was deleted while we were merging:
- if (holder.mergedDeletesAndUpdates == null || holder.initializedWritableLiveDocs == false) {
- holder.init(readerPool, merge, true);
- }
- holder.mergedDeletesAndUpdates.delete(mergeState.docMaps[i].get(mergeState.leafDocMaps[i].get(j)));
- if (mergingFields != null) { // advance all iters beyond the deleted document
- skipDeletedDoc(updatesIters, j);
- }
- } else if (mergingFields != null) {
- maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
- }
- }
- } else if (mergingFields != null) {
- // need to check each non-deleted document if it has any updates
- for (int j = 0; j < maxDoc; j++) {
- if (prevLiveDocs.get(j)) {
- // document isn't deleted, check if any of the fields have an update to it
- maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
- } else {
- // advance all iters beyond the deleted document
- skipDeletedDoc(updatesIters, j);
+ mergedDeletesAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
}
}
}
@@ -3677,52 +3751,83 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// does:
for (int j = 0; j < maxDoc; j++) {
if (currentLiveDocs.get(j) == false) {
- if (holder.mergedDeletesAndUpdates == null || !holder.initializedWritableLiveDocs) {
- holder.init(readerPool, merge, true);
+ mergedDeletesAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
+ }
+ }
+ }
+
+ // Now carry over all doc values updates that were resolved while we were merging, remapping the docIDs to the newly merged docIDs.
+ // We only carry over packets that finished resolving; if any are still running (concurrently) they will detect that our merge completed
+ // and re-resolve against the newly merged segment:
+
+ Map<String,List<DocValuesFieldUpdates>> mergingDVUpdates = rld.getMergingDVUpdates();
+
+ for (Map.Entry<String,List<DocValuesFieldUpdates>> ent : mergingDVUpdates.entrySet()) {
+
+ String field = ent.getKey();
+
+ Map<Long,DocValuesFieldUpdates> mappedField = mappedDVUpdates.get(field);
+ if (mappedField == null) {
+ mappedField = new HashMap<>();
+ mappedDVUpdates.put(field, mappedField);
+ }
+
+ for (DocValuesFieldUpdates updates : ent.getValue()) {
+
+ if (bufferedUpdatesStream.stillRunning(updates.delGen)) {
+ continue;
+ }
+
+ // sanity check:
+ assert field.equals(updates.field);
+
+ DocValuesFieldUpdates mappedUpdates = mappedField.get(updates.delGen);
+ if (mappedUpdates == null) {
+ switch (updates.type) {
+ case NUMERIC:
+ mappedUpdates = new NumericDocValuesFieldUpdates(updates.delGen, updates.field, merge.info.info.maxDoc());
+ break;
+ case BINARY:
+ mappedUpdates = new BinaryDocValuesFieldUpdates(updates.delGen, updates.field, merge.info.info.maxDoc());
+ break;
+ default:
+ throw new AssertionError();
}
- holder.mergedDeletesAndUpdates.delete(mergeState.docMaps[i].get(mergeState.leafDocMaps[i].get(j)));
- if (mergingFields != null) { // advance all iters beyond the deleted document
- skipDeletedDoc(updatesIters, j);
+ mappedField.put(updates.delGen, mappedUpdates);
+ }
+
+ DocValuesFieldUpdates.Iterator it = updates.iterator();
+ int doc;
+ while ((doc = it.nextDoc()) != NO_MORE_DOCS) {
+ int mappedDoc = segDocMap.get(segLeafDocMap.get(doc));
+ if (mappedDoc != -1) {
+ // not deleted
+ mappedUpdates.add(mappedDoc, it.value());
+ anyDVUpdates = true;
}
- } else if (mergingFields != null) {
- maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
}
}
- } else if (mergingFields != null) {
- // no deletions before or after, but there were updates
- for (int j = 0; j < maxDoc; j++) {
- maybeApplyMergedDVUpdates(merge, mergeState, holder, mergingFields, dvFieldUpdates, updatesIters, i, j);
- }
}
}
- if (mergedDVUpdates.any()) {
-// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedFieldUpdates=" + mergedFieldUpdates);
- boolean success = false;
- try {
- // if any error occurs while writing the field updates we should release
- // the info, otherwise it stays in the pool but is considered not "live"
- // which later causes false exceptions in pool.dropAll().
- // NOTE: currently this is the only place which throws a true
- // IOException. If this ever changes, we need to extend that try/finally
- // block to the rest of the method too.
- holder.mergedDeletesAndUpdates.writeFieldUpdates(directory, mergedDVUpdates);
- success = true;
- } finally {
- if (!success) {
- holder.mergedDeletesAndUpdates.dropChanges();
- readerPool.drop(merge.info);
+ if (anyDVUpdates) {
+ // Persist the merged DV updates onto the RAU for the merged segment:
+ for(Map<Long,DocValuesFieldUpdates> d : mappedDVUpdates.values()) {
+ for (DocValuesFieldUpdates updates : d.values()) {
+ updates.finish();
+ mergedDeletesAndUpdates.addDVUpdate(updates);
}
}
}
-
+
if (infoStream.isEnabled("IW")) {
- if (holder.mergedDeletesAndUpdates == null) {
+ if (mergedDeletesAndUpdates == null) {
infoStream.message("IW", "no new deletes or field updates since merge started");
} else {
- String msg = holder.mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes";
- if (mergedDVUpdates.any()) {
- msg += " and " + mergedDVUpdates.size() + " new field updates";
+ String msg = mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes";
+ if (anyDVUpdates) {
+ msg += " and " + mergedDeletesAndUpdates.getNumDVUpdates() + " new field updates";
+ msg += " (" + mergedDeletesAndUpdates.ramBytesUsed.get() + ") bytes";
}
msg += " since merge started";
infoStream.message("IW", msg);
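
[Carrying updates across a merge means remapping each per-segment docID through the merge's doc maps and dropping updates whose documents were deleted (the map returns -1, as in the loop above). A minimal standalone sketch of that remap; DocMap here is illustrative, Lucene's MergeState.DocMap carries more state:]

    class RemapSketch {
      interface DocMap { int get(int oldDocID); } // -1 => doc was deleted

      // Remap resolved update docIDs into the merged segment's doc space.
      static void carryOver(int[] docs, long[] values, DocMap map) {
        for (int i = 0; i < docs.length; i++) {
          int mappedDoc = map.get(docs[i]);
          if (mappedDoc != -1) {
            // keep (mappedDoc, values[i]); updates on deleted docs are dropped
            System.out.println("doc " + docs[i] + " -> " + mappedDoc + " value=" + values[i]);
          }
        }
      }
    }
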
@@ -3731,7 +3836,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
merge.info.setBufferedDeletesGen(minGen);
- return holder.mergedDeletesAndUpdates;
+ return mergedDeletesAndUpdates;
}
synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
@@ -3775,7 +3880,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
final ReadersAndUpdates mergedUpdates = merge.info.info.maxDoc() == 0 ? null : commitMergedDeletesAndUpdates(merge, mergeState);
-// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMerge: mergedDeletes=" + mergedDeletes);
// If the doc store we are using has been closed and
// is in now compound format (but wasn't when we
@@ -3922,9 +4026,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
try {
mergeInit(merge);
- //if (merge.info != null) {
- //System.out.println("MERGE: " + merge.info.info.name);
- //}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now merge\n merge=" + segString(merge.segments) + "\n index=" + segString());
@@ -4064,7 +4165,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
/** Does initial setup for a merge, which is fast but holds
* the synchronized lock on IndexWriter instance. */
- final synchronized void mergeInit(MergePolicy.OneMerge merge) throws IOException {
+ final void mergeInit(MergePolicy.OneMerge merge) throws IOException {
+
+ // Make sure any deletes that must be resolved before we commit the merge are complete:
+ bufferedUpdatesStream.waitApplyForMerge(merge.segments);
+
boolean success = false;
try {
_mergeInit(merge);
@@ -4110,29 +4215,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "now apply deletes for " + merge.segments.size() + " merging segments");
}
- // Lock order: IW -> BD
- final BufferedUpdatesStream.ApplyDeletesResult result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, merge.segments);
+ // Must move the pending doc values updates to disk now, else the newly merged segment will not see them:
+ // TODO: we could fix merging to pull the merged DV iterator so we don't have to move these updates to disk first, i.e. just carry them
+ // in memory:
+ readerPool.writeDocValuesUpdates(merge.segments);
- if (result.anyDeletes) {
- checkpoint();
- }
-
- if (!keepFullyDeletedSegments && result.allDeleted != null) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "drop 100% deleted segments: " + result.allDeleted);
- }
- for(SegmentCommitInfo info : result.allDeleted) {
- segmentInfos.remove(info);
- pendingNumDocs.addAndGet(-info.info.maxDoc());
- if (merge.segments.contains(info)) {
- mergingSegments.remove(info);
- merge.segments.remove(info);
- }
- readerPool.drop(info);
- }
- checkpoint();
- }
-
// Bind a new segment name here so even with
// ConcurrentMergePolicy we keep deterministic segment
// names.
@@ -4145,11 +4232,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
setDiagnostics(si, SOURCE_MERGE, details);
merge.setMergeInfo(new SegmentCommitInfo(si, 0, -1L, -1L, -1L));
-// System.out.println("[" + Thread.currentThread().getName() + "] IW._mergeInit: " + segString(merge.segments) + " into " + si);
-
- // Lock order: IW -> BD
- bufferedUpdatesStream.prune(segmentInfos);
-
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "merge seg=" + merge.info.info.name + " " + segString(merge.segments));
}
@@ -4204,7 +4286,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final int numSegments = merge.readers.size();
Throwable th = null;
- boolean drop = !suppressExceptions;
+ boolean drop = suppressExceptions == false;
for (int i = 0; i < numSegments; i++) {
final SegmentReader sr = merge.readers.get(i);
@@ -4278,59 +4360,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// commit merged deletes
final ReadersAndUpdates rld = readerPool.get(info, true);
- // Carefully pull the most recent live docs and reader
- SegmentReader reader;
- final Bits liveDocs;
- final int delCount;
-
- synchronized (this) {
- // Must sync to ensure BufferedDeletesStream cannot change liveDocs,
- // pendingDeleteCount and field updates while we pull a copy:
- reader = rld.getReaderForMerge(context);
- liveDocs = rld.getReadOnlyLiveDocs();
- delCount = rld.getPendingDeleteCount() + info.getDelCount();
-
- assert reader != null;
- assert rld.verifyDocCounts();
-
- if (infoStream.isEnabled("IW")) {
- if (rld.getPendingDeleteCount() != 0) {
- infoStream.message("IW", "seg=" + segString(info) + " delCount=" + info.getDelCount() + " pendingDelCount=" + rld.getPendingDeleteCount());
- } else if (info.getDelCount() != 0) {
- infoStream.message("IW", "seg=" + segString(info) + " delCount=" + info.getDelCount());
- } else {
- infoStream.message("IW", "seg=" + segString(info) + " no deletes");
- }
- }
- }
-
- // Deletes might have happened after we pulled the merge reader and
- // before we got a read-only copy of the segment's actual live docs
- // (taking pending deletes into account). In that case we need to
- // make a new reader with updated live docs and del count.
- if (reader.numDeletedDocs() != delCount) {
- // fix the reader's live docs and del count
- assert delCount > reader.numDeletedDocs(); // beware of zombies
-
- SegmentReader newReader;
+ SegmentReader reader = rld.getReaderForMerge(context);
+ int delCount = reader.numDeletedDocs();
- synchronized (this) {
- // We must also sync on IW here, because another thread could be writing
- // new DV updates / remove old gen field infos files causing FNFE:
- newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - delCount);
- }
-
- boolean released = false;
- try {
- rld.release(reader);
- released = true;
- } finally {
- if (!released) {
- newReader.decRef();
- }
- }
-
- reader = newReader;
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
}
merge.readers.add(reader);
@@ -4338,8 +4372,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
segUpto++;
}
-// System.out.println("[" + Thread.currentThread().getName() + "] IW.mergeMiddle: merging " + merge.getMergeReaders());
-
// Let the merge wrap readers
List<CodecReader> mergeReaders = new ArrayList<>();
for (SegmentReader reader : merge.readers) {
@@ -4411,7 +4443,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Very important to do this before opening the reader
// because codec must know if prox was written for
// this segment:
- //System.out.println("merger set hasProx=" + merger.hasProx() + " seg=" + merge.info.name);
boolean useCompoundFile;
synchronized (this) { // Guard segmentInfos
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, merge.info, this);
@@ -4601,7 +4632,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
- private boolean keepFullyDeletedSegments;
+ boolean keepFullyDeletedSegments;
/** Only for testing.
*
@@ -4714,8 +4745,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "startCommit: wrote pending segments file \"" + IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", toSync.getGeneration()) + "\"");
}
- //System.out.println("DONE prepareCommit");
-
pendingCommitSet = true;
pendingCommit = toSync;
}
@@ -4863,9 +4892,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
synchronized boolean nrtIsCurrent(SegmentInfos infos) {
- //System.out.println("IW.nrtIsCurrent " + (infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()));
ensureOpen();
- boolean isCurrent = infos.getVersion() == segmentInfos.getVersion() && !docWriter.anyChanges() && !bufferedUpdatesStream.any();
+ boolean isCurrent = infos.getVersion() == segmentInfos.getVersion()
+ && docWriter.anyChanges() == false
+ && bufferedUpdatesStream.any() == false
+ && readerPool.anyChanges() == false;
if (infoStream.isEnabled("IW")) {
if (isCurrent == false) {
infoStream.message("IW", "nrtIsCurrent: infoVersion matches: " + (infos.getVersion() == segmentInfos.getVersion()) + "; DW changes: " + docWriter.anyChanges() + "; BD changes: "+ bufferedUpdatesStream.any());
@@ -4977,9 +5008,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
purge(forcePurge);
} finally {
- if (applyAllDeletesAndUpdates()) {
- maybeMerge(config.getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
- }
flushCount.incrementAndGet();
}
}
@@ -5017,20 +5045,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
- private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
- return processEvents(eventQueue, triggerMerge, forcePurge);
+ private void processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
+ processEvents(eventQueue, triggerMerge, forcePurge);
+ if (triggerMerge) {
+ maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+ }
}
- private boolean processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
+ private void processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
boolean processed = false;
if (tragedy == null) {
Event event;
- while((event = queue.poll()) != null) {
+ while ((event = queue.poll()) != null) {
processed = true;
event.process(this, triggerMerge, forcePurge);
}
}
- return processed;
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
index 0fdbc3e..1377a95 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
@@ -91,8 +91,12 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
*/
public final static double DEFAULT_RAM_BUFFER_SIZE_MB = 16.0;
- /** Default setting for {@link #setReaderPooling}. */
- public final static boolean DEFAULT_READER_POOLING = false;
+ /** Default setting (true) for {@link #setReaderPooling}. */
+ // We changed this default to true with concurrent deletes/updates (LUCENE-7868),
+ // because we will otherwise need to open and close segment readers more frequently.
+ // False is still supported, but will have worse performance since readers will
+ // be forced to aggressively move all state to disk.
+ public final static boolean DEFAULT_READER_POOLING = true;
/** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
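
[Since reader pooling is now on by default, callers that relied on the old default can still opt out. A short sketch; StandardAnalyzer is just for illustration:]

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriterConfig;

    static IndexWriterConfig noPoolingConfig() {
      // Restore the pre-LUCENE-7868 default; expect more reader open/close churn.
      return new IndexWriterConfig(new StandardAnalyzer()).setReaderPooling(false);
    }
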
@@ -323,7 +327,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
* Expert: Controls when segments are flushed to disk during indexing.
* The {@link FlushPolicy} initialized during {@link IndexWriter} instantiation and once initialized
* the given instance is bound to this {@link IndexWriter} and should not be used with another writer.
- * @see #setMaxBufferedDeleteTerms(int)
* @see #setMaxBufferedDocs(int)
* @see #setRAMBufferSizeMB(double)
*/
@@ -375,11 +378,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
}
@Override
- public int getMaxBufferedDeleteTerms() {
- return super.getMaxBufferedDeleteTerms();
- }
-
- @Override
public int getMaxBufferedDocs() {
return super.getMaxBufferedDocs();
}
@@ -425,11 +423,6 @@ public final class IndexWriterConfig extends LiveIndexWriterConfig {
}
@Override
- public IndexWriterConfig setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
- return (IndexWriterConfig) super.setMaxBufferedDeleteTerms(maxBufferedDeleteTerms);
- }
-
- @Override
public IndexWriterConfig setMaxBufferedDocs(int maxBufferedDocs) {
return (IndexWriterConfig) super.setMaxBufferedDocs(maxBufferedDocs);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
index d9e1bc7..cff1074 100644
--- a/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
+++ b/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
@@ -42,7 +42,6 @@ public class LiveIndexWriterConfig {
private volatile int maxBufferedDocs;
private volatile double ramBufferSizeMB;
- private volatile int maxBufferedDeleteTerms;
private volatile IndexReaderWarmer mergedSegmentWarmer;
// modified by IndexWriterConfig
@@ -109,7 +108,6 @@ public class LiveIndexWriterConfig {
this.analyzer = analyzer;
ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;
maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
- maxBufferedDeleteTerms = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
mergedSegmentWarmer = null;
delPolicy = new KeepOnlyLastCommitDeletionPolicy();
commit = null;
@@ -136,43 +134,6 @@ public class LiveIndexWriterConfig {
}
/**
- * Determines the maximum number of delete-by-term operations that will be
- * buffered before both the buffered in-memory delete terms and queries are
- * applied and flushed.
- * <p>
- * Disabled by default (writer flushes by RAM usage).
- * <p>
- * NOTE: This setting won't trigger a segment flush.
- *
- * <p>
- * Takes effect immediately, but only the next time a document is added,
- * updated or deleted. Also, if you only delete-by-query, this setting has no
- * effect, i.e. delete queries are buffered until the next segment is flushed.
- *
- * @throws IllegalArgumentException
- * if maxBufferedDeleteTerms is enabled but smaller than 1
- *
- * @see #setRAMBufferSizeMB
- */
- public LiveIndexWriterConfig setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
- if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH && maxBufferedDeleteTerms < 1) {
- throw new IllegalArgumentException("maxBufferedDeleteTerms must at least be 1 when enabled");
- }
- this.maxBufferedDeleteTerms = maxBufferedDeleteTerms;
- return this;
- }
-
- /**
- * Returns the number of buffered deleted terms that will trigger a flush of all
- * buffered deletes if enabled.
- *
- * @see #setMaxBufferedDeleteTerms(int)
- */
- public int getMaxBufferedDeleteTerms() {
- return maxBufferedDeleteTerms;
- }
-
- /**
* Determines the amount of RAM that may be used for buffering added documents
* and deletions before they are flushed to the Directory. Generally for
* faster indexing performance it's best to flush by RAM usage instead of
@@ -195,12 +156,8 @@ public class LiveIndexWriterConfig {
* <b>NOTE</b>: the account of RAM usage for pending deletions is only
* approximate. Specifically, if you delete by Query, Lucene currently has no
* way to measure the RAM usage of individual Queries so the accounting will
- * under-estimate and you should compensate by either calling commit()
- * periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
- * to flush and apply buffered deletes by count instead of RAM usage (for each
- * buffered delete Query a constant number of bytes is used to estimate RAM
- * usage). Note that enabling {@link #setMaxBufferedDeleteTerms(int)} will not
- * trigger any segment flushes.
+ * under-estimate and you should compensate by either calling commit() or refresh()
+ * periodically yourself.
* <p>
* <b>NOTE</b>: It's not guaranteed that all memory resident documents are
* flushed once this limit is exceeded. Depending on the configured
@@ -476,7 +433,6 @@ public class LiveIndexWriterConfig {
sb.append("analyzer=").append(analyzer == null ? "null" : analyzer.getClass().getName()).append("\n");
sb.append("ramBufferSizeMB=").append(getRAMBufferSizeMB()).append("\n");
sb.append("maxBufferedDocs=").append(getMaxBufferedDocs()).append("\n");
- sb.append("maxBufferedDeleteTerms=").append(getMaxBufferedDeleteTerms()).append("\n");
sb.append("mergedSegmentWarmer=").append(getMergedSegmentWarmer()).append("\n");
sb.append("delPolicy=").append(getIndexDeletionPolicy().getClass().getName()).append("\n");
IndexCommit commit = getIndexCommit();
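
The rewritten javadoc above drops the advice to cap deletes by count and
instead tells users to commit() or refresh periodically, since the RAM held by
buffered delete Queries cannot be measured and is under-estimated. A sketch of
that compensation; this helper is hypothetical, not part of the commit, and
the 10,000 batch size is arbitrary:

import java.io.IOException;
import java.util.List;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Query;

class DeleteByQueryBatcher {
  /** Applies delete-by-query in bounded batches, committing between batches
   *  so the under-estimated buffered deletes are actually applied. */
  static void deleteAll(IndexWriter writer, List<Query> queries) throws IOException {
    int pending = 0;
    for (Query q : queries) {
      writer.deleteDocuments(q);   // buffered; applied on the next flush
      if (++pending >= 10_000) {   // arbitrary batch size for illustration
        writer.commit();           // forces buffered deletes to be applied
        pending = 0;
      }
    }
    writer.commit();
  }
}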
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java b/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java
deleted file mode 100644
index cd14eec..0000000
--- a/lucene/core/src/java/org/apache/lucene/index/MergedPrefixCodedTermsIterator.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.index;
-
-
-import java.util.List;
-
-import org.apache.lucene.index.PrefixCodedTerms.TermIterator;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.PriorityQueue;
-
-/** Merges multiple {@link FieldTermIterator}s */
-class MergedPrefixCodedTermsIterator extends FieldTermIterator {
-
- private static class TermMergeQueue extends PriorityQueue<TermIterator> {
- TermMergeQueue(int size) {
- super(size);
- }
-
- @Override
- protected boolean lessThan(TermIterator a, TermIterator b) {
- int cmp = a.bytes.compareTo(b.bytes);
- if (cmp < 0) {
- return true;
- } else if (cmp > 0) {
- return false;
- } else {
- return a.delGen() > b.delGen();
- }
- }
- }
-
- private static class FieldMergeQueue extends PriorityQueue<TermIterator> {
- FieldMergeQueue(int size) {
- super(size);
- }
-
- @Override
- protected boolean lessThan(TermIterator a, TermIterator b) {
- return a.field.compareTo(b.field) < 0;
- }
- }
-
- final TermMergeQueue termQueue;
- final FieldMergeQueue fieldQueue;
-
- public MergedPrefixCodedTermsIterator(List<PrefixCodedTerms> termsList) {
- fieldQueue = new FieldMergeQueue(termsList.size());
- for (PrefixCodedTerms terms : termsList) {
- TermIterator iter = terms.iterator();
- iter.next();
- if (iter.field != null) {
- fieldQueue.add(iter);
- }
- }
-
- termQueue = new TermMergeQueue(termsList.size());
- }
-
- String field;
-
- @Override
- public BytesRef next() {
- if (termQueue.size() == 0) {
- // No more terms in current field:
- if (fieldQueue.size() == 0) {
- // No more fields:
- field = null;
- return null;
- }
-
- // Transfer all iterators on the next field into the term queue:
- TermIterator top = fieldQueue.pop();
- termQueue.add(top);
- field = top.field;
- assert field != null;
-
- while (fieldQueue.size() != 0 && fieldQueue.top().field.equals(top.field)) {
- TermIterator iter = fieldQueue.pop();
- assert iter.field.equals(field);
- // TODO: a little bit evil; we do this so we can == on field down below:
- iter.field = field;
- termQueue.add(iter);
- }
-
- return termQueue.top().bytes;
- } else {
- TermIterator top = termQueue.top();
- if (top.next() == null) {
- termQueue.pop();
- } else if (top.field() != field) {
- // Field changed
- termQueue.pop();
- fieldQueue.add(top);
- } else {
- termQueue.updateTop();
- }
- if (termQueue.size() == 0) {
- // Recurse (just once) to go to next field:
- return next();
- } else {
- // Still terms left in this field
- return termQueue.top().bytes;
- }
- }
- }
-
- @Override
- public String field() {
- return field;
- }
-
- @Override
- public long delGen() {
- return termQueue.top().delGen();
- }
-}
-
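
This deleted class merged the per-packet term streams into one globally sorted
view, breaking ties on equal terms by delGen so the newest delete packet won.
With this change resolving delete packets concurrently (per the issue summary),
that single merged view is no longer built, and the class can go. The tie-break
invariant it enforced can be shown with a plain java.util.PriorityQueue; Entry
is a hypothetical stand-in, not Lucene code:

import java.util.PriorityQueue;

class DelGenTieBreakSketch {
  /** Hypothetical stand-in for a term iterator positioned on a term. */
  static final class Entry {
    final String term;
    final long delGen; // generation of the delete packet the term came from
    Entry(String term, long delGen) { this.term = term; this.delGen = delGen; }
  }

  public static void main(String[] args) {
    // Smaller term first; on equal terms the higher (newer) delGen sorts first,
    // mirroring TermMergeQueue.lessThan() in the class removed above.
    PriorityQueue<Entry> pq = new PriorityQueue<>((a, b) -> {
      int cmp = a.term.compareTo(b.term);
      return cmp != 0 ? cmp : Long.compare(b.delGen, a.delGen);
    });
    pq.add(new Entry("apple", 3));
    pq.add(new Entry("apple", 7));
    System.out.println(pq.poll().delGen); // prints 7: the newer delete wins the tie
  }
}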
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
index 4dd3cd0..a42754d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.InPlaceMergeSorter;
+import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PagedGrowableWriter;
import org.apache.lucene.util.packed.PagedMutable;
@@ -40,11 +41,13 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
private int doc = -1;
private Long value = null;
+ private final long delGen;
- Iterator(int size, PagedGrowableWriter values, PagedMutable docs) {
+ Iterator(int size, PagedGrowableWriter values, PagedMutable docs, long delGen) {
this.size = size;
this.values = values;
this.docs = docs;
+ this.delGen = delGen;
}
@Override
@@ -61,6 +64,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
doc = (int) docs.get(idx);
++idx;
while (idx < size && docs.get(idx) == doc) {
+ // scan forward to last update to this doc
++idx;
}
// idx points to the "next" element
@@ -72,12 +76,10 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
int doc() {
return doc;
}
-
+
@Override
- void reset() {
- doc = -1;
- value = null;
- idx = 0;
+ long delGen() {
+ return delGen;
}
}
@@ -86,16 +88,26 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
private PagedGrowableWriter values;
private int size;
- public NumericDocValuesFieldUpdates(String field, int maxDoc) {
- super(field, DocValuesType.NUMERIC);
+ public NumericDocValuesFieldUpdates(long delGen, String field, int maxDoc) {
+ super(maxDoc, delGen, field, DocValuesType.NUMERIC);
bitsPerValue = PackedInts.bitsRequired(maxDoc - 1);
docs = new PagedMutable(1, PAGE_SIZE, bitsPerValue, PackedInts.COMPACT);
values = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST);
- size = 0;
}
-
+
+ @Override
+ public int size() {
+ return size;
+ }
+
@Override
- public void add(int doc, Object value) {
+ public synchronized void add(int doc, Object value) {
+ if (finished) {
+ throw new IllegalStateException("already finished");
+ }
+
+ assert doc < maxDoc;
+
// TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
if (size == Integer.MAX_VALUE) {
throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
@@ -113,11 +125,20 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
values.set(size, val.longValue());
++size;
}
-
+
@Override
- public Iterator iterator() {
- final PagedMutable docs = this.docs;
- final PagedGrowableWriter values = this.values;
+ public void finish() {
+ if (finished) {
+ throw new IllegalStateException("already finished");
+ }
+ finished = true;
+
+ // shrink wrap
+ if (size < docs.size()) {
+ docs = docs.resize(size);
+ values = values.resize(size);
+ }
+
new InPlaceMergeSorter() {
@Override
protected void swap(int i, int j) {
@@ -129,48 +150,36 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
values.set(j, values.get(i));
values.set(i, tmpVal);
}
-
+
@Override
protected int compare(int i, int j) {
- int x = (int) docs.get(i);
- int y = (int) docs.get(j);
- return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ // increasing docID order:
+ // NOTE: we can have ties here, when the same docID was updated in the same segment, in which case we rely on sort being
+ // stable and preserving original order so the last update to that docID wins
+ return Integer.compare((int) docs.get(i), (int) docs.get(j));
}
}.sort(0, size);
-
- return new Iterator(size, values, docs);
}
-
+
@Override
- public void merge(DocValuesFieldUpdates other) {
- assert other instanceof NumericDocValuesFieldUpdates;
- NumericDocValuesFieldUpdates otherUpdates = (NumericDocValuesFieldUpdates) other;
- if (otherUpdates.size > Integer.MAX_VALUE - size) {
- throw new IllegalStateException(
- "cannot support more than Integer.MAX_VALUE doc/value entries; size="
- + size + " other.size=" + otherUpdates.size);
- }
- docs = docs.grow(size + otherUpdates.size);
- values = values.grow(size + otherUpdates.size);
- for (int i = 0; i < otherUpdates.size; i++) {
- int doc = (int) otherUpdates.docs.get(i);
- docs.set(size, doc);
- values.set(size, otherUpdates.values.get(i));
- ++size;
+ public Iterator iterator() {
+ if (finished == false) {
+ throw new IllegalStateException("call finish first");
}
+ return new Iterator(size, values, docs, delGen);
}
-
+
@Override
public boolean any() {
return size > 0;
}
@Override
- public long ramBytesPerDoc() {
- long bytesPerDoc = (long) Math.ceil((double) (bitsPerValue) / 8);
- final int capacity = estimateCapacity(size);
- bytesPerDoc += (long) Math.ceil((double) values.ramBytesUsed() / capacity); // values
- return bytesPerDoc;
+ public long ramBytesUsed() {
+ return values.ramBytesUsed()
+ + docs.ramBytesUsed()
+ + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ + 2 * RamUsageEstimator.NUM_BYTES_INT
+ + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
}
-
}
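
Taken together, the hunks above give the updates buffer a two-phase lifecycle:
synchronized add() calls while open, then a one-time finish() that shrink-wraps
storage and stably sorts by docID so the last add for a doc wins, and only then
iterator(), whose iterator now carries the packet's delGen and skips forward
past superseded updates to the same doc. Per-doc RAM estimation
(ramBytesPerDoc) gives way to a direct ramBytesUsed() total. A usage sketch
under that reading; the class is package-private, so this only compiles inside
org.apache.lucene.index, and nextDoc()/value() are the pre-existing iterator
methods not shown in this hunk:

package org.apache.lucene.index;

import org.apache.lucene.search.DocIdSetIterator;

class NumericUpdatesLifecycleSketch {
  static void demo() {
    // Constructor arguments follow the new (delGen, field, maxDoc) signature.
    NumericDocValuesFieldUpdates updates = new NumericDocValuesFieldUpdates(42L, "price", 1000);

    updates.add(7, 100L);  // two buffered updates to the same doc...
    updates.add(7, 200L);  // ...the later one must win

    updates.finish();      // shrink-wraps, then stably sorts by docID;
                           // a further add() or finish() throws IllegalStateException

    NumericDocValuesFieldUpdates.Iterator it = updates.iterator(); // legal only after finish()
    while (it.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
      long value = it.value();   // for doc 7 this is 200: earlier updates are skipped
      long gen = it.delGen();    // identifies the owning update packet
    }
  }
}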
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java b/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
index df1653b..ba56c2a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
+++ b/lucene/core/src/java/org/apache/lucene/index/PrefixCodedTerms.java
@@ -129,7 +129,7 @@ public class PrefixCodedTerms implements Accountable {
private TermIterator(long delGen, RAMFile buffer) {
try {
- input = new RAMInputStream("MergedPrefixCodedTermsIterator", buffer);
+ input = new RAMInputStream("PrefixCodedTermsIterator", buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}