Posted to commits@lucene.apache.org by sh...@apache.org on 2017/06/25 02:06:33 UTC
[11/47] lucene-solr:feature/autoscaling: LUCENE-7868: use multiple
threads to concurrently resolve deletes and DV updates
LUCENE-7868: use multiple threads to concurrently resolve deletes and DV updates
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/58105a20
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/58105a20
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/58105a20
Branch: refs/heads/feature/autoscaling
Commit: 58105a203a19d18a56e09cf69dc0083c1b890315
Parents: 1737fce
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 21 13:47:15 2017 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Jun 21 13:47:15 2017 -0400
----------------------------------------------------------------------
lucene/CHANGES.txt | 7 +
.../index/BinaryDocValuesFieldUpdates.java | 101 +--
.../apache/lucene/index/BufferedUpdates.java | 53 +-
.../lucene/index/BufferedUpdatesStream.java | 698 ++++-----------
.../apache/lucene/index/CoalescedUpdates.java | 109 ---
.../lucene/index/DocValuesFieldUpdates.java | 190 +++--
.../apache/lucene/index/DocValuesUpdate.java | 3 -
.../apache/lucene/index/DocumentsWriter.java | 66 +-
.../index/DocumentsWriterDeleteQueue.java | 29 +-
.../index/DocumentsWriterFlushControl.java | 21 +-
.../lucene/index/DocumentsWriterFlushQueue.java | 33 +-
.../lucene/index/DocumentsWriterPerThread.java | 35 +-
.../index/DocumentsWriterPerThreadPool.java | 11 +-
.../lucene/index/FlushByRamOrCountsPolicy.java | 29 +-
.../org/apache/lucene/index/FlushPolicy.java | 3 -
.../lucene/index/FreqProxTermsWriter.java | 5 +-
.../lucene/index/FrozenBufferedUpdates.java | 841 +++++++++++++++++--
.../apache/lucene/index/IndexFileDeleter.java | 7 +-
.../org/apache/lucene/index/IndexWriter.java | 776 +++++++++--------
.../apache/lucene/index/IndexWriterConfig.java | 19 +-
.../lucene/index/LiveIndexWriterConfig.java | 48 +-
.../index/MergedPrefixCodedTermsIterator.java | 132 ---
.../index/NumericDocValuesFieldUpdates.java | 97 ++-
.../apache/lucene/index/PrefixCodedTerms.java | 2 +-
.../apache/lucene/index/ReadersAndUpdates.java | 413 ++++++---
.../apache/lucene/index/SegmentCommitInfo.java | 16 +-
.../apache/lucene/index/SegmentCoreReaders.java | 2 -
.../org/apache/lucene/index/SegmentInfo.java | 2 +-
.../org/apache/lucene/index/SegmentReader.java | 19 +-
.../lucene/index/SerialMergeScheduler.java | 4 +-
.../apache/lucene/index/TieredMergePolicy.java | 73 +-
.../util/packed/AbstractPagedMutable.java | 2 +-
.../index/TestBinaryDocValuesUpdates.java | 146 +++-
.../index/TestDocumentsWriterDeleteQueue.java | 39 +-
.../index/TestFlushByRamOrCountsPolicy.java | 12 +-
.../lucene/index/TestForceMergeForever.java | 7 +-
.../apache/lucene/index/TestIndexWriter.java | 61 --
.../lucene/index/TestIndexWriterConfig.java | 7 +-
.../lucene/index/TestIndexWriterDelete.java | 153 ++--
.../lucene/index/TestIndexWriterExceptions.java | 253 ------
.../lucene/index/TestIndexWriterReader.java | 1 +
.../lucene/index/TestNRTReaderWithThreads.java | 8 +-
.../index/TestNumericDocValuesUpdates.java | 314 +++++--
.../lucene/index/TestPerSegmentDeletes.java | 7 +-
.../lucene/index/TestPrefixCodedTerms.java | 76 --
.../TestControlledRealTimeReopenThread.java | 4 +-
.../apache/lucene/search/join/TestJoinUtil.java | 16 +-
.../idversion/IDVersionPostingsWriter.java | 4 +-
.../idversion/VersionBlockTreeTermsWriter.java | 4 +-
.../index/BaseDocValuesFormatTestCase.java | 24 +
.../index/BaseIndexFileFormatTestCase.java | 2 +-
.../org/apache/lucene/util/LuceneTestCase.java | 11 -
52 files changed, 2616 insertions(+), 2379 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index d695310..56da726 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -24,6 +24,10 @@ New Features
the term frequency to this value. (Uwe Schindler, Robert Muir, Mike
McCandless)
+* LUCENE-7868: Multiple threads can now resolve deletes and doc values
+ updates concurrently, giving sizable speedups in update-heavy
+ indexing use cases (Simon Willnauer, Mike McCandless)
+
API Changes
* LUCENE-2605: Classic QueryParser no longer splits on whitespace by default.
@@ -76,6 +80,9 @@ API Changes
* LUCENE-7872: TopDocs.totalHits is now a long. (Adrien Grand, hossman)
+* LUCENE-7868: IndexWriterConfig.setMaxBufferedDeleteTerms is
+ removed. (Simon Willnauer, Mike McCandless)
+
Bug Fixes
* LUCENE-7626: IndexWriter will no longer accept broken token offsets
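
A minimal sketch of the update-heavy pattern this CHANGES entry refers to: several application threads deleting by term and updating numeric doc values against one shared IndexWriter, whose flushed packets are now resolved to per-segment docIDs concurrently rather than on a single thread. The directory path, field names and thread count below are illustrative, not taken from the commit; the IndexWriter calls are the standard public API.

import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class UpdateHeavyIndexing {
  public static void main(String[] args) throws Exception {
    Directory dir = FSDirectory.open(Paths.get("/tmp/update-heavy-index"));
    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));

    // Seed some documents, each with a numeric doc-values field that will be updated in place.
    for (int id = 0; id < 10_000; id++) {
      Document doc = new Document();
      doc.add(new StringField("id", Integer.toString(id), Field.Store.NO));
      doc.add(new NumericDocValuesField("views", 0L));
      writer.addDocument(doc);
    }

    // Several threads issue term deletes and doc-values updates concurrently; the
    // buffered packets they flush are also resolved concurrently after this change.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int t = 0; t < 4; t++) {
      final int thread = t;
      pool.submit(() -> {
        for (int i = 0; i < 100_000; i++) {
          int id = (thread * 31 + i) % 10_000;
          if (i % 10 == 0) {
            writer.deleteDocuments(new Term("id", Integer.toString(id)));
          } else {
            writer.updateNumericDocValue(new Term("id", Integer.toString(id)), "views", i);
          }
        }
        return null;
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);

    writer.commit();
    writer.close();
    dir.close();
  }
}
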
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
index f8cece9..e2700ea 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java
@@ -22,6 +22,7 @@ import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.InPlaceMergeSorter;
+import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PagedGrowableWriter;
import org.apache.lucene.util.packed.PagedMutable;
@@ -35,22 +36,24 @@ import org.apache.lucene.util.packed.PagedMutable;
class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
final static class Iterator extends DocValuesFieldUpdates.Iterator {
- private final PagedGrowableWriter offsets;
private final int size;
+ private final PagedGrowableWriter offsets;
private final PagedGrowableWriter lengths;
private final PagedMutable docs;
private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
private int doc = -1;
private final BytesRef value;
private int offset, length;
+ private final long delGen;
Iterator(int size, PagedGrowableWriter offsets, PagedGrowableWriter lengths,
- PagedMutable docs, BytesRef values) {
+ PagedMutable docs, BytesRef values, long delGen) {
this.offsets = offsets;
this.size = size;
this.lengths = lengths;
this.docs = docs;
value = values.clone();
+ this.delGen = delGen;
}
@Override
@@ -69,6 +72,7 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
doc = (int) docs.get(idx);
++idx;
while (idx < size && docs.get(idx) == doc) {
+ // scan forward to last update to this doc
++idx;
}
// idx points to the "next" element
@@ -87,10 +91,8 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
- void reset() {
- doc = -1;
- offset = -1;
- idx = 0;
+ long delGen() {
+ return delGen;
}
}
@@ -100,18 +102,29 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
private int size;
private final int bitsPerValue;
- public BinaryDocValuesFieldUpdates(String field, int maxDoc) {
- super(field, DocValuesType.BINARY);
+ public BinaryDocValuesFieldUpdates(long delGen, String field, int maxDoc) {
+ super(maxDoc, delGen, field, DocValuesType.BINARY);
bitsPerValue = PackedInts.bitsRequired(maxDoc - 1);
docs = new PagedMutable(1, PAGE_SIZE, bitsPerValue, PackedInts.COMPACT);
offsets = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST);
lengths = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST);
values = new BytesRefBuilder();
- size = 0;
}
-
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ // NOTE: we fully consume the incoming BytesRef so caller is free to reuse it after we return:
@Override
- public void add(int doc, Object value) {
+ synchronized public void add(int doc, Object value) {
+ if (finished) {
+ throw new IllegalStateException("already finished");
+ }
+
+ assert doc < maxDoc: "doc=" + doc + " maxDoc=" + maxDoc;
+
// TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
if (size == Integer.MAX_VALUE) {
throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
@@ -134,11 +147,19 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
- public Iterator iterator() {
- final PagedMutable docs = this.docs;
- final PagedGrowableWriter offsets = this.offsets;
- final PagedGrowableWriter lengths = this.lengths;
- final BytesRef values = this.values.get();
+ public void finish() {
+ if (finished) {
+ throw new IllegalStateException("already finished");
+ }
+ finished = true;
+
+ // shrink wrap
+ if (size < docs.size()) {
+ docs = docs.resize(size);
+ offsets = offsets.resize(size);
+ lengths = lengths.resize(size);
+ }
+
new InPlaceMergeSorter() {
@Override
protected void swap(int i, int j) {
@@ -157,36 +178,20 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
@Override
protected int compare(int i, int j) {
- int x = (int) docs.get(i);
- int y = (int) docs.get(j);
- return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ // increasing docID order:
+ // NOTE: we can have ties here, when the same docID was updated in the same segment, in which case we rely on sort being
+ // stable and preserving original order so the last update to that docID wins
+ return Integer.compare((int) docs.get(i), (int) docs.get(j));
}
}.sort(0, size);
-
- return new Iterator(size, offsets, lengths, docs, values);
}
@Override
- public void merge(DocValuesFieldUpdates other) {
- BinaryDocValuesFieldUpdates otherUpdates = (BinaryDocValuesFieldUpdates) other;
- if (otherUpdates.size > Integer.MAX_VALUE - size) {
- throw new IllegalStateException(
- "cannot support more than Integer.MAX_VALUE doc/value entries; size="
- + size + " other.size=" + otherUpdates.size);
- }
- final int newSize = size + otherUpdates.size;
- docs = docs.grow(newSize);
- offsets = offsets.grow(newSize);
- lengths = lengths.grow(newSize);
- for (int i = 0; i < otherUpdates.size; i++) {
- int doc = (int) otherUpdates.docs.get(i);
- docs.set(size, doc);
- offsets.set(size, values.length() + otherUpdates.offsets.get(i)); // correct relative offset
- lengths.set(size, otherUpdates.lengths.get(i));
- ++size;
+ public Iterator iterator() {
+ if (finished == false) {
+ throw new IllegalStateException("call finish first");
}
-
- values.append(otherUpdates.values);
+ return new Iterator(size, offsets, lengths, docs, values.get(), delGen);
}
@Override
@@ -195,13 +200,13 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
}
@Override
- public long ramBytesPerDoc() {
- long bytesPerDoc = (long) Math.ceil((double) (bitsPerValue) / 8); // docs
- final int capacity = estimateCapacity(size);
- bytesPerDoc += (long) Math.ceil((double) offsets.ramBytesUsed() / capacity); // offsets
- bytesPerDoc += (long) Math.ceil((double) lengths.ramBytesUsed() / capacity); // lengths
- bytesPerDoc += (long) Math.ceil((double) values.length() / size); // values
- return bytesPerDoc;
+ public long ramBytesUsed() {
+ return offsets.ramBytesUsed()
+ + lengths.ramBytesUsed()
+ + docs.ramBytesUsed()
+ + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ + 4 * RamUsageEstimator.NUM_BYTES_INT
+ + 5 * RamUsageEstimator.NUM_BYTES_OBJECT_REF
+ + values.bytes().length;
}
-
}
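
The compare() change above relies on InPlaceMergeSorter being stable: updates are appended in arrival order, so after a stable sort by docID the last entry for a given docID is still the most recent one, and the iterator's forward scan past duplicates keeps exactly that entry. A tiny standalone illustration of the same idea over toy parallel arrays (not the class above):

import org.apache.lucene.util.InPlaceMergeSorter;

public class StableSortDemo {
  public static void main(String[] args) {
    // Parallel arrays in arrival order: doc 7 is updated twice, and value 3 arrived last.
    final int[] docs    = {9, 7, 2, 7};
    final long[] values = {1, 2, 5, 3};

    new InPlaceMergeSorter() {
      @Override protected void swap(int i, int j) {
        int d = docs[i]; docs[i] = docs[j]; docs[j] = d;
        long v = values[i]; values[i] = values[j]; values[j] = v;
      }
      @Override protected int compare(int i, int j) {
        // Ties on docID keep arrival order because the merge sort is stable.
        return Integer.compare(docs[i], docs[j]);
      }
    }.sort(0, docs.length);

    // Scan forward past duplicate docIDs, as the Iterator above does, so the last update wins.
    for (int i = 0; i < docs.length; i++) {
      if (i + 1 < docs.length && docs[i + 1] == docs[i]) {
        continue; // skip an earlier update to the same doc
      }
      System.out.println("doc " + docs[i] + " -> " + values[i]); // doc 2 -> 5, doc 7 -> 3, doc 9 -> 1
    }
  }
}
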
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
index 1c3494f..a5a86e6 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
@@ -30,11 +30,12 @@ import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.RamUsageEstimator;
-/* Holds buffered deletes and updates, by docID, term or query for a
- * single segment. This is used to hold buffered pending
- * deletes and updates against the to-be-flushed segment. Once the
- * deletes and updates are pushed (on flush in DocumentsWriter), they
- * are converted to a FrozenBufferedUpdates instance. */
+/** Holds buffered deletes and updates, by docID, term or query for a
+ * single segment. This is used to hold buffered pending
+ * deletes and updates against the to-be-flushed segment. Once the
+ * deletes and updates are pushed (on flush in DocumentsWriter), they
+ * are converted to a {@link FrozenBufferedUpdates} instance and
+ * pushed to the {@link BufferedUpdatesStream}. */
// NOTE: instances of this class are accessed either via a private
// instance on DocumentWriterPerThread, or via sync'd code by
@@ -128,10 +129,9 @@ class BufferedUpdates {
final AtomicInteger numNumericUpdates = new AtomicInteger();
final AtomicInteger numBinaryUpdates = new AtomicInteger();
- // TODO: rename thes three: put "deleted" prefix in front:
- final Map<Term,Integer> terms = new HashMap<>();
- final Map<Query,Integer> queries = new HashMap<>();
- final List<Integer> docIDs = new ArrayList<>();
+ final Map<Term,Integer> deleteTerms = new HashMap<>();
+ final Map<Query,Integer> deleteQueries = new HashMap<>();
+ final List<Integer> deleteDocIDs = new ArrayList<>();
// Map<dvField,Map<updateTerm,NumericUpdate>>
// For each field we keep an ordered list of NumericUpdates, key'd by the
@@ -169,19 +169,19 @@ class BufferedUpdates {
@Override
public String toString() {
if (VERBOSE_DELETES) {
- return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms
- + ", queries=" + queries + ", docIDs=" + docIDs + ", numericUpdates=" + numericUpdates
+ return "gen=" + gen + " numTerms=" + numTermDeletes + ", deleteTerms=" + deleteTerms
+ + ", deleteQueries=" + deleteQueries + ", deleteDocIDs=" + deleteDocIDs + ", numericUpdates=" + numericUpdates
+ ", binaryUpdates=" + binaryUpdates + ", bytesUsed=" + bytesUsed;
} else {
String s = "gen=" + gen;
if (numTermDeletes.get() != 0) {
- s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
+ s += " " + numTermDeletes.get() + " deleted terms (unique count=" + deleteTerms.size() + ")";
}
- if (queries.size() != 0) {
- s += " " + queries.size() + " deleted queries";
+ if (deleteQueries.size() != 0) {
+ s += " " + deleteQueries.size() + " deleted queries";
}
- if (docIDs.size() != 0) {
- s += " " + docIDs.size() + " deleted docIDs";
+ if (deleteDocIDs.size() != 0) {
+ s += " " + deleteDocIDs.size() + " deleted docIDs";
}
if (numNumericUpdates.get() != 0) {
s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")";
@@ -198,7 +198,7 @@ class BufferedUpdates {
}
public void addQuery(Query query, int docIDUpto) {
- Integer current = queries.put(query, docIDUpto);
+ Integer current = deleteQueries.put(query, docIDUpto);
// increment bytes used only if the query wasn't added so far.
if (current == null) {
bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
@@ -206,12 +206,12 @@ class BufferedUpdates {
}
public void addDocID(int docID) {
- docIDs.add(Integer.valueOf(docID));
+ deleteDocIDs.add(Integer.valueOf(docID));
bytesUsed.addAndGet(BYTES_PER_DEL_DOCID);
}
public void addTerm(Term term, int docIDUpto) {
- Integer current = terms.get(term);
+ Integer current = deleteTerms.get(term);
if (current != null && docIDUpto < current) {
// Only record the new number if it's greater than the
// current one. This is important because if multiple
@@ -223,7 +223,7 @@ class BufferedUpdates {
return;
}
- terms.put(term, Integer.valueOf(docIDUpto));
+ deleteTerms.put(term, Integer.valueOf(docIDUpto));
// note that if current != null then it means there's already a buffered
// delete on that term, therefore we seem to over-count. this over-counting
// is done to respect IndexWriterConfig.setMaxBufferedDeleteTerms.
@@ -290,11 +290,16 @@ class BufferedUpdates {
bytesUsed.addAndGet(BYTES_PER_BINARY_UPDATE_ENTRY + update.sizeInBytes());
}
}
+
+ void clearDeleteTerms() {
+ deleteTerms.clear();
+ numTermDeletes.set(0);
+ }
void clear() {
- terms.clear();
- queries.clear();
- docIDs.clear();
+ deleteTerms.clear();
+ deleteQueries.clear();
+ deleteDocIDs.clear();
numericUpdates.clear();
binaryUpdates.clear();
numTermDeletes.set(0);
@@ -304,6 +309,6 @@ class BufferedUpdates {
}
boolean any() {
- return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0;
+ return deleteTerms.size() > 0 || deleteDocIDs.size() > 0 || deleteQueries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
index 9955626..8c0a2be 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
@@ -14,40 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.lucene.index;
+package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
-import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.PriorityQueue;
-/* Tracks the stream of {@link BufferedDeletes}.
+/** Tracks the stream of {@link FrozenBufferedUpdates}.
* When DocumentsWriterPerThread flushes, its buffered
- * deletes and updates are appended to this stream. We later
- * apply them (resolve them to the actual
- * docIDs, per segment) when a merge is started
- * (only to the to-be-merged segments). We
- * also apply to all segments when NRT reader is pulled,
- * commit/close is called, or when too many deletes or updates are
+ * deletes and updates are appended to this stream and immediately
+ * resolved (to actual docIDs, per segment) using the indexing
+ * thread that triggered the flush for concurrency. When a
+ * merge kicks off, we sync to ensure all resolving packets
+ * complete. We also apply to all segments when NRT reader is pulled,
+ * commit/close is called, or when too many deletes or updates are
* buffered and must be flushed (by RAM usage or by count).
*
* Each packet is assigned a generation, and each flushed or
@@ -57,23 +50,24 @@ import org.apache.lucene.util.PriorityQueue;
class BufferedUpdatesStream implements Accountable {
- // TODO: maybe linked list?
- private final List<FrozenBufferedUpdates> updates = new ArrayList<>();
+ private final Set<FrozenBufferedUpdates> updates = new HashSet<>();
// Starts at 1 so that SegmentInfos that have never had
// deletes applied (whose bufferedDelGen defaults to 0)
// will be correct:
private long nextGen = 1;
- // used only by assert
- private BytesRef lastDeleteTerm;
-
+ private final FinishedSegments finishedSegments;
private final InfoStream infoStream;
private final AtomicLong bytesUsed = new AtomicLong();
private final AtomicInteger numTerms = new AtomicInteger();
+ private final IndexWriter writer;
+ private boolean closed;
- public BufferedUpdatesStream(InfoStream infoStream) {
- this.infoStream = infoStream;
+ public BufferedUpdatesStream(IndexWriter writer) {
+ this.writer = writer;
+ this.infoStream = writer.infoStream;
+ this.finishedSegments = new FinishedSegments(infoStream);
}
// Appends a new packet of buffered deletes to the stream,
@@ -89,21 +83,27 @@ class BufferedUpdatesStream implements Accountable {
packet.setDelGen(nextGen++);
assert packet.any();
assert checkDeleteStats();
- assert packet.delGen() < nextGen;
- assert updates.isEmpty() || updates.get(updates.size()-1).delGen() < packet.delGen() : "Delete packets must be in order";
+
updates.add(packet);
numTerms.addAndGet(packet.numTermDeletes);
bytesUsed.addAndGet(packet.bytesUsed);
if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "push deletes " + packet + " segmentPrivate?=" + packet.isSegmentPrivate + " delGen=" + packet.delGen() + " packetCount=" + updates.size() + " totBytesUsed=" + bytesUsed.get());
+ infoStream.message("BD", String.format(Locale.ROOT, "push new packet (%s), packetCount=%d, bytesUsed=%.3f MB", packet, updates.size(), bytesUsed.get()/1024./1024.));
}
assert checkDeleteStats();
+
return packet.delGen();
}
+ public synchronized int getPendingUpdatesCount() {
+ return updates.size();
+ }
+
+ /** Only used by IW.rollback */
public synchronized void clear() {
updates.clear();
nextGen = 1;
+ finishedSegments.clear();
numTerms.set(0);
bytesUsed.set(0);
}
@@ -121,253 +121,148 @@ class BufferedUpdatesStream implements Accountable {
return bytesUsed.get();
}
+ private synchronized void ensureOpen() {
+ if (closed) {
+ throw new AlreadyClosedException("already closed");
+ }
+ }
+
public static class ApplyDeletesResult {
// True if any actual deletes took place:
public final boolean anyDeletes;
- // Current gen, for the merged segment:
- public final long gen;
-
// If non-null, contains segments that are 100% deleted
public final List<SegmentCommitInfo> allDeleted;
- ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentCommitInfo> allDeleted) {
+ ApplyDeletesResult(boolean anyDeletes, List<SegmentCommitInfo> allDeleted) {
this.anyDeletes = anyDeletes;
- this.gen = gen;
this.allDeleted = allDeleted;
}
}
- // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
- private static final Comparator<SegmentCommitInfo> sortSegInfoByDelGen = new Comparator<SegmentCommitInfo>() {
- @Override
- public int compare(SegmentCommitInfo si1, SegmentCommitInfo si2) {
- return Long.compare(si1.getBufferedDeletesGen(), si2.getBufferedDeletesGen());
- }
- };
-
- /** Resolves the buffered deleted Term/Query/docIDs, into
- * actual deleted docIDs in the liveDocs MutableBits for
- * each SegmentReader. */
- public synchronized ApplyDeletesResult applyDeletesAndUpdates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos) throws IOException {
- final long t0 = System.currentTimeMillis();
+ /** Waits for all in-flight packets, which are already being resolved concurrently
+ * by indexing threads, to finish. Returns true if there were any
+ * new deletes or updates. This is called for refresh, commit. */
+ public void waitApplyAll() throws IOException {
- final long gen = nextGen++;
+ assert Thread.holdsLock(writer) == false;
+
+ final long t0 = System.nanoTime();
- if (infos.size() == 0) {
- return new ApplyDeletesResult(false, gen, null);
+ Set<FrozenBufferedUpdates> waitFor;
+ synchronized (this) {
+ waitFor = new HashSet<>(updates);
}
- // We only init these on demand, when we find our first deletes that need to be applied:
- SegmentState[] segStates = null;
-
- long totDelCount = 0;
- long totTermVisitedCount = 0;
-
- boolean success = false;
-
- ApplyDeletesResult result = null;
-
- try {
- if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", String.format(Locale.ROOT, "applyDeletes: open segment readers took %d msec", System.currentTimeMillis()-t0));
- }
-
- assert checkDeleteStats();
-
- if (!any()) {
- if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "applyDeletes: no segments; skipping");
- }
- return new ApplyDeletesResult(false, gen, null);
- }
-
- if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "applyDeletes: infos=" + infos + " packetCount=" + updates.size());
- }
-
- infos = sortByDelGen(infos);
-
- CoalescedUpdates coalescedUpdates = null;
- int infosIDX = infos.size()-1;
- int delIDX = updates.size()-1;
-
- // Backwards merge sort the segment delGens with the packet delGens in the buffered stream:
- while (infosIDX >= 0) {
- final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
- final SegmentCommitInfo info = infos.get(infosIDX);
- final long segGen = info.getBufferedDeletesGen();
-
- if (packet != null && segGen < packet.delGen()) {
- if (!packet.isSegmentPrivate && packet.any()) {
- /*
- * Only coalesce if we are NOT on a segment private del packet: the segment private del packet
- * must only apply to segments with the same delGen. Yet, if a segment is already deleted
- * from the SI since it had no more documents remaining after some del packets younger than
- * its segPrivate packet (higher delGen) have been applied, the segPrivate packet has not been
- * removed.
- */
- if (coalescedUpdates == null) {
- coalescedUpdates = new CoalescedUpdates();
- }
- coalescedUpdates.update(packet);
- }
-
- delIDX--;
- } else if (packet != null && segGen == packet.delGen()) {
- assert packet.isSegmentPrivate : "Packet and Segments deletegen can only match on a segment private del packet gen=" + segGen;
+ waitApply(waitFor);
+ }
- if (segStates == null) {
- segStates = openSegmentStates(pool, infos);
- }
+ /** Returns true if this delGen is still running. */
+ public boolean stillRunning(long delGen) {
+ return finishedSegments.stillRunning(delGen);
+ }
- SegmentState segState = segStates[infosIDX];
+ public void finishedSegment(long delGen) {
+ finishedSegments.finishedSegment(delGen);
+ }
+
+ /** Called by indexing threads once they are fully done resolving all deletes for the provided
+ * delGen. We track the completed delGens and record the maximum delGen for which all prior
+ * delGens, inclusive, are completed, so that it's safe for doc values updates to apply and write. */
- // Lock order: IW -> BD -> RP
- assert pool.infoIsLive(info);
- int delCount = 0;
- final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
+ public synchronized void finished(FrozenBufferedUpdates packet) {
+ // TODO: would be a bit more memory efficient to track this per-segment, so when each segment writes it writes all packets finished for
+ // it, rather than only recording here, across all segments. But, more complex code, and more CPU, and maybe not so much impact in
+ // practice?
- // first apply segment-private deletes/updates
- delCount += applyQueryDeletes(packet.queriesIterable(), segState);
- applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), segState, dvUpdates);
- applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), segState, dvUpdates);
+ packet.applied.countDown();
- // ... then coalesced deletes/updates, so that if there is an update that appears in both, the coalesced updates (carried from
- // updates ahead of the segment-privates ones) win:
- if (coalescedUpdates != null) {
- delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
- applyDocValuesUpdatesList(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
- applyDocValuesUpdatesList(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
- }
- if (dvUpdates.any()) {
- segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
- }
+ updates.remove(packet);
+ numTerms.addAndGet(-packet.numTermDeletes);
+ bytesUsed.addAndGet(-packet.bytesUsed);
- totDelCount += delCount;
+ finishedSegment(packet.delGen());
+ }
- /*
- * Since we are on a segment private del packet we must not
- * update the coalescedUpdates here! We can simply advance to the
- * next packet and seginfo.
- */
- delIDX--;
- infosIDX--;
+ /** All frozen packets up to and including this del gen are guaranteed to be finished. */
+ public long getCompletedDelGen() {
+ return finishedSegments.getCompletedDelGen();
+ }
- } else {
- if (coalescedUpdates != null) {
- if (segStates == null) {
- segStates = openSegmentStates(pool, infos);
- }
- SegmentState segState = segStates[infosIDX];
- // Lock order: IW -> BD -> RP
- assert pool.infoIsLive(info);
- int delCount = 0;
- delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
- DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
- applyDocValuesUpdatesList(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
- applyDocValuesUpdatesList(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
- if (dvUpdates.any()) {
- segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
- }
+ /** Waits only for those in-flight packets that apply to these merge segments. This is
+ * called when a merge needs to finish and must ensure all deletes to the merging
+ * segments are resolved. */
+ public void waitApplyForMerge(List<SegmentCommitInfo> mergeInfos) throws IOException {
+ assert Thread.holdsLock(writer) == false;
- totDelCount += delCount;
- }
+ final long t0 = System.nanoTime();
- infosIDX--;
- }
- }
+ long maxDelGen = Long.MIN_VALUE;
+ for (SegmentCommitInfo info : mergeInfos) {
+ maxDelGen = Math.max(maxDelGen, info.getBufferedDeletesGen());
+ }
- // Now apply all term deletes:
- if (coalescedUpdates != null && coalescedUpdates.totalTermCount != 0) {
- if (segStates == null) {
- segStates = openSegmentStates(pool, infos);
+ Set<FrozenBufferedUpdates> waitFor = new HashSet<>();
+ synchronized (this) {
+ for (FrozenBufferedUpdates packet : updates) {
+ if (packet.delGen() <= maxDelGen) {
+ // We must wait for this packet before finishing the merge because its
+ // deletes apply to a subset of the segments being merged:
+ waitFor.add(packet);
}
- totTermVisitedCount += applyTermDeletes(coalescedUpdates, segStates);
- }
-
- assert checkDeleteStats();
-
- success = true;
-
- } finally {
- if (segStates != null) {
- result = closeSegmentStates(pool, segStates, success, gen);
}
}
- if (result == null) {
- result = new ApplyDeletesResult(false, gen, null);
- }
-
if (infoStream.isEnabled("BD")) {
- infoStream.message("BD",
- String.format(Locale.ROOT,
- "applyDeletes took %d msec for %d segments, %d newly deleted docs (query deletes), %d visited terms, allDeleted=%s",
- System.currentTimeMillis()-t0, infos.size(), totDelCount, totTermVisitedCount, result.allDeleted));
+ infoStream.message("BD", "waitApplyForMerge: " + waitFor.size() + " packets, " + mergeInfos.size() + " merging segments");
}
-
- return result;
+
+ waitApply(waitFor);
}
- private List<SegmentCommitInfo> sortByDelGen(List<SegmentCommitInfo> infos) {
- infos = new ArrayList<>(infos);
- // Smaller delGens come first:
- Collections.sort(infos, sortSegInfoByDelGen);
- return infos;
- }
+ private void waitApply(Set<FrozenBufferedUpdates> waitFor) throws IOException {
- synchronized long getNextGen() {
- return nextGen++;
- }
+ long startNS = System.nanoTime();
- // Lock order IW -> BD
- /* Removes any BufferedDeletes that we no longer need to
- * store because all segments in the index have had the
- * deletes applied. */
- public synchronized void prune(SegmentInfos segmentInfos) {
- assert checkDeleteStats();
- long minGen = Long.MAX_VALUE;
- for(SegmentCommitInfo info : segmentInfos) {
- minGen = Math.min(info.getBufferedDeletesGen(), minGen);
+ int packetCount = waitFor.size();
+
+ if (waitFor.isEmpty()) {
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD", "waitApply: no deletes to apply");
+ }
+ return;
}
if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + updates.size());
+ infoStream.message("BD", "waitApply: " + waitFor.size() + " packets: " + waitFor);
}
- final int limit = updates.size();
- for(int delIDX=0;delIDX<limit;delIDX++) {
- if (updates.get(delIDX).delGen() >= minGen) {
- prune(delIDX);
- assert checkDeleteStats();
- return;
- }
+
+ long totalDelCount = 0;
+ for (FrozenBufferedUpdates packet : waitFor) {
+ // Frozen packets are now resolved, concurrently, by the indexing threads that
+ // create them, by adding a DocumentsWriter.ResolveUpdatesEvent to the events queue,
+ // but if we get here and the packet is not yet resolved, we resolve it now ourselves:
+ packet.apply(writer);
+ totalDelCount += packet.totalDelCount;
}
- // All deletes pruned
- prune(limit);
- assert !any();
- assert checkDeleteStats();
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD",
+ String.format(Locale.ROOT, "waitApply: done %d packets; totalDelCount=%d; totBytesUsed=%d; took %.2f msec",
+ packetCount,
+ totalDelCount,
+ bytesUsed.get(),
+ (System.nanoTime() - startNS) / 1000000.));
+ }
}
- private synchronized void prune(int count) {
- if (count > 0) {
- if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "pruneDeletes: prune " + count + " packets; " + (updates.size() - count) + " packets remain");
- }
- for(int delIDX=0;delIDX<count;delIDX++) {
- final FrozenBufferedUpdates packet = updates.get(delIDX);
- numTerms.addAndGet(-packet.numTermDeletes);
- assert numTerms.get() >= 0;
- bytesUsed.addAndGet(-packet.bytesUsed);
- assert bytesUsed.get() >= 0;
- }
- updates.subList(0, count).clear();
- }
+ synchronized long getNextGen() {
+ return nextGen++;
}
- static class SegmentState {
+ /** Holds all per-segment internal state used while resolving deletions. */
+ public static final class SegmentState {
final long delGen;
final ReadersAndUpdates rld;
final SegmentReader reader;
@@ -376,7 +271,6 @@ class BufferedUpdatesStream implements Accountable {
TermsEnum termsEnum;
PostingsEnum postingsEnum;
BytesRef term;
- boolean any;
public SegmentState(IndexWriter.ReaderPool pool, SegmentCommitInfo info) throws IOException {
rld = pool.get(info, true);
@@ -392,58 +286,54 @@ class BufferedUpdatesStream implements Accountable {
pool.release(rld);
}
}
- }
-
- /** Does a merge sort by current term across all segments. */
- static class SegmentQueue extends PriorityQueue<SegmentState> {
- public SegmentQueue(int size) {
- super(size);
- }
@Override
- protected boolean lessThan(SegmentState a, SegmentState b) {
- return a.term.compareTo(b.term) < 0;
+ public String toString() {
+ return "SegmentState(" + rld.info + ")";
}
}
/** Opens SegmentReader and inits SegmentState for each segment. */
- private SegmentState[] openSegmentStates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos) throws IOException {
- int numReaders = infos.size();
- SegmentState[] segStates = new SegmentState[numReaders];
+ public SegmentState[] openSegmentStates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos,
+ Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
+ ensureOpen();
+
+ List<SegmentState> segStates = new ArrayList<>();
boolean success = false;
try {
- for(int i=0;i<numReaders;i++) {
- segStates[i] = new SegmentState(pool, infos.get(i));
+ for (SegmentCommitInfo info : infos) {
+ if (info.getBufferedDeletesGen() <= delGen && alreadySeenSegments.contains(info) == false) {
+ segStates.add(new SegmentState(pool, info));
+ alreadySeenSegments.add(info);
+ }
}
success = true;
} finally {
if (success == false) {
- for(int j=0;j<numReaders;j++) {
- if (segStates[j] != null) {
+ for(SegmentState segState : segStates) {
try {
- segStates[j].finish(pool);
+ segState.finish(pool);
} catch (Throwable th) {
// suppress so we keep throwing original exc
}
- }
}
}
}
-
- return segStates;
+
+ return segStates.toArray(new SegmentState[0]);
}
/** Close segment states previously opened with openSegmentStates. */
- private ApplyDeletesResult closeSegmentStates(IndexWriter.ReaderPool pool, SegmentState[] segStates, boolean success, long gen) throws IOException {
- int numReaders = segStates.length;
+ public ApplyDeletesResult closeSegmentStates(IndexWriter.ReaderPool pool, SegmentState[] segStates, boolean success) throws IOException {
+ int count = segStates.length;
Throwable firstExc = null;
List<SegmentCommitInfo> allDeleted = null;
long totDelCount = 0;
- for (int j=0;j<numReaders;j++) {
+
+ for (int j=0;j<count;j++) {
SegmentState segState = segStates[j];
if (success) {
totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
- segState.reader.getSegmentInfo().setBufferedDeletesGen(gen);
int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
assert fullDelCount <= segState.rld.info.info.maxDoc();
if (fullDelCount == segState.rld.info.info.maxDoc()) {
@@ -469,288 +359,70 @@ class BufferedUpdatesStream implements Accountable {
}
if (infoStream.isEnabled("BD")) {
- infoStream.message("BD", "applyDeletes: " + totDelCount + " new deleted documents");
+ infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + updates.size() + " packets; bytesUsed=" + pool.ramBytesUsed());
}
- return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted);
+ return new ApplyDeletesResult(totDelCount > 0, allDeleted);
}
- /** Merge sorts the deleted terms and all segments to resolve terms to docIDs for deletion. */
- private synchronized long applyTermDeletes(CoalescedUpdates updates, SegmentState[] segStates) throws IOException {
-
- long startNS = System.nanoTime();
-
- int numReaders = segStates.length;
-
- long delTermVisitedCount = 0;
- long segTermVisitedCount = 0;
-
- FieldTermIterator iter = updates.termIterator();
-
- String field = null;
- SegmentQueue queue = null;
-
- BytesRef term;
-
- while ((term = iter.next()) != null) {
-
- if (iter.field() != field) {
- // field changed
- field = iter.field();
-
- queue = new SegmentQueue(numReaders);
-
- long segTermCount = 0;
- for (SegmentState state : segStates) {
- Terms terms = state.reader.terms(field);
- if (terms != null) {
- segTermCount += terms.size();
- state.termsEnum = terms.iterator();
- state.term = state.termsEnum.next();
- if (state.term != null) {
- queue.add(state);
- }
- }
- }
-
- assert checkDeleteTerm(null);
- }
-
- assert checkDeleteTerm(term);
-
- delTermVisitedCount++;
-
- long delGen = iter.delGen();
-
- while (queue.size() != 0) {
-
- // Get next term merged across all segments
- SegmentState state = queue.top();
- segTermVisitedCount++;
-
- int cmp = term.compareTo(state.term);
-
- if (cmp < 0) {
- break;
- } else if (cmp == 0) {
- // fall through
- } else {
- TermsEnum.SeekStatus status = state.termsEnum.seekCeil(term);
- if (status == TermsEnum.SeekStatus.FOUND) {
- // fallthrough
- } else {
- if (status == TermsEnum.SeekStatus.NOT_FOUND) {
- state.term = state.termsEnum.term();
- queue.updateTop();
- } else {
- // No more terms in this segment
- queue.pop();
- }
-
- continue;
- }
- }
-
- assert state.delGen != delGen;
-
- if (state.delGen < delGen) {
+ // only for assert
+ private boolean checkDeleteStats() {
+ int numTerms2 = 0;
+ long bytesUsed2 = 0;
+ for(FrozenBufferedUpdates packet : updates) {
+ numTerms2 += packet.numTermDeletes;
+ bytesUsed2 += packet.bytesUsed;
+ }
+ assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
+ assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
+ return true;
+ }
- // we don't need term frequencies for this
- final Bits acceptDocs = state.rld.getLiveDocs();
- state.postingsEnum = state.termsEnum.postings(state.postingsEnum, PostingsEnum.NONE);
+ /** Tracks the contiguous range of packets that have finished resolving. We need this because the packets
+ * are concurrently resolved, and we can only write to disk the contiguous completed
+ * packets. */
+ private static class FinishedSegments {
- assert state.postingsEnum != null;
+ /** Largest del gen, inclusive, for which all prior packets have finished applying. */
+ private long completedDelGen;
- while (true) {
- final int docID = state.postingsEnum.nextDoc();
- if (docID == DocIdSetIterator.NO_MORE_DOCS) {
- break;
- }
- if (acceptDocs != null && acceptDocs.get(docID) == false) {
- continue;
- }
- if (!state.any) {
- state.rld.initWritableLiveDocs();
- state.any = true;
- }
+ /** This lets us track the "holes" in the current frontier of applying del
+ * gens; once the holes are filled in we can advance completedDelGen. */
+ private final Set<Long> finishedDelGens = new HashSet<>();
- // NOTE: there is no limit check on the docID
- // when deleting by Term (unlike by Query)
- // because on flush we apply all Term deletes to
- // each segment. So all Term deleting here is
- // against prior segments:
- state.rld.delete(docID);
- }
- }
+ private final InfoStream infoStream;
- state.term = state.termsEnum.next();
- if (state.term == null) {
- queue.pop();
- } else {
- queue.updateTop();
- }
- }
+ public FinishedSegments(InfoStream infoStream) {
+ this.infoStream = infoStream;
}
- if (infoStream.isEnabled("BD")) {
- infoStream.message("BD",
- String.format(Locale.ROOT, "applyTermDeletes took %.1f msec for %d segments and %d packets; %d del terms visited; %d seg terms visited",
- (System.nanoTime()-startNS)/1000000.,
- numReaders,
- updates.terms.size(),
- delTermVisitedCount, segTermVisitedCount));
+ public synchronized void clear() {
+ finishedDelGens.clear();
+ completedDelGen = 0;
}
- return delTermVisitedCount;
- }
+ public synchronized boolean stillRunning(long delGen) {
+ return delGen > completedDelGen && finishedDelGens.contains(delGen) == false;
+ }
- private synchronized void applyDocValuesUpdatesList(List<List<DocValuesUpdate>> updates,
- SegmentState segState, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
- // we walk backwards through the segments, appending deletion packets to the coalesced updates, so we must apply the packets in reverse
- // so that newer packets override older ones:
- for(int idx=updates.size()-1;idx>=0;idx--) {
- applyDocValuesUpdates(updates.get(idx), segState, dvUpdatesContainer);
+ public synchronized long getCompletedDelGen() {
+ return completedDelGen;
}
- }
- // DocValues updates
- private synchronized void applyDocValuesUpdates(List<DocValuesUpdate> updates,
- SegmentState segState, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
-
- // TODO: we can process the updates per DV field, from last to first so that
- // if multiple terms affect same document for the same field, we add an update
- // only once (that of the last term). To do that, we can keep a bitset which
- // marks which documents have already been updated. So e.g. if term T1
- // updates doc 7, and then we process term T2 and it updates doc 7 as well,
- // we don't apply the update since we know T1 came last and therefore wins
- // the update.
- // We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so
- // that these documents aren't even returned.
-
- String currentField = null;
- TermsEnum termsEnum = null;
- PostingsEnum postingsEnum = null;
-
- for (DocValuesUpdate update : updates) {
- Term term = update.term;
- int limit = update.docIDUpto;
-
- // TODO: we traverse the terms in update order (not term order) so that we
- // apply the updates in the correct order, i.e. if two terms udpate the
- // same document, the last one that came in wins, irrespective of the
- // terms lexical order.
- // we can apply the updates in terms order if we keep an updatesGen (and
- // increment it with every update) and attach it to each NumericUpdate. Note
- // that we cannot rely only on docIDUpto because an app may send two updates
- // which will get same docIDUpto, yet will still need to respect the order
- // those updates arrived.
-
- if (!term.field().equals(currentField)) {
- // if we change the code to process updates in terms order, enable this assert
-// assert currentField == null || currentField.compareTo(term.field()) < 0;
- currentField = term.field();
- Terms terms = segState.reader.terms(currentField);
- if (terms != null) {
- termsEnum = terms.iterator();
+ public synchronized void finishedSegment(long delGen) {
+ finishedDelGens.add(delGen);
+ while (true) {
+ if (finishedDelGens.contains(completedDelGen + 1)) {
+ finishedDelGens.remove(completedDelGen + 1);
+ completedDelGen++;
} else {
- termsEnum = null;
- }
- }
-
- if (termsEnum == null) {
- // no terms in this field
- continue;
- }
-
- if (termsEnum.seekExact(term.bytes())) {
- // we don't need term frequencies for this
- final Bits acceptDocs = segState.rld.getLiveDocs();
- postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
-
- DocValuesFieldUpdates dvUpdates = dvUpdatesContainer.getUpdates(update.field, update.type);
- if (dvUpdates == null) {
- dvUpdates = dvUpdatesContainer.newUpdates(update.field, update.type, segState.reader.maxDoc());
- }
- int doc;
- while ((doc = postingsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
- if (doc >= limit) {
- break; // no more docs that can be updated for this term
- }
- if (acceptDocs != null && acceptDocs.get(doc) == false) {
- continue;
- }
- dvUpdates.add(doc, update.value);
+ break;
}
}
- }
- }
-
- public static class QueryAndLimit {
- public final Query query;
- public final int limit;
- public QueryAndLimit(Query query, int limit) {
- this.query = query;
- this.limit = limit;
- }
- }
- // Delete by query
- private static long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentState segState) throws IOException {
- long delCount = 0;
- final LeafReaderContext readerContext = segState.reader.getContext();
- for (QueryAndLimit ent : queriesIter) {
- Query query = ent.query;
- int limit = ent.limit;
- final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
- searcher.setQueryCache(null);
- final Weight weight = searcher.createNormalizedWeight(query, false);
- final Scorer scorer = weight.scorer(readerContext);
- if (scorer != null) {
- final DocIdSetIterator it = scorer.iterator();
- final Bits liveDocs = readerContext.reader().getLiveDocs();
- while (true) {
- int doc = it.nextDoc();
- if (doc >= limit) {
- break;
- }
- if (liveDocs != null && liveDocs.get(doc) == false) {
- continue;
- }
-
- if (!segState.any) {
- segState.rld.initWritableLiveDocs();
- segState.any = true;
- }
- if (segState.rld.delete(doc)) {
- delCount++;
- }
- }
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD", "finished packet delGen=" + delGen + " now completedDelGen=" + completedDelGen);
}
}
-
- return delCount;
- }
-
- // used only by assert
- private boolean checkDeleteTerm(BytesRef term) {
- if (term != null) {
- assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) >= 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
- }
- // TODO: we re-use term now in our merged iterable, but we shouldn't clone, instead copy for this assert
- lastDeleteTerm = term == null ? null : BytesRef.deepCopyOf(term);
- return true;
- }
-
- // only for assert
- private boolean checkDeleteStats() {
- int numTerms2 = 0;
- long bytesUsed2 = 0;
- for(FrozenBufferedUpdates packet : updates) {
- numTerms2 += packet.numTermDeletes;
- bytesUsed2 += packet.bytesUsed;
- }
- assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
- assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
- return true;
}
}
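
The FinishedSegments class above needs only two operations, and its invariant is easy to see in isolation: packets may finish resolving out of order, but completedDelGen advances only once every smaller delGen has also finished. A self-contained toy sketch of that frontier bookkeeping (not the Lucene class itself):

import java.util.HashSet;
import java.util.Set;

public class DelGenFrontierDemo {
  private long completedDelGen;                       // largest delGen with all prior delGens finished
  private final Set<Long> finishedDelGens = new HashSet<>();

  synchronized void finishedSegment(long delGen) {
    finishedDelGens.add(delGen);
    // Fill in holes: advance the frontier while the next delGen has already finished.
    while (finishedDelGens.remove(completedDelGen + 1)) {
      completedDelGen++;
    }
  }

  synchronized boolean stillRunning(long delGen) {
    return delGen > completedDelGen && finishedDelGens.contains(delGen) == false;
  }

  public static void main(String[] args) {
    DelGenFrontierDemo frontier = new DelGenFrontierDemo();
    frontier.finishedSegment(2);                      // out of order: frontier stays put
    System.out.println(frontier.completedDelGen);     // 0
    frontier.finishedSegment(1);                      // hole filled: frontier jumps past 1 and 2
    System.out.println(frontier.completedDelGen);     // 2
    System.out.println(frontier.stillRunning(3));     // true
  }
}
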
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
deleted file mode 100644
index bf92ac1..0000000
--- a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.index;
-
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
-import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
-import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.util.BytesRef;
-
-class CoalescedUpdates {
- final Map<Query,Integer> queries = new HashMap<>();
- final List<PrefixCodedTerms> terms = new ArrayList<>();
- final List<List<DocValuesUpdate>> numericDVUpdates = new ArrayList<>();
- final List<List<DocValuesUpdate>> binaryDVUpdates = new ArrayList<>();
- long totalTermCount;
-
- @Override
- public String toString() {
- // note: we could add/collect more debugging information
- return "CoalescedUpdates(termSets=" + terms.size()
- + ",totalTermCount=" + totalTermCount
- + ",queries=" + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
- + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
- }
-
- void update(FrozenBufferedUpdates in) {
- totalTermCount += in.terms.size();
- terms.add(in.terms);
-
- for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {
- final Query query = in.queries[queryIdx];
- queries.put(query, BufferedUpdates.MAX_INT);
- }
-
- List<DocValuesUpdate> numericPacket = new ArrayList<>();
- numericDVUpdates.add(numericPacket);
- for (NumericDocValuesUpdate nu : in.numericDVUpdates) {
- NumericDocValuesUpdate clone = new NumericDocValuesUpdate(nu.term, nu.field, (Long) nu.value);
- clone.docIDUpto = Integer.MAX_VALUE;
- numericPacket.add(clone);
- }
-
- List<DocValuesUpdate> binaryPacket = new ArrayList<>();
- binaryDVUpdates.add(binaryPacket);
- for (BinaryDocValuesUpdate bu : in.binaryDVUpdates) {
- BinaryDocValuesUpdate clone = new BinaryDocValuesUpdate(bu.term, bu.field, (BytesRef) bu.value);
- clone.docIDUpto = Integer.MAX_VALUE;
- binaryPacket.add(clone);
- }
- }
-
- public FieldTermIterator termIterator() {
- if (terms.size() == 1) {
- return terms.get(0).iterator();
- } else {
- return new MergedPrefixCodedTermsIterator(terms);
- }
- }
-
- public Iterable<QueryAndLimit> queriesIterable() {
- return new Iterable<QueryAndLimit>() {
-
- @Override
- public Iterator<QueryAndLimit> iterator() {
- return new Iterator<QueryAndLimit>() {
- private final Iterator<Map.Entry<Query,Integer>> iter = queries.entrySet().iterator();
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public QueryAndLimit next() {
- final Map.Entry<Query,Integer> ent = iter.next();
- return new QueryAndLimit(ent.getKey(), ent.getValue());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
index 528d4bf..a54bbe9 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java
@@ -16,15 +16,13 @@
*/
package org.apache.lucene.index;
-
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.util.packed.PagedGrowableWriter;
+import org.apache.lucene.util.PriorityQueue;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
/**
- * Holds updates of a single DocValues field, for a set of documents.
+ * Holds updates of a single DocValues field, for a set of documents within one segment.
*
* @lucene.experimental
*/
@@ -54,100 +52,114 @@ abstract class DocValuesFieldUpdates {
* {@code null} value means that it was unset for this document.
*/
abstract Object value();
-
- /**
- * Reset the iterator's state. Should be called before {@link #nextDoc()}
- * and {@link #value()}.
- */
- abstract void reset();
-
+
+ /** Returns delGen for this packet. */
+ abstract long delGen();
}
- static class Container {
-
- final Map<String,NumericDocValuesFieldUpdates> numericDVUpdates = new HashMap<>();
- final Map<String,BinaryDocValuesFieldUpdates> binaryDVUpdates = new HashMap<>();
-
- boolean any() {
- for (NumericDocValuesFieldUpdates updates : numericDVUpdates.values()) {
- if (updates.any()) {
- return true;
- }
- }
- for (BinaryDocValuesFieldUpdates updates : binaryDVUpdates.values()) {
- if (updates.any()) {
- return true;
+ /** Merge-sorts multiple iterators, one per delGen, favoring the largest delGen that has updates for a given docID. */
+ public static Iterator mergedIterator(Iterator[] subs) {
+
+ if (subs.length == 1) {
+ return subs[0];
+ }
+
+ PriorityQueue<Iterator> queue = new PriorityQueue<Iterator>(subs.length) {
+ @Override
+ protected boolean lessThan(Iterator a, Iterator b) {
+ // sort by smaller docID
+ int cmp = Integer.compare(a.doc(), b.doc());
+ if (cmp == 0) {
+ // then by larger delGen
+ cmp = Long.compare(b.delGen(), a.delGen());
+
+ // delGens are unique across our subs:
+ assert cmp != 0;
+ }
+
+ return cmp < 0;
}
+ };
+
+ for (Iterator sub : subs) {
+ if (sub.nextDoc() != NO_MORE_DOCS) {
+ queue.add(sub);
}
- return false;
}
-
- int size() {
- return numericDVUpdates.size() + binaryDVUpdates.size();
+
+ if (queue.size() == 0) {
+ return null;
}
-
- long ramBytesPerDoc() {
- long ramBytesPerDoc = 0;
- for (NumericDocValuesFieldUpdates updates : numericDVUpdates.values()) {
- ramBytesPerDoc += updates.ramBytesPerDoc();
+
+ return new Iterator() {
+ private int doc;
+
+ private boolean first = true;
+
+ @Override
+ public int nextDoc() {
+ // TODO: can we do away with this first boolean?
+ if (first == false) {
+ // Advance all sub iterators past current doc
+ while (true) {
+ if (queue.size() == 0) {
+ doc = NO_MORE_DOCS;
+ break;
+ }
+ int newDoc = queue.top().doc();
+ if (newDoc != doc) {
+ assert newDoc > doc: "doc=" + doc + " newDoc=" + newDoc;
+ doc = newDoc;
+ break;
+ }
+ if (queue.top().nextDoc() == NO_MORE_DOCS) {
+ queue.pop();
+ } else {
+ queue.updateTop();
+ }
+ }
+ } else {
+ doc = queue.top().doc();
+ first = false;
+ }
+ return doc;
}
- for (BinaryDocValuesFieldUpdates updates : binaryDVUpdates.values()) {
- ramBytesPerDoc += updates.ramBytesPerDoc();
+
+ @Override
+ public int doc() {
+ return doc;
}
- return ramBytesPerDoc;
- }
-
- DocValuesFieldUpdates getUpdates(String field, DocValuesType type) {
- switch (type) {
- case NUMERIC:
- return numericDVUpdates.get(field);
- case BINARY:
- return binaryDVUpdates.get(field);
- default:
- throw new IllegalArgumentException("unsupported type: " + type);
+
+ @Override
+ public Object value() {
+ return queue.top().value();
}
- }
-
- DocValuesFieldUpdates newUpdates(String field, DocValuesType type, int maxDoc) {
- switch (type) {
- case NUMERIC:
- assert numericDVUpdates.get(field) == null;
- NumericDocValuesFieldUpdates numericUpdates = new NumericDocValuesFieldUpdates(field, maxDoc);
- numericDVUpdates.put(field, numericUpdates);
- return numericUpdates;
- case BINARY:
- assert binaryDVUpdates.get(field) == null;
- BinaryDocValuesFieldUpdates binaryUpdates = new BinaryDocValuesFieldUpdates(field, maxDoc);
- binaryDVUpdates.put(field, binaryUpdates);
- return binaryUpdates;
- default:
- throw new IllegalArgumentException("unsupported type: " + type);
+
+ @Override
+ public long delGen() {
+ throw new UnsupportedOperationException();
}
- }
-
- @Override
- public String toString() {
- return "numericDVUpdates=" + numericDVUpdates + " binaryDVUpdates=" + binaryDVUpdates;
- }
+ };
}
-
+
final String field;
final DocValuesType type;
-
- protected DocValuesFieldUpdates(String field, DocValuesType type) {
+ final long delGen;
+ protected boolean finished;
+ protected final int maxDoc;
+
+ protected DocValuesFieldUpdates(int maxDoc, long delGen, String field, DocValuesType type) {
+ this.maxDoc = maxDoc;
+ this.delGen = delGen;
this.field = field;
if (type == null) {
throw new NullPointerException("DocValuesType must not be null");
}
this.type = type;
}
-
- /**
- * Returns the estimated capacity of a {@link PagedGrowableWriter} given the
- * actual number of stored elements.
- */
- protected static int estimateCapacity(int size) {
- return (int) Math.ceil((double) size / PAGE_SIZE) * PAGE_SIZE;
+
+ public boolean getFinished() {
+ return finished;
}
/**
@@ -160,19 +172,17 @@ abstract class DocValuesFieldUpdates {
* Returns an {@link Iterator} over the updated documents and their
* values.
*/
+ // TODO: also use this for merging, instead of having to write through to disk first
public abstract Iterator iterator();
-
- /**
- * Merge with another {@link DocValuesFieldUpdates}. This is called for a
- * segment which received updates while it was being merged. The given updates
- * should override whatever updates are in that instance.
- */
- public abstract void merge(DocValuesFieldUpdates other);
+ /** Freezes internal data structures and sorts updates by docID for efficient iteration. */
+ public abstract void finish();
+
/** Returns true if this instance contains any updates. */
public abstract boolean any();
- /** Returns approximate RAM bytes used per document. */
- public abstract long ramBytesPerDoc();
+ /** Returns approximate RAM bytes used. */
+ public abstract long ramBytesUsed();
+ public abstract int size();
}
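
The mergedIterator added above merge-sorts one sub-iterator per delete generation through a priority queue keyed on docID, breaking ties toward the largest delGen so that the newest update for a document wins. A minimal standalone sketch of the same pattern, using java.util.PriorityQueue and hypothetical (docID, delGen, value) cursors rather than Lucene's internal classes:

import java.util.Comparator;
import java.util.PriorityQueue;

public class MergedUpdatesDemo {

  // Hypothetical cursor over one update packet's (docID -> value) entries, sorted by docID.
  static final class Cursor {
    final long delGen;
    final int[] docs;     // ascending docIDs
    final long[] values;  // value for the doc at the same index
    int pos = -1;

    Cursor(long delGen, int[] docs, long[] values) {
      this.delGen = delGen;
      this.docs = docs;
      this.values = values;
    }

    boolean next() { return ++pos < docs.length; }
    int doc()      { return docs[pos]; }
    long value()   { return values[pos]; }
  }

  public static void main(String[] args) {
    // Two packets touch overlapping docIDs; delGen=2 is newer than delGen=1.
    Cursor older = new Cursor(1, new int[] {0, 3, 7}, new long[] {10, 13, 17});
    Cursor newer = new Cursor(2, new int[] {3, 9},    new long[] {113, 119});

    // Smaller docID first; on ties, larger delGen first so the newest value wins.
    Comparator<Cursor> byDocThenNewestGen = (a, b) -> {
      int cmp = Integer.compare(a.doc(), b.doc());
      if (cmp == 0) {
        cmp = Long.compare(b.delGen, a.delGen);
      }
      return cmp;
    };
    PriorityQueue<Cursor> queue = new PriorityQueue<>(byDocThenNewestGen);

    for (Cursor sub : new Cursor[] {older, newer}) {
      if (sub.next()) {
        queue.add(sub);
      }
    }

    int lastDoc = -1;
    while (!queue.isEmpty()) {
      Cursor top = queue.poll();
      if (top.doc() != lastDoc) {        // first entry for a doc comes from the newest delGen
        lastDoc = top.doc();
        System.out.println("doc=" + lastDoc + " value=" + top.value());
      }
      if (top.next()) {
        queue.add(top);                  // re-insert the cursor at its next docID
      }
    }
    // prints: doc=0 value=10, doc=3 value=113, doc=7 value=17, doc=9 value=119
  }
}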
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
index 1c85f33..a66f930 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java
@@ -85,7 +85,6 @@ abstract class DocValuesUpdate {
long valueSizeInBytes() {
return RAW_VALUE_SIZE_IN_BYTES + ((BytesRef) value).bytes.length;
}
-
}
/** An in-place update to a numeric DocValues field */
@@ -99,7 +98,5 @@ abstract class DocValuesUpdate {
long valueSizeInBytes() {
return Long.BYTES;
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 2807517..27e28c0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -107,7 +107,7 @@ final class DocumentsWriter implements Closeable, Accountable {
private final AtomicInteger numDocsInRAM = new AtomicInteger(0);
// TODO: cut over to BytesRefHash in BufferedDeletes
- volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
+ volatile DocumentsWriterDeleteQueue deleteQueue;
private final DocumentsWriterFlushQueue ticketQueue = new DocumentsWriterFlushQueue();
/*
* we preserve changes during a full flush since IW might not checkout before
@@ -129,6 +129,7 @@ final class DocumentsWriter implements Closeable, Accountable {
this.directory = directory;
this.config = config;
this.infoStream = config.getInfoStream();
+ this.deleteQueue = new DocumentsWriterDeleteQueue(infoStream);
this.perThreadPool = config.getIndexerThreadPool();
flushPolicy = config.getFlushPolicy();
this.writer = writer;
@@ -141,10 +142,10 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
long seqNo = deleteQueue.addDelete(queries);
flushControl.doOnDelete();
+ lastSeqNo = Math.max(lastSeqNo, seqNo);
if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo;
}
- lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo;
}
@@ -160,10 +161,10 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
long seqNo = deleteQueue.addDelete(terms);
flushControl.doOnDelete();
+ lastSeqNo = Math.max(lastSeqNo, seqNo);
if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo;
}
- lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo;
}
@@ -171,20 +172,21 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
long seqNo = deleteQueue.addDocValuesUpdates(updates);
flushControl.doOnDelete();
+ lastSeqNo = Math.max(lastSeqNo, seqNo);
if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo;
}
- lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo;
}
DocumentsWriterDeleteQueue currentDeleteSession() {
return deleteQueue;
}
-
+
+ /** If buffered deletes are using too much heap, resolve them, write them to disk, and return true. */
private boolean applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (flushControl.getAndResetApplyAllDeletes()) {
- if (deleteQueue != null && !flushControl.isFullFlush()) {
+ if (deleteQueue != null) {
ticketQueue.addDeletes(deleteQueue);
}
putEvent(ApplyDeletesEvent.INSTANCE); // apply deletes event forces a purge
@@ -200,7 +202,6 @@ final class DocumentsWriter implements Closeable, Accountable {
return ticketQueue.tryPurge(writer);
}
}
-
/** Returns how many docs are currently buffered in RAM. */
int getNumDocs() {
@@ -246,11 +247,13 @@ final class DocumentsWriter implements Closeable, Accountable {
}
/** Returns how many documents were aborted. */
- synchronized long lockAndAbortAll(IndexWriter indexWriter) {
+ synchronized long lockAndAbortAll(IndexWriter indexWriter) throws IOException {
assert indexWriter.holdsFullFlushLock();
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "lockAndAbortAll");
}
+ // Make sure we move all pending tickets into the flush queue:
+ ticketQueue.forcePurge(indexWriter);
long abortedDocCount = 0;
boolean success = false;
try {
@@ -578,9 +581,7 @@ final class DocumentsWriter implements Closeable, Accountable {
flushingDWPT = flushControl.nextPendingFlush();
}
- if (hasEvents) {
- putEvent(MergePendingEvent.INSTANCE);
- }
+
// If deletes alone are consuming > 1/2 our RAM
// buffer, force them all to apply now. This is to
// prevent too-frequent flushing of a long tail of
@@ -589,9 +590,9 @@ final class DocumentsWriter implements Closeable, Accountable {
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
hasEvents = true;
- if (!this.applyAllDeletes(deleteQueue)) {
+ if (applyAllDeletes(deleteQueue) == false) {
if (infoStream.isEnabled("DW")) {
- infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
+ infoStream.message("DW", String.format(Locale.ROOT, "force apply deletes after flush bytesUsed=%.1f MB vs ramBuffer=%.1f MB",
flushControl.getDeleteBytesUsed()/(1024.*1024.),
ramBufferSizeMB));
}
@@ -654,7 +655,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
- if (!anythingFlushed && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
+ if (anythingFlushed == false && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
@@ -695,7 +696,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return config;
}
- private void putEvent(Event event) {
+ void putEvent(Event event) {
events.add(event);
}
@@ -704,6 +705,25 @@ final class DocumentsWriter implements Closeable, Accountable {
return flushControl.ramBytesUsed();
}
+ static final class ResolveUpdatesEvent implements Event {
+
+ private final FrozenBufferedUpdates packet;
+
+ ResolveUpdatesEvent(FrozenBufferedUpdates packet) {
+ this.packet = packet;
+ }
+
+ @Override
+ public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ try {
+ packet.apply(writer);
+ } catch (Throwable t) {
+ writer.tragicEvent(t, "applyUpdatesPacket");
+ }
+ writer.flushDeletesCount.incrementAndGet();
+ }
+ }
+
static final class ApplyDeletesEvent implements Event {
static final Event INSTANCE = new ApplyDeletesEvent();
private int instCount = 0;
@@ -717,21 +737,7 @@ final class DocumentsWriter implements Closeable, Accountable {
writer.applyDeletesAndPurge(true); // we always purge!
}
}
-
- static final class MergePendingEvent implements Event {
- static final Event INSTANCE = new MergePendingEvent();
- private int instCount = 0;
- private MergePendingEvent() {
- assert instCount == 0;
- instCount++;
- }
-
- @Override
- public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
- writer.doAfterSegmentFlushed(triggerMerge, forcePurge);
- }
- }
-
+
static final class ForcedPurgeEvent implements Event {
static final Event INSTANCE = new ForcedPurgeEvent();
private int instCount = 0;
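
One behavioral detail in the three delete/update hunks above: lastSeqNo is now recorded before the sequence number's sign is flipped. The negative sign only signals to the caller that buffered deletes were also pushed, so taking the max after negation could silently drop the highest sequence number. A small illustrative sketch with hypothetical values:

public class LastSeqNoOrderingDemo {
  public static void main(String[] args) {
    long lastSeqNo = 17;              // highest sequence number seen so far (hypothetical)
    long seqNo = 42;                  // sequence number assigned to this delete
    boolean appliedAllDeletes = true;

    // Old order: negate first, then take the max -> 42 is silently lost.
    long negated = appliedAllDeletes ? -seqNo : seqNo;
    System.out.println("old order lastSeqNo=" + Math.max(lastSeqNo, negated));   // 17

    // New order: record the max first, negate only the value handed back to the caller.
    lastSeqNo = Math.max(lastSeqNo, seqNo);
    long returned = appliedAllDeletes ? -seqNo : seqNo;
    System.out.println("new order lastSeqNo=" + lastSeqNo + " returned=" + returned);  // 42, -42
  }
}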
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index db0e571..c4a0845 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -16,6 +16,7 @@
*/
package org.apache.lucene.index;
+import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -25,6 +26,7 @@ import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.InfoStream;
/**
* {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes
@@ -85,19 +87,22 @@ final class DocumentsWriterDeleteQueue implements Accountable {
/** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
private final AtomicLong nextSeqNo;
+ private final InfoStream infoStream;
+
// for asserts
long maxSeqNo = Long.MAX_VALUE;
- DocumentsWriterDeleteQueue() {
+ DocumentsWriterDeleteQueue(InfoStream infoStream) {
// seqNo must start at 1 because some APIs negate this to also return a boolean
- this(0, 1);
+ this(infoStream, 0, 1);
}
- DocumentsWriterDeleteQueue(long generation, long startSeqNo) {
- this(new BufferedUpdates("global"), generation, startSeqNo);
+ DocumentsWriterDeleteQueue(InfoStream infoStream, long generation, long startSeqNo) {
+ this(infoStream, new BufferedUpdates("global"), generation, startSeqNo);
}
- DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
+ DocumentsWriterDeleteQueue(InfoStream infoStream, BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
+ this.infoStream = infoStream;
this.globalBufferedUpdates = globalBufferedUpdates;
this.generation = generation;
this.nextSeqNo = new AtomicLong(startSeqNo);
@@ -189,7 +194,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
}
}
- FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) {
+ FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) throws IOException {
globalBufferLock.lock();
/*
* Here we freeze the global buffer so we need to lock it, apply all
@@ -209,9 +214,13 @@ final class DocumentsWriterDeleteQueue implements Accountable {
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
- final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
- globalBufferedUpdates.clear();
- return packet;
+ if (globalBufferedUpdates.any()) {
+ final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(infoStream, globalBufferedUpdates, null);
+ globalBufferedUpdates.clear();
+ return packet;
+ } else {
+ return null;
+ }
} finally {
globalBufferLock.unlock();
}
@@ -426,7 +435,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
globalBufferLock.lock();
try {
forceApplyGlobalSlice();
- return globalBufferedUpdates.terms.size();
+ return globalBufferedUpdates.deleteTerms.size();
} finally {
globalBufferLock.unlock();
}
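
The constructor comment above ("seqNo must start at 1 because some APIs negate this to also return a boolean") is worth spelling out: the sign of the returned sequence number doubles as a flag, and that only works if 0 is never handed out. A trivial illustration:

public class SeqNoStartsAtOneDemo {
  public static void main(String[] args) {
    long first = 0;
    System.out.println(-first == first);  // true: for 0 the sign carries no information

    long start = 1;
    System.out.println(-start == start);  // false: starting at 1 keeps the sign usable as a flag
  }
}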
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index a5b4b7c..047fb9c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -424,22 +424,16 @@ final class DocumentsWriterFlushControl implements Accountable {
};
}
-
-
synchronized void doOnDelete() {
// pass null: this is a global delete, not an update
flushPolicy.onDelete(this, null);
}
- /**
- * Returns the number of delete terms in the global pool
- */
- public int getNumGlobalTermDeletes() {
- return documentsWriter.deleteQueue.numGlobalTermDeletes() + bufferedUpdatesStream.numTerms();
- }
-
+ /** Returns heap bytes currently consumed by buffered deletes/updates that would be
+ * freed if we pushed all deletes. This does not include bytes consumed by
+ * already pushed delete/update packets. */
public long getDeleteBytesUsed() {
- return documentsWriter.deleteQueue.ramBytesUsed() + bufferedUpdatesStream.ramBytesUsed();
+ return documentsWriter.deleteQueue.ramBytesUsed();
}
@Override
@@ -501,7 +495,7 @@ final class DocumentsWriterFlushControl implements Accountable {
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
flushingQueue.maxSeqNo = seqNo+1;
- DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
+ DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(infoStream, flushingQueue.generation+1, seqNo+1);
documentsWriter.deleteQueue = newQueue;
}
@@ -648,8 +642,7 @@ final class DocumentsWriterFlushControl implements Accountable {
}
for (BlockedFlush blockedFlush : blockedFlushes) {
try {
- flushingWriters
- .put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+ flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
documentsWriter.subtractFlushedNumDocs(blockedFlush.dwpt.getNumDocsInRAM());
blockedFlush.dwpt.abort();
} catch (Throwable ex) {
@@ -720,6 +713,4 @@ final class DocumentsWriterFlushControl implements Accountable {
public InfoStream getInfoStream() {
return infoStream;
}
-
-
}
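
getDeleteBytesUsed above now reports only the live delete queue's heap use, and DocumentsWriter compares that figure against half of the configured RAM buffer to decide when to force-apply buffered deletes (see the earlier DocumentsWriter hunk). A small illustrative calculation of that threshold, with a hypothetical deleteBytesUsed value:

import org.apache.lucene.index.IndexWriterConfig;

public class DeleteBufferThresholdDemo {
  public static void main(String[] args) {
    double ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;  // 16.0 MB by default
    long deleteBytesUsed = 9L * 1024 * 1024;                                // hypothetical buffered deletes

    // Same shape as the check in DocumentsWriter: deletes may use at most half the RAM buffer.
    boolean forceApplyDeletes =
        ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH
            && deleteBytesUsed > (1024 * 1024 * ramBufferSizeMB / 2);

    System.out.printf("deletes=%.1f MB threshold=%.1f MB forceApply=%b%n",
        deleteBytesUsed / (1024. * 1024.), ramBufferSizeMB / 2, forceApplyDeletes);
    // With the 16 MB default, 9 MB of buffered deletes exceeds the 8 MB threshold.
  }
}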
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
index 2c62487..df1b38c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
@@ -60,7 +60,7 @@ class DocumentsWriterFlushQueue {
assert numTickets >= 0;
}
- synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
+ synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
// Each flush is assigned a ticket in the order they acquire the ticketQueue
// lock
incTickets();
@@ -168,11 +168,11 @@ class DocumentsWriterFlushQueue {
protected boolean published = false;
protected FlushTicket(FrozenBufferedUpdates frozenUpdates) {
- assert frozenUpdates != null;
this.frozenUpdates = frozenUpdates;
}
protected abstract void publish(IndexWriter writer) throws IOException;
+
protected abstract boolean canPublish();
/**
@@ -186,33 +186,31 @@ class DocumentsWriterFlushQueue {
assert newSegment != null;
assert newSegment.segmentInfo != null;
final FrozenBufferedUpdates segmentUpdates = newSegment.segmentUpdates;
- //System.out.println("FLUSH: " + newSegment.segmentInfo.info.name);
if (indexWriter.infoStream.isEnabled("DW")) {
- indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private updates=" + segmentUpdates);
+ indexWriter.infoStream.message("DW", "publishFlushedSegment seg-private updates=" + segmentUpdates);
}
if (segmentUpdates != null && indexWriter.infoStream.isEnabled("DW")) {
- indexWriter.infoStream.message("DW", "flush: push buffered seg private updates: " + segmentUpdates);
+ indexWriter.infoStream.message("DW", "flush: push buffered seg private updates: " + segmentUpdates);
}
// now publish!
- indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentUpdates, globalPacket);
+ indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentUpdates, globalPacket, newSegment.sortMap);
}
protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates bufferedUpdates)
throws IOException {
- // Finish the flushed segment and publish it to IndexWriter
- if (newSegment == null) {
- assert bufferedUpdates != null;
- if (bufferedUpdates != null && bufferedUpdates.any()) {
- indexWriter.publishFrozenUpdates(bufferedUpdates);
- if (indexWriter.infoStream.isEnabled("DW")) {
- indexWriter.infoStream.message("DW", "flush: push buffered updates: " + bufferedUpdates);
- }
+ // Finish the flushed segment and publish it to IndexWriter
+ if (newSegment == null) {
+ if (bufferedUpdates != null && bufferedUpdates.any()) {
+ indexWriter.publishFrozenUpdates(bufferedUpdates);
+ if (indexWriter.infoStream.isEnabled("DW")) {
+ indexWriter.infoStream.message("DW", "flush: push buffered updates: " + bufferedUpdates);
}
- } else {
- publishFlushedSegment(indexWriter, newSegment, bufferedUpdates);
}
+ } else {
+ publishFlushedSegment(indexWriter, newSegment, bufferedUpdates);
}
+ }
}
static final class GlobalDeletesTicket extends FlushTicket {
@@ -220,6 +218,7 @@ class DocumentsWriterFlushQueue {
protected GlobalDeletesTicket(FrozenBufferedUpdates frozenUpdates) {
super(frozenUpdates);
}
+
@Override
protected void publish(IndexWriter writer) throws IOException {
assert !published : "ticket was already published - can not publish twice";
@@ -264,4 +263,4 @@ class DocumentsWriterFlushQueue {
return segment != null || failed;
}
}
-}
\ No newline at end of file
+}