Posted to commits@lucene.apache.org by sh...@apache.org on 2017/06/25 02:06:30 UTC
[08/47] lucene-solr:feature/autoscaling: LUCENE-7868: use multiple threads to concurrently resolve deletes and DV updates
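
In broad strokes, the change stops resolving buffered deletes and doc values updates in one synchronized pass under IndexWriter: update packets are stamped with an increasing delete generation (delGen) and resolved to per-segment docIDs by whatever threads are available, and only the per-segment bookkeeping (ReadersAndUpdates below) stays synchronized. A minimal, self-contained sketch of that pattern, using hypothetical stand-ins (ConcurrentResolveSketch, ResolvedUpdate, SegmentUpdates) rather than Lucene's actual classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentResolveSketch {

  /** One resolved doc values update packet; delGen records the delete generation that produced it. */
  static final class ResolvedUpdate {
    final long delGen;
    final int[] docIDs;
    ResolvedUpdate(long delGen, int[] docIDs) { this.delGen = delGen; this.docIDs = docIDs; }
  }

  /** Per-segment holder of resolved-but-unwritten updates, keyed by field (cf. pendingDVUpdates below). */
  static final class SegmentUpdates {
    private final Map<String, List<ResolvedUpdate>> pendingDVUpdates = new HashMap<>();
    final AtomicLong ramBytesUsed = new AtomicLong();

    synchronized void add(String field, ResolvedUpdate update) {
      pendingDVUpdates.computeIfAbsent(field, f -> new ArrayList<>()).add(update);
      ramBytesUsed.addAndGet(4L * update.docIDs.length);
    }

    synchronized int packetCount(String field) {
      List<ResolvedUpdate> list = pendingDVUpdates.get(field);
      return list == null ? 0 : list.size();
    }
  }

  public static void main(String[] args) throws Exception {
    SegmentUpdates segment = new SegmentUpdates();
    AtomicLong nextDelGen = new AtomicLong();
    ExecutorService pool = Executors.newFixedThreadPool(4);

    // Several threads resolve packets concurrently; each packet gets its own delGen and
    // only the short registration step on the segment is synchronized.
    for (int i = 0; i < 8; i++) {
      pool.execute(() -> {
        long delGen = nextDelGen.incrementAndGet();
        int[] docIDs = {0, 1, 2};   // stand-in for resolving a delete term to docIDs
        segment.add("field", new ResolvedUpdate(delGen, docIDs));
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);

    System.out.println("packets=" + segment.packetCount("field")
        + " ramBytesUsed=" + segment.ramBytesUsed.get());
  }
}
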
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
index d4dd4a4..630131e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
@@ -18,11 +18,17 @@ package org.apache.lucene.index;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.Map;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
import java.util.Map.Entry;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
@@ -36,6 +42,7 @@ import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.MutableBits;
// Used by IndexWriter to hold open SegmentReaders (for
@@ -76,8 +83,20 @@ class ReadersAndUpdates {
// That way, when the segment is done merging, IndexWriter can apply the
// updates on the merged segment too.
private boolean isMerging = false;
-
- private final Map<String,DocValuesFieldUpdates> mergingDVUpdates = new HashMap<>();
+
+ // Holds resolved (to docIDs) doc values updates that have not yet been
+ // written to the index
+ private final Map<String,List<DocValuesFieldUpdates>> pendingDVUpdates = new HashMap<>();
+
+ // Holds resolved (to docIDs) doc values updates that were resolved while
+ // this segment was being merged; at the end of the merge we carry over
+ // these updates (remapping their docIDs) to the newly merged segment
+ private final Map<String,List<DocValuesFieldUpdates>> mergingDVUpdates = new HashMap<>();
+
+ // Only set if there are doc values updates against this segment, and the index is sorted:
+ Sorter.DocMap sortMap;
+
+ public final AtomicLong ramBytesUsed = new AtomicLong();
public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) {
this.writer = writer;
@@ -100,12 +119,12 @@ class ReadersAndUpdates {
public void incRef() {
final int rc = refCount.incrementAndGet();
- assert rc > 1;
+ assert rc > 1: "seg=" + info;
}
public void decRef() {
final int rc = refCount.decrementAndGet();
- assert rc >= 0;
+ assert rc >= 0: "seg=" + info;
}
public int refCount() {
@@ -117,6 +136,52 @@ class ReadersAndUpdates {
public synchronized int getPendingDeleteCount() {
return pendingDeleteCount;
}
+
+ private synchronized boolean assertNoDupGen(List<DocValuesFieldUpdates> fieldUpdates, DocValuesFieldUpdates update) {
+ for (int i=0;i<fieldUpdates.size();i++) {
+ DocValuesFieldUpdates oldUpdate = fieldUpdates.get(i);
+ if (oldUpdate.delGen == update.delGen) {
+ throw new AssertionError("duplicate delGen=" + update.delGen + " for seg=" + info);
+ }
+ }
+ return true;
+ }
+
+ /** Adds a new resolved (meaning it maps docIDs to new values) doc values packet. We buffer these in RAM and write to disk when too much
+ * RAM is used or when a merge needs to kick off, or a commit/refresh. */
+ public synchronized void addDVUpdate(DocValuesFieldUpdates update) {
+ if (update.getFinished() == false) {
+ throw new IllegalArgumentException("call finish first");
+ }
+ List<DocValuesFieldUpdates> fieldUpdates = pendingDVUpdates.get(update.field);
+ if (fieldUpdates == null) {
+ fieldUpdates = new ArrayList<>();
+ pendingDVUpdates.put(update.field, fieldUpdates);
+ }
+
+ assert assertNoDupGen(fieldUpdates, update);
+
+ ramBytesUsed.addAndGet(update.ramBytesUsed());
+
+ fieldUpdates.add(update);
+
+ if (isMerging) {
+ fieldUpdates = mergingDVUpdates.get(update.field);
+ if (fieldUpdates == null) {
+ fieldUpdates = new ArrayList<>();
+ mergingDVUpdates.put(update.field, fieldUpdates);
+ }
+ fieldUpdates.add(update);
+ }
+ }
+
+ public synchronized long getNumDVUpdates() {
+ long count = 0;
+ for (List<DocValuesFieldUpdates> updates : pendingDVUpdates.values()) {
+ count += updates.size();
+ }
+ return count;
+ }
// Call only from assert!
public synchronized boolean verifyDocCounts() {
@@ -137,7 +202,7 @@ class ReadersAndUpdates {
}
/** Returns a {@link SegmentReader}. */
- public SegmentReader getReader(IOContext context) throws IOException {
+ public synchronized SegmentReader getReader(IOContext context) throws IOException {
if (reader == null) {
// We steal returned ref:
reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), context);
@@ -156,16 +221,15 @@ class ReadersAndUpdates {
sr.decRef();
}
- public synchronized boolean delete(int docID) {
+ public synchronized boolean delete(int docID) throws IOException {
+ initWritableLiveDocs();
assert liveDocs != null;
- assert Thread.holdsLock(writer);
assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " maxDoc=" + info.info.maxDoc();
assert !liveDocsShared;
final boolean didDelete = liveDocs.get(docID);
if (didDelete) {
((MutableBits) liveDocs).clear(docID);
pendingDeleteCount++;
- //System.out.println(" new del seg=" + info + " docID=" + docID + " pendingDelCount=" + pendingDeleteCount + " totDelCount=" + (info.info.maxDoc()-liveDocs.count()));
}
return didDelete;
}
@@ -175,7 +239,6 @@ class ReadersAndUpdates {
// TODO: can we somehow use IOUtils here...? problem is
// we are calling .decRef not .close)...
if (reader != null) {
- //System.out.println(" pool.drop info=" + info + " rc=" + reader.getRefCount());
try {
reader.decRef();
} finally {
@@ -207,10 +270,8 @@ class ReadersAndUpdates {
}
}
- public synchronized void initWritableLiveDocs() throws IOException {
- assert Thread.holdsLock(writer);
+ private synchronized void initWritableLiveDocs() throws IOException {
assert info.info.maxDoc() > 0;
- //System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared);
if (liveDocsShared) {
// Copy on write: this means we've cloned a
// SegmentReader sharing the current liveDocs
@@ -218,7 +279,6 @@ class ReadersAndUpdates {
// change it:
LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat();
if (liveDocs == null) {
- //System.out.println("create BV seg=" + info);
liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc());
} else {
liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
@@ -228,21 +288,16 @@ class ReadersAndUpdates {
}
public synchronized Bits getLiveDocs() {
- assert Thread.holdsLock(writer);
return liveDocs;
}
public synchronized Bits getReadOnlyLiveDocs() {
- //System.out.println("getROLiveDocs seg=" + info);
- assert Thread.holdsLock(writer);
liveDocsShared = true;
- //if (liveDocs != null) {
- //System.out.println(" liveCount=" + liveDocs.count());
- //}
return liveDocs;
}
public synchronized void dropChanges() {
+ assert Thread.holdsLock(writer);
// Discard (don't save) changes when we are dropping
// the reader; this is used only on the sub-readers
// after a successful merge. If deletes had
@@ -258,8 +313,6 @@ class ReadersAndUpdates {
// _X_N updates files) to the directory; returns true if it wrote any file
// and false if there were no new deletes or updates to write:
public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
- assert Thread.holdsLock(writer);
- //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount + " numericUpdates=" + numericUpdates);
if (pendingDeleteCount == 0) {
return false;
}
@@ -304,16 +357,43 @@ class ReadersAndUpdates {
}
@SuppressWarnings("synthetic-access")
- private void handleNumericDVUpdates(FieldInfos infos, Map<String,NumericDocValuesFieldUpdates> updates,
- Directory dir, DocValuesFormat dvFormat, final SegmentReader reader, Map<Integer,Set<String>> fieldFiles) throws IOException {
- for (Entry<String,NumericDocValuesFieldUpdates> e : updates.entrySet()) {
- final String field = e.getKey();
- final NumericDocValuesFieldUpdates fieldUpdates = e.getValue();
+ private synchronized void handleNumericDVUpdates(FieldInfos infos,
+ Directory dir, DocValuesFormat dvFormat, final SegmentReader reader,
+ Map<Integer,Set<String>> fieldFiles, long maxDelGen, InfoStream infoStream) throws IOException {
+
+ for (Entry<String,List<DocValuesFieldUpdates>> ent : pendingDVUpdates.entrySet()) {
+ final String field = ent.getKey();
+ final List<DocValuesFieldUpdates> updates = ent.getValue();
+ if (updates.get(0).type != DocValuesType.NUMERIC) {
+ continue;
+ }
+
+ final List<DocValuesFieldUpdates> updatesToApply = new ArrayList<>();
+ long bytes = 0;
+ for(DocValuesFieldUpdates update : updates) {
+ if (update.delGen <= maxDelGen) {
+ // safe to apply this one
+ bytes += update.ramBytesUsed();
+ updatesToApply.add(update);
+ }
+ }
+ if (updatesToApply.isEmpty()) {
+ // nothing to apply yet
+ continue;
+ }
+
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD", String.format(Locale.ROOT,
+ "now write %d pending numeric DV updates for field=%s, seg=%s, bytes=%.3f MB",
+ updatesToApply.size(),
+ field,
+ info,
+ bytes/1024./1024.));
+ }
final long nextDocValuesGen = info.getNextDocValuesGen();
final String segmentSuffix = Long.toString(nextDocValuesGen, Character.MAX_RADIX);
- final long estUpdatesSize = fieldUpdates.ramBytesPerDoc() * info.info.maxDoc();
- final IOContext updatesContext = new IOContext(new FlushInfo(info.info.maxDoc(), estUpdatesSize));
+ final IOContext updatesContext = new IOContext(new FlushInfo(info.info.maxDoc(), bytes));
final FieldInfo fieldInfo = infos.fieldInfo(field);
assert fieldInfo != null;
fieldInfo.setDocValuesGen(nextDocValuesGen);
@@ -330,11 +410,14 @@ class ReadersAndUpdates {
throw new IllegalArgumentException("wrong fieldInfo");
}
final int maxDoc = reader.maxDoc();
+ DocValuesFieldUpdates.Iterator[] subs = new DocValuesFieldUpdates.Iterator[updatesToApply.size()];
+ for(int i=0;i<subs.length;i++) {
+ subs[i] = updatesToApply.get(i).iterator();
+ }
- final NumericDocValuesFieldUpdates.Iterator updatesIter = fieldUpdates.iterator();
+ final DocValuesFieldUpdates.Iterator updatesIter = DocValuesFieldUpdates.mergedIterator(subs);
final NumericDocValues currentValues = reader.getNumericDocValues(field);
- updatesIter.reset();
// Merge sort of the original doc values with updated doc values:
return new NumericDocValues() {
@@ -394,7 +477,7 @@ class ReadersAndUpdates {
} else {
docIDOut = updateDocID;
if (docIDOut != NO_MORE_DOCS) {
- value = updatesIter.value();
+ value = (Long) updatesIter.value();
}
}
return docIDOut;
@@ -410,16 +493,42 @@ class ReadersAndUpdates {
}
@SuppressWarnings("synthetic-access")
- private void handleBinaryDVUpdates(FieldInfos infos, Map<String,BinaryDocValuesFieldUpdates> updates,
- TrackingDirectoryWrapper dir, DocValuesFormat dvFormat, final SegmentReader reader, Map<Integer,Set<String>> fieldFiles) throws IOException {
- for (Entry<String,BinaryDocValuesFieldUpdates> e : updates.entrySet()) {
- final String field = e.getKey();
- final BinaryDocValuesFieldUpdates fieldUpdates = e.getValue();
+ private synchronized void handleBinaryDVUpdates(FieldInfos infos,
+ TrackingDirectoryWrapper dir, DocValuesFormat dvFormat, final SegmentReader reader,
+ Map<Integer,Set<String>> fieldFiles, long maxDelGen, InfoStream infoStream) throws IOException {
+ for (Entry<String,List<DocValuesFieldUpdates>> ent : pendingDVUpdates.entrySet()) {
+ final String field = ent.getKey();
+ final List<DocValuesFieldUpdates> updates = ent.getValue();
+ if (updates.get(0).type != DocValuesType.BINARY) {
+ continue;
+ }
+
+ final List<DocValuesFieldUpdates> updatesToApply = new ArrayList<>();
+ long bytes = 0;
+ for(DocValuesFieldUpdates update : updates) {
+ if (update.delGen <= maxDelGen) {
+ // safe to apply this one
+ bytes += update.ramBytesUsed();
+ updatesToApply.add(update);
+ }
+ }
+ if (updatesToApply.isEmpty()) {
+ // nothing to apply yet
+ continue;
+ }
+
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD", String.format(Locale.ROOT,
+ "now write %d pending binary DV updates for field=%s, seg=%s, bytes=%.3fMB",
+ updatesToApply.size(),
+ field,
+ info,
+ bytes/1024./1024.));
+ }
final long nextDocValuesGen = info.getNextDocValuesGen();
final String segmentSuffix = Long.toString(nextDocValuesGen, Character.MAX_RADIX);
- final long estUpdatesSize = fieldUpdates.ramBytesPerDoc() * info.info.maxDoc();
- final IOContext updatesContext = new IOContext(new FlushInfo(info.info.maxDoc(), estUpdatesSize));
+ final IOContext updatesContext = new IOContext(new FlushInfo(info.info.maxDoc(), bytes));
final FieldInfo fieldInfo = infos.fieldInfo(field);
assert fieldInfo != null;
fieldInfo.setDocValuesGen(nextDocValuesGen);
@@ -438,8 +547,12 @@ class ReadersAndUpdates {
}
final int maxDoc = reader.maxDoc();
- final BinaryDocValuesFieldUpdates.Iterator updatesIter = fieldUpdates.iterator();
- updatesIter.reset();
+ DocValuesFieldUpdates.Iterator[] subs = new DocValuesFieldUpdates.Iterator[updatesToApply.size()];
+ for(int i=0;i<subs.length;i++) {
+ subs[i] = updatesToApply.get(i).iterator();
+ }
+
+ final DocValuesFieldUpdates.Iterator updatesIter = DocValuesFieldUpdates.mergedIterator(subs);
final BinaryDocValues currentValues = reader.getBinaryDocValues(field);
@@ -500,7 +613,7 @@ class ReadersAndUpdates {
} else {
docIDOut = updateDocID;
if (docIDOut != NO_MORE_DOCS) {
- value = updatesIter.value();
+ value = (BytesRef) updatesIter.value();
}
}
return docIDOut;
@@ -515,7 +628,7 @@ class ReadersAndUpdates {
}
}
- private Set<String> writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, DocValuesFormat dvFormat,
+ private synchronized Set<String> writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, DocValuesFormat dvFormat,
FieldInfosFormat infosFormat) throws IOException {
final long nextFieldInfosGen = info.getNextFieldInfosGen();
final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX);
@@ -531,28 +644,52 @@ class ReadersAndUpdates {
return trackingDir.getCreatedFiles();
}
- // Writes field updates (new _X_N updates files) to the directory
- public synchronized void writeFieldUpdates(Directory dir, DocValuesFieldUpdates.Container dvUpdates) throws IOException {
- assert Thread.holdsLock(writer);
- //System.out.println("rld.writeFieldUpdates: seg=" + info + " numericFieldUpdates=" + numericFieldUpdates);
-
- assert dvUpdates.any();
-
- // Do this so we can delete any created files on
- // exception; this saves all codecs from having to do
- // it:
- TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
+ public synchronized boolean writeFieldUpdates(Directory dir, long maxDelGen, InfoStream infoStream) throws IOException {
+
+ long startTimeNS = System.nanoTime();
+ assert Thread.holdsLock(writer);
+
final Map<Integer,Set<String>> newDVFiles = new HashMap<>();
Set<String> fieldInfosFiles = null;
FieldInfos fieldInfos = null;
+
+ boolean any = false;
+ int count = 0;
+ for (List<DocValuesFieldUpdates> updates : pendingDVUpdates.values()) {
+ // Sort by increasing delGen:
+ Collections.sort(updates, (a, b) -> Long.compare(a.delGen, b.delGen));
+ count += updates.size();
+ for (DocValuesFieldUpdates update : updates) {
+ if (update.delGen <= maxDelGen && update.any()) {
+ any = true;
+ break;
+ }
+ }
+ }
+
+ if (any == false) {
+ // no updates
+ return false;
+ }
+
+ // Do this so we can delete any created files on
+ // exception; this saves all codecs from having to do it:
+ TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
+
boolean success = false;
try {
final Codec codec = info.info.getCodec();
// reader could be null e.g. for a just merged segment (from
// IndexWriter.commitMergedDeletes).
- final SegmentReader reader = this.reader == null ? new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), IOContext.READONCE) : this.reader;
+ final SegmentReader reader;
+ if (this.reader == null) {
+ reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), IOContext.READONCE);
+ } else {
+ reader = this.reader;
+ }
+
try {
// clone FieldInfos so that we can update their dvGen separately from
// the reader's infos and write them to a new fieldInfos_gen file
@@ -567,38 +704,30 @@ class ReadersAndUpdates {
}
clone.setDocValuesGen(fi.getDocValuesGen());
}
- // create new fields or update existing ones to have NumericDV type
- for (String f : dvUpdates.numericDVUpdates.keySet()) {
- FieldInfo fieldInfo = builder.getOrAdd(f);
- fieldInfo.setDocValuesType(DocValuesType.NUMERIC);
- }
- // create new fields or update existing ones to have BinaryDV type
- for (String f : dvUpdates.binaryDVUpdates.keySet()) {
- FieldInfo fieldInfo = builder.getOrAdd(f);
- fieldInfo.setDocValuesType(DocValuesType.BINARY);
+
+ // create new fields with the right DV type
+ for (List<DocValuesFieldUpdates> updates : pendingDVUpdates.values()) {
+ DocValuesFieldUpdates update = updates.get(0);
+ FieldInfo fieldInfo = builder.getOrAdd(update.field);
+ fieldInfo.setDocValuesType(update.type);
}
fieldInfos = builder.finish();
final DocValuesFormat docValuesFormat = codec.docValuesFormat();
-// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeFieldUpdates: applying numeric updates; seg=" + info + " updates=" + numericFieldUpdates);
- handleNumericDVUpdates(fieldInfos, dvUpdates.numericDVUpdates, trackingDir, docValuesFormat, reader, newDVFiles);
-
-// System.out.println("[" + Thread.currentThread().getName() + "] RAU.writeFieldUpdates: applying binary updates; seg=" + info + " updates=" + dvUpdates.binaryDVUpdates);
- handleBinaryDVUpdates(fieldInfos, dvUpdates.binaryDVUpdates, trackingDir, docValuesFormat, reader, newDVFiles);
+ handleNumericDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
+ handleBinaryDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
-// System.out.println("[" + Thread.currentThread().getName() + "] RAU.writeFieldUpdates: write fieldInfos; seg=" + info);
fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, docValuesFormat, codec.fieldInfosFormat());
} finally {
if (reader != this.reader) {
-// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: closeReader " + reader);
reader.close();
}
}
success = true;
} finally {
- if (!success) {
+ if (success == false) {
// Advance only the nextWriteFieldInfosGen and nextWriteDocValuesGen, so
// that a 2nd attempt to write will write to a new file
info.advanceNextWriteFieldInfosGen();
@@ -610,27 +739,48 @@ class ReadersAndUpdates {
}
}
}
-
- // copy all the updates to mergingUpdates, so they can later be applied to the merged segment
- if (isMerging) {
- for (Entry<String,NumericDocValuesFieldUpdates> e : dvUpdates.numericDVUpdates.entrySet()) {
- DocValuesFieldUpdates updates = mergingDVUpdates.get(e.getKey());
- if (updates == null) {
- mergingDVUpdates.put(e.getKey(), e.getValue());
+
+ // Prune the now-written DV updates:
+ long bytesFreed = 0;
+ Iterator<Map.Entry<String,List<DocValuesFieldUpdates>>> it = pendingDVUpdates.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String,List<DocValuesFieldUpdates>> ent = it.next();
+ int upto = 0;
+ List<DocValuesFieldUpdates> updates = ent.getValue();
+ for (DocValuesFieldUpdates update : updates) {
+ if (update.delGen > maxDelGen) {
+ // not yet applied
+ updates.set(upto, update);
+ upto++;
} else {
- updates.merge(e.getValue());
+ bytesFreed += update.ramBytesUsed();
}
}
- for (Entry<String,BinaryDocValuesFieldUpdates> e : dvUpdates.binaryDVUpdates.entrySet()) {
- DocValuesFieldUpdates updates = mergingDVUpdates.get(e.getKey());
- if (updates == null) {
- mergingDVUpdates.put(e.getKey(), e.getValue());
- } else {
- updates.merge(e.getValue());
+ if (upto == 0) {
+ it.remove();
+ } else {
+ updates.subList(upto, updates.size()).clear();
+ }
+ }
+
+ long bytes = ramBytesUsed.addAndGet(-bytesFreed);
+ assert bytes >= 0;
+
+ // if there is a reader open, reopen it to reflect the updates
+ if (reader != null) {
+ SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount);
+ boolean success2 = false;
+ try {
+ reader.decRef();
+ reader = newReader;
+ success2 = true;
+ } finally {
+ if (success2 == false) {
+ newReader.decRef();
}
}
}
-
+
// writing field updates succeeded
assert fieldInfosFiles != null;
info.setFieldInfosFiles(fieldInfosFiles);
@@ -639,46 +789,75 @@ class ReadersAndUpdates {
// of files, hence we copy from the existing map all fields w/ updates that
// were not updated in this session, and add new mappings for fields that
// were updated now.
- assert !newDVFiles.isEmpty();
+ assert newDVFiles.isEmpty() == false;
for (Entry<Integer,Set<String>> e : info.getDocValuesUpdatesFiles().entrySet()) {
- if (!newDVFiles.containsKey(e.getKey())) {
+ if (newDVFiles.containsKey(e.getKey()) == false) {
newDVFiles.put(e.getKey(), e.getValue());
}
}
info.setDocValuesUpdatesFiles(newDVFiles);
-
+
// wrote new files, should checkpoint()
- writer.checkpoint();
+ writer.checkpointNoSIS();
- // if there is a reader open, reopen it to reflect the updates
- if (reader != null) {
- SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount);
- boolean reopened = false;
+ if (infoStream.isEnabled("BD")) {
+ infoStream.message("BD", String.format(Locale.ROOT, "done write field updates for seg=%s; took %.3fs; new files: %s",
+ info, (System.nanoTime() - startTimeNS)/1000000000.0, newDVFiles));
+ }
+
+ return true;
+ }
+
+ /** Returns a reader for merge, with the latest doc values updates and deletions. */
+ synchronized SegmentReader getReaderForMerge(IOContext context) throws IOException {
+
+ // This ensures any newly resolved doc value updates while we are merging are
+ // saved for re-applying after this segment is done merging:
+ isMerging = true;
+
+ assert mergingDVUpdates.isEmpty();
+
+ // We must carry over any still-pending DV updates because they were not
+ // successfully written, e.g. because there was a hole in the delGens,
+ // or they arrived after we wrote all DVs for merge but before we set
+ // isMerging here:
+ for (Map.Entry<String, List<DocValuesFieldUpdates>> ent : pendingDVUpdates.entrySet()) {
+ List<DocValuesFieldUpdates> mergingUpdates = mergingDVUpdates.get(ent.getKey());
+ if (mergingUpdates == null) {
+ mergingUpdates = new ArrayList<>();
+ mergingDVUpdates.put(ent.getKey(), mergingUpdates);
+ }
+ mergingUpdates.addAll(ent.getValue());
+ }
+
+ SegmentReader reader = getReader(context);
+ int delCount = pendingDeleteCount + info.getDelCount();
+ if (delCount != reader.numDeletedDocs()) {
+
+ // beware of zombies:
+ assert delCount > reader.numDeletedDocs(): "delCount=" + delCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs();
+
+ assert liveDocs != null;
+
+ // Create a new reader with the latest live docs:
+ SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - delCount);
+ boolean success = false;
try {
reader.decRef();
- reader = newReader;
- reopened = true;
+ success = true;
} finally {
- if (!reopened) {
+ if (success == false) {
newReader.decRef();
}
}
+ reader = newReader;
}
- }
- /**
- * Returns a reader for merge. This method applies field updates if there are
- * any and marks that this segment is currently merging.
- */
- synchronized SegmentReader getReaderForMerge(IOContext context) throws IOException {
- assert Thread.holdsLock(writer);
- // must execute these two statements as atomic operation, otherwise we
- // could lose updates if e.g. another thread calls writeFieldUpdates in
- // between, or the updates are applied to the obtained reader, but then
- // re-applied in IW.commitMergedDeletes (unnecessary work and potential
- // bugs).
- isMerging = true;
- return getReader(context);
+ liveDocsShared = true;
+
+ assert verifyDocCounts();
+
+ return reader;
}
/**
@@ -689,12 +868,13 @@ class ReadersAndUpdates {
mergingDVUpdates.clear();
isMerging = false;
}
-
- /** Returns updates that came in while this segment was merging. */
- public synchronized Map<String,DocValuesFieldUpdates> getMergingFieldUpdates() {
+
+ public synchronized Map<String,List<DocValuesFieldUpdates>> getMergingDVUpdates() {
+ // We must atomically (in single sync'd block) clear isMerging when we return the DV updates otherwise we can lose updates:
+ isMerging = false;
return mergingDVUpdates;
}
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -703,5 +883,4 @@ class ReadersAndUpdates {
sb.append(" liveDocsShared=").append(liveDocsShared);
return sb.toString();
}
-
}
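
The hunks above feed every applicable packet for a field into DocValuesFieldUpdates.mergedIterator(subs) instead of a single per-field iterator. That class is not part of this file, so the following is only a rough sketch of the semantics the caller relies on, assuming that when two packets update the same docID the packet with the higher delGen wins:

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MergedUpdatesSketch {

  /** One packet of docID -> new value updates, produced at a given delete generation. */
  record Packet(long delGen, Map<Integer, Long> newValues) {}

  /**
   * Roughly what iterating the merged view of several packets yields: every updated docID
   * in increasing order, with the value taken from the highest-delGen packet that touched it.
   */
  static TreeMap<Integer, Long> merge(List<Packet> packets) {
    TreeMap<Integer, Long> merged = new TreeMap<>();
    packets.stream()
        .sorted(Comparator.comparingLong(Packet::delGen))  // apply oldest packets first ...
        .forEach(p -> merged.putAll(p.newValues()));        // ... so newer packets overwrite
    return merged;
  }

  public static void main(String[] args) {
    List<Packet> packets = List.of(
        new Packet(3, Map.of(0, 10L, 7, 11L)),
        new Packet(5, Map.of(2, 13L, 7, 42L)));  // delGen=5 wins for docID 7
    System.out.println(merge(packets));           // {0=10, 2=13, 7=42}
  }
}
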
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/SegmentCommitInfo.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentCommitInfo.java b/lucene/core/src/java/org/apache/lucene/index/SegmentCommitInfo.java
index b1084a6..661283b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentCommitInfo.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentCommitInfo.java
@@ -70,6 +70,10 @@ public class SegmentCommitInfo {
private volatile long sizeInBytes = -1;
+ // NOTE: only used in-RAM by IW to track buffered deletes;
+ // this is never written to/read from the Directory
+ private long bufferedDeletesGen = -1;
+
/**
* Sole constructor.
*
@@ -236,17 +240,17 @@ public class SegmentCommitInfo {
return files;
}
- // NOTE: only used in-RAM by IW to track buffered deletes;
- // this is never written to/read from the Directory
- private long bufferedDeletesGen;
-
long getBufferedDeletesGen() {
return bufferedDeletesGen;
}
void setBufferedDeletesGen(long v) {
- bufferedDeletesGen = v;
- sizeInBytes = -1;
+ if (bufferedDeletesGen == -1) {
+ bufferedDeletesGen = v;
+ sizeInBytes = -1;
+ } else {
+ throw new IllegalStateException("buffered deletes gen should only be set once");
+ }
}
/** Returns true if there are any deletions for the
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java b/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
index ce2d448..2d2e786 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
@@ -92,7 +92,6 @@ final class SegmentCoreReaders {
final Codec codec = si.info.getCodec();
final Directory cfsDir; // confusing name: if (cfs) it's the cfsdir, otherwise it's the segment's directory.
-
boolean success = false;
try {
@@ -164,7 +163,6 @@ final class SegmentCoreReaders {
void decRef() throws IOException {
if (ref.decrementAndGet() == 0) {
-// System.err.println("--- closing core readers");
Throwable th = null;
try {
IOUtils.close(termVectorsLocal, fieldsReaderLocal, fields, termVectorsReaderOrig, fieldsReaderOrig,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java b/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java
index 1c02441..5e6d773 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java
@@ -172,7 +172,7 @@ public final class SegmentInfo {
/** Return all files referenced by this SegmentInfo. */
public Set<String> files() {
if (setFiles == null) {
- throw new IllegalStateException("files were not computed yet");
+ throw new IllegalStateException("files were not computed yet; segment=" + name + " maxDoc=" + maxDoc);
}
return Collections.unmodifiableSet(setFiles);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java b/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
index c8235d5..bd82156 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
@@ -158,15 +158,22 @@ public final class SegmentReader extends CodecReader {
* init most recent DocValues for the current commit
*/
private DocValuesProducer initDocValuesProducer() throws IOException {
- final Directory dir = core.cfsReader != null ? core.cfsReader : si.info.dir;
- if (!fieldInfos.hasDocValues()) {
+ if (fieldInfos.hasDocValues() == false) {
return null;
- } else if (si.hasFieldUpdates()) {
- return new SegmentDocValuesProducer(si, dir, core.coreFieldInfos, fieldInfos, segDocValues);
} else {
- // simple case, no DocValues updates
- return segDocValues.getDocValuesProducer(-1L, si, dir, fieldInfos);
+ Directory dir;
+ if (core.cfsReader != null) {
+ dir = core.cfsReader;
+ } else {
+ dir = si.info.dir;
+ }
+ if (si.hasFieldUpdates()) {
+ return new SegmentDocValuesProducer(si, dir, core.coreFieldInfos, fieldInfos, segDocValues);
+ } else {
+ // simple case, no DocValues updates
+ return segDocValues.getDocValuesProducer(-1L, si, dir, fieldInfos);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java
index 5a8f98b..ce68247 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SerialMergeScheduler.java
@@ -32,11 +32,11 @@ public class SerialMergeScheduler extends MergeScheduler {
* multiple threads, only one merge may run at a time. */
@Override
synchronized public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
-
while(true) {
MergePolicy.OneMerge merge = writer.getNextMerge();
- if (merge == null)
+ if (merge == null) {
break;
+ }
writer.merge(merge);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
index 668f1ec..c020a6d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@@ -237,26 +238,20 @@ public class TieredMergePolicy extends MergePolicy {
private class SegmentByteSizeDescending implements Comparator<SegmentCommitInfo> {
- private final IndexWriter writer;
+ private final Map<SegmentCommitInfo, Long> sizeInBytes;
- SegmentByteSizeDescending(IndexWriter writer) {
- this.writer = writer;
+ SegmentByteSizeDescending(Map<SegmentCommitInfo, Long> sizeInBytes) {
+ this.sizeInBytes = sizeInBytes;
}
+
@Override
public int compare(SegmentCommitInfo o1, SegmentCommitInfo o2) {
- try {
- final long sz1 = size(o1, writer);
- final long sz2 = size(o2, writer);
- if (sz1 > sz2) {
- return -1;
- } else if (sz2 > sz1) {
- return 1;
- } else {
- return o1.info.name.compareTo(o2.info.name);
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
+ // Sort by largest size:
+ int cmp = Long.compare(sizeInBytes.get(o2), sizeInBytes.get(o1));
+ if (cmp == 0) {
+ cmp = o1.info.name.compareTo(o2.info.name);
}
+ return cmp;
}
}
@@ -277,6 +272,14 @@ public class TieredMergePolicy extends MergePolicy {
abstract String getExplanation();
}
+ private Map<SegmentCommitInfo,Long> getSegmentSizes(IndexWriter writer, Collection<SegmentCommitInfo> infos) throws IOException {
+ Map<SegmentCommitInfo,Long> sizeInBytes = new HashMap<>();
+ for (SegmentCommitInfo info : infos) {
+ sizeInBytes.put(info, size(info, writer));
+ }
+ return sizeInBytes;
+ }
+
@Override
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {
if (verbose(writer)) {
@@ -289,13 +292,19 @@ public class TieredMergePolicy extends MergePolicy {
final Collection<SegmentCommitInfo> toBeMerged = new HashSet<>();
final List<SegmentCommitInfo> infosSorted = new ArrayList<>(infos.asList());
- Collections.sort(infosSorted, new SegmentByteSizeDescending(writer));
+
+ // The size can change concurrently while we are running here, because deletes
+ // are now applied concurrently, and this can piss off TimSort! So we
+ // call size() once per segment and sort by that:
+ Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(writer, infos.asList());
+
+ Collections.sort(infosSorted, new SegmentByteSizeDescending(sizeInBytes));
// Compute total index bytes & print details about the index
long totIndexBytes = 0;
long minSegmentBytes = Long.MAX_VALUE;
for(SegmentCommitInfo info : infosSorted) {
- final long segBytes = size(info, writer);
+ final long segBytes = sizeInBytes.get(info);
if (verbose(writer)) {
String extra = merging.contains(info) ? " [merging]" : "";
if (segBytes >= maxMergedSegmentBytes/2.0) {
@@ -315,7 +324,7 @@ public class TieredMergePolicy extends MergePolicy {
// of the maxSegmentCount:
int tooBigCount = 0;
while (tooBigCount < infosSorted.size()) {
- long segBytes = size(infosSorted.get(tooBigCount), writer);
+ long segBytes = sizeInBytes.get(infosSorted.get(tooBigCount));
if (segBytes < maxMergedSegmentBytes/2.0) {
break;
}
@@ -355,7 +364,7 @@ public class TieredMergePolicy extends MergePolicy {
for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
final SegmentCommitInfo info = infosSorted.get(idx);
if (merging.contains(info)) {
- mergingBytes += size(info, writer);
+ mergingBytes += sizeInBytes.get(info);
} else if (!toBeMerged.contains(info)) {
eligible.add(info);
}
@@ -388,7 +397,7 @@ public class TieredMergePolicy extends MergePolicy {
boolean hitTooLarge = false;
for(int idx = startIdx;idx<eligible.size() && candidate.size() < maxMergeAtOnce;idx++) {
final SegmentCommitInfo info = eligible.get(idx);
- final long segBytes = size(info, writer);
+ final long segBytes = sizeInBytes.get(info);
if (totAfterMergeBytes + segBytes > maxMergedSegmentBytes) {
hitTooLarge = true;
@@ -408,7 +417,7 @@ public class TieredMergePolicy extends MergePolicy {
// segments, and already pre-excluded the too-large segments:
assert candidate.size() > 0;
- final MergeScore score = score(candidate, hitTooLarge, mergingBytes, writer);
+ final MergeScore score = score(candidate, hitTooLarge, mergingBytes, writer, sizeInBytes);
if (verbose(writer)) {
message(" maybe=" + writer.segString(candidate) + " score=" + score.getScore() + " " + score.getExplanation() + " tooLarge=" + hitTooLarge + " size=" + String.format(Locale.ROOT, "%.3f MB", totAfterMergeBytes/1024./1024.), writer);
}
@@ -447,12 +456,12 @@ public class TieredMergePolicy extends MergePolicy {
}
/** Expert: scores one merge; subclasses can override. */
- protected MergeScore score(List<SegmentCommitInfo> candidate, boolean hitTooLarge, long mergingBytes, IndexWriter writer) throws IOException {
+ protected MergeScore score(List<SegmentCommitInfo> candidate, boolean hitTooLarge, long mergingBytes, IndexWriter writer, Map<SegmentCommitInfo, Long> sizeInBytes) throws IOException {
long totBeforeMergeBytes = 0;
long totAfterMergeBytes = 0;
long totAfterMergeBytesFloored = 0;
for(SegmentCommitInfo info : candidate) {
- final long segBytes = size(info, writer);
+ final long segBytes = sizeInBytes.get(info);
totAfterMergeBytes += segBytes;
totAfterMergeBytesFloored += floorSize(segBytes);
totBeforeMergeBytes += info.sizeInBytes();
@@ -472,7 +481,7 @@ public class TieredMergePolicy extends MergePolicy {
// over time:
skew = 1.0/maxMergeAtOnce;
} else {
- skew = ((double) floorSize(size(candidate.get(0), writer)))/totAfterMergeBytesFloored;
+ skew = ((double) floorSize(sizeInBytes.get(candidate.get(0))))/totAfterMergeBytesFloored;
}
// Strongly favor merges with less skew (smaller
@@ -519,7 +528,7 @@ public class TieredMergePolicy extends MergePolicy {
final Boolean isOriginal = segmentsToMerge.get(info);
if (isOriginal != null) {
segmentIsOriginal = isOriginal;
- if (!merging.contains(info)) {
+ if (merging.contains(info) == false) {
eligible.add(info);
} else {
forceMergeRunning = true;
@@ -531,6 +540,11 @@ public class TieredMergePolicy extends MergePolicy {
return null;
}
+ // The size can change concurrently while we are running here, because deletes
+ // are now applied concurrently, and this can piss off TimSort! So we
+ // call size() once per segment and sort by that:
+ Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(writer, eligible);
+
if ((maxSegmentCount > 1 && eligible.size() <= maxSegmentCount) ||
(maxSegmentCount == 1 && eligible.size() == 1 && (!segmentIsOriginal || isMerged(infos, eligible.get(0), writer)))) {
if (verbose(writer)) {
@@ -539,7 +553,7 @@ public class TieredMergePolicy extends MergePolicy {
return null;
}
- Collections.sort(eligible, new SegmentByteSizeDescending(writer));
+ Collections.sort(eligible, new SegmentByteSizeDescending(sizeInBytes));
if (verbose(writer)) {
message("eligible=" + eligible, writer);
@@ -595,7 +609,12 @@ public class TieredMergePolicy extends MergePolicy {
return null;
}
- Collections.sort(eligible, new SegmentByteSizeDescending(writer));
+ // The size can change concurrently while we are running here, because deletes
+ // are now applied concurrently, and this can piss off TimSort! So we
+ // call size() once per segment and sort by that:
+ Map<SegmentCommitInfo,Long> sizeInBytes = getSegmentSizes(writer, infos.asList());
+
+ Collections.sort(eligible, new SegmentByteSizeDescending(sizeInBytes));
if (verbose(writer)) {
message("eligible=" + eligible, writer);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/java/org/apache/lucene/util/packed/AbstractPagedMutable.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/packed/AbstractPagedMutable.java b/lucene/core/src/java/org/apache/lucene/util/packed/AbstractPagedMutable.java
index c5fac1e..e73863b 100644
--- a/lucene/core/src/java/org/apache/lucene/util/packed/AbstractPagedMutable.java
+++ b/lucene/core/src/java/org/apache/lucene/util/packed/AbstractPagedMutable.java
@@ -84,7 +84,7 @@ abstract class AbstractPagedMutable<T extends AbstractPagedMutable<T>> extends L
@Override
public final long get(long index) {
- assert index >= 0 && index < size;
+ assert index >= 0 && index < size: "index=" + index + " size=" + size;
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
return subMutables[pageIndex].get(indexInPage);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java
index ed2b66f..80ae804 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java
@@ -17,7 +17,9 @@
package org.apache.lucene.index;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -31,10 +33,13 @@ import org.apache.lucene.codecs.asserting.AssertingDocValuesFormat;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StringField;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Bits;
@@ -42,7 +47,6 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
-import org.junit.Test;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
@@ -204,12 +208,15 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
writer.commit();
reader1 = DirectoryReader.open(dir);
}
+ System.out.println("TEST: isNRT=" + isNRT + " reader1=" + reader1);
// update doc
writer.updateBinaryDocValue(new Term("id", "doc-0"), "val", toBytes(10)); // update doc-0's value to 10
if (!isNRT) {
writer.commit();
}
+
+ System.out.println("TEST: now reopen");
// reopen reader and assert only it sees the update
final DirectoryReader reader2 = DirectoryReader.openIfChanged(reader1);
@@ -545,7 +552,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
long value = rnd + 1;
writer.updateBinaryDocValue(new Term("key", "doc"), "bdv", toBytes(value));
- if (random.nextDouble() < 0.2) { // randomly delete some docs
+ if (random.nextDouble() < 0.2) { // randomly delete one doc
writer.deleteDocuments(new Term("id", Integer.toString(random.nextInt(docid))));
}
@@ -623,6 +630,140 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
dir.close();
}
+ static class OneSortDoc implements Comparable<OneSortDoc> {
+ public BytesRef value;
+ public final long sortValue;
+ public final int id;
+ public boolean deleted;
+
+ public OneSortDoc(int id, BytesRef value, long sortValue) {
+ this.value = value;
+ this.sortValue = sortValue;
+ this.id = id;
+ }
+
+ @Override
+ public int compareTo(OneSortDoc other) {
+ int cmp = Long.compare(sortValue, other.sortValue);
+ if (cmp == 0) {
+ cmp = Integer.compare(id, other.id);
+ assert cmp != 0;
+ }
+ return cmp;
+ }
+ }
+
+ public void testSortedIndex() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ iwc.setIndexSort(new Sort(new SortField("sort", SortField.Type.LONG)));
+ RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
+
+ int valueRange = TestUtil.nextInt(random(), 1, 1000);
+ int sortValueRange = TestUtil.nextInt(random(), 1, 1000);
+
+ int refreshChance = TestUtil.nextInt(random(), 5, 200);
+ int deleteChance = TestUtil.nextInt(random(), 2, 100);
+
+ int idUpto = 0;
+ int deletedCount = 0;
+
+ List<OneSortDoc> docs = new ArrayList<>();
+ DirectoryReader r = w.getReader();
+
+ int numIters = atLeast(1000);
+ for(int iter=0;iter<numIters;iter++) {
+ BytesRef value = toBytes((long) random().nextInt(valueRange));
+ if (docs.isEmpty() || random().nextInt(3) == 1) {
+ int id = docs.size();
+ // add new doc
+ Document doc = new Document();
+ doc.add(newStringField("id", Integer.toString(id), Field.Store.YES));
+ doc.add(new BinaryDocValuesField("number", value));
+ int sortValue = random().nextInt(sortValueRange);
+ doc.add(new NumericDocValuesField("sort", sortValue));
+ if (VERBOSE) {
+ System.out.println("TEST: iter=" + iter + " add doc id=" + id + " sortValue=" + sortValue + " value=" + value);
+ }
+ w.addDocument(doc);
+
+ docs.add(new OneSortDoc(id, value, sortValue));
+ } else {
+ // update existing doc value
+ int idToUpdate = random().nextInt(docs.size());
+ if (VERBOSE) {
+ System.out.println("TEST: iter=" + iter + " update doc id=" + idToUpdate + " new value=" + value);
+ }
+ w.updateBinaryDocValue(new Term("id", Integer.toString(idToUpdate)), "number", value);
+
+ docs.get(idToUpdate).value = value;
+ }
+
+ if (random().nextInt(deleteChance) == 0) {
+ int idToDelete = random().nextInt(docs.size());
+ if (VERBOSE) {
+ System.out.println("TEST: delete doc id=" + idToDelete);
+ }
+ w.deleteDocuments(new Term("id", Integer.toString(idToDelete)));
+ if (docs.get(idToDelete).deleted == false) {
+ docs.get(idToDelete).deleted = true;
+ deletedCount++;
+ }
+ }
+
+ if (random().nextInt(refreshChance) == 0) {
+ if (VERBOSE) {
+ System.out.println("TEST: now get reader; old reader=" + r);
+ }
+ DirectoryReader r2 = w.getReader();
+ r.close();
+ r = r2;
+
+ if (VERBOSE) {
+ System.out.println("TEST: got reader=" + r);
+ }
+
+ int liveCount = 0;
+
+ for (LeafReaderContext ctx : r.leaves()) {
+ LeafReader leafReader = ctx.reader();
+ BinaryDocValues values = leafReader.getBinaryDocValues("number");
+ NumericDocValues sortValues = leafReader.getNumericDocValues("sort");
+ Bits liveDocs = leafReader.getLiveDocs();
+
+ long lastSortValue = Long.MIN_VALUE;
+ for (int i=0;i<leafReader.maxDoc();i++) {
+
+ Document doc = leafReader.document(i);
+ OneSortDoc sortDoc = docs.get(Integer.parseInt(doc.get("id")));
+
+ assertEquals(i, values.nextDoc());
+ assertEquals(i, sortValues.nextDoc());
+
+ if (liveDocs != null && liveDocs.get(i) == false) {
+ assertTrue(sortDoc.deleted);
+ continue;
+ }
+ assertFalse(sortDoc.deleted);
+
+ assertEquals(sortDoc.value, values.binaryValue());
+
+ long sortValue = sortValues.longValue();
+ assertEquals(sortDoc.sortValue, sortValue);
+
+ assertTrue(sortValue >= lastSortValue);
+ lastSortValue = sortValue;
+ liveCount++;
+ }
+ }
+
+ assertEquals(docs.size() - deletedCount, liveCount);
+ }
+ }
+
+ IOUtils.close(r, w, dir);
+ }
+
public void testManyReopensAndFields() throws Exception {
Directory dir = newDirectory();
final Random random = random();
@@ -1283,7 +1424,6 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase {
dir.close();
}
- @Test
public void testIOContext() throws Exception {
// LUCENE-5591: make sure we pass an IOContext with an approximate
// segmentSize in FlushInfo
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
index c60f54d..8991aea 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
@@ -34,8 +34,8 @@ import org.apache.lucene.util.ThreadInterruptedException;
*/
public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
- public void testUpdateDelteSlices() {
- DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
+ public void testUpdateDelteSlices() throws Exception {
+ DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
final int size = 200 + random().nextInt(500) * RANDOM_MULTIPLIER;
Integer[] ids = new Integer[size];
for (int i = 0; i < ids.length; i++) {
@@ -70,11 +70,11 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
}
assertEquals(j+1, queue.numGlobalTermDeletes());
}
- assertEquals(uniqueValues, bd1.terms.keySet());
- assertEquals(uniqueValues, bd2.terms.keySet());
+ assertEquals(uniqueValues, bd1.deleteTerms.keySet());
+ assertEquals(uniqueValues, bd2.deleteTerms.keySet());
HashSet<Term> frozenSet = new HashSet<>();
BytesRefBuilder bytesRef = new BytesRefBuilder();
- TermIterator iter = queue.freezeGlobalBuffer(null).termIterator();
+ TermIterator iter = queue.freezeGlobalBuffer(null).deleteTerms.iterator();
while (iter.next() != null) {
bytesRef.copyBytes(iter.bytes);
frozenSet.add(new Term(iter.field(), bytesRef.toBytesRef()));
@@ -87,13 +87,12 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
private void assertAllBetween(int start, int end, BufferedUpdates deletes,
Integer[] ids) {
for (int i = start; i <= end; i++) {
- assertEquals(Integer.valueOf(end), deletes.terms.get(new Term("id",
- ids[i].toString())));
+ assertEquals(Integer.valueOf(end), deletes.deleteTerms.get(new Term("id", ids[i].toString())));
}
}
public void testClear() {
- DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
+ DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
assertFalse(queue.anyChanges());
queue.clear();
assertFalse(queue.anyChanges());
@@ -115,8 +114,8 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
}
- public void testAnyChanges() {
- DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
+ public void testAnyChanges() throws Exception {
+ DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
final int size = 200 + random().nextInt(500) * RANDOM_MULTIPLIER;
int termsSinceFreeze = 0;
int queriesSinceFreeze = 0;
@@ -132,8 +131,8 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
assertTrue(queue.anyChanges());
if (random().nextInt(5) == 0) {
FrozenBufferedUpdates freezeGlobalBuffer = queue.freezeGlobalBuffer(null);
- assertEquals(termsSinceFreeze, freezeGlobalBuffer.terms.size());
- assertEquals(queriesSinceFreeze, freezeGlobalBuffer.queries.length);
+ assertEquals(termsSinceFreeze, freezeGlobalBuffer.deleteTerms.size());
+ assertEquals(queriesSinceFreeze, freezeGlobalBuffer.deleteQueries.length);
queriesSinceFreeze = 0;
termsSinceFreeze = 0;
assertFalse(queue.anyChanges());
@@ -141,10 +140,8 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
}
}
- public void testPartiallyAppliedGlobalSlice() throws SecurityException,
- NoSuchFieldException, IllegalArgumentException, IllegalAccessException,
- InterruptedException {
- final DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
+ public void testPartiallyAppliedGlobalSlice() throws Exception {
+ final DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
ReentrantLock lock = queue.globalBufferLock;
lock.lock();
Thread t = new Thread() {
@@ -161,12 +158,12 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
assertTrue("changes in global buffer", queue.anyChanges());
FrozenBufferedUpdates freezeGlobalBuffer = queue.freezeGlobalBuffer(null);
assertTrue(freezeGlobalBuffer.any());
- assertEquals(1, freezeGlobalBuffer.terms.size());
+ assertEquals(1, freezeGlobalBuffer.deleteTerms.size());
assertFalse("all changes applied", queue.anyChanges());
}
- public void testStressDeleteQueue() throws InterruptedException {
- DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue();
+ public void testStressDeleteQueue() throws Exception {
+ DocumentsWriterDeleteQueue queue = new DocumentsWriterDeleteQueue(null);
Set<Term> uniqueValues = new HashSet<>();
final int size = 10000 + random().nextInt(500) * RANDOM_MULTIPLIER;
Integer[] ids = new Integer[size];
@@ -192,13 +189,13 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
queue.updateSlice(slice);
BufferedUpdates deletes = updateThread.deletes;
slice.apply(deletes, BufferedUpdates.MAX_INT);
- assertEquals(uniqueValues, deletes.terms.keySet());
+ assertEquals(uniqueValues, deletes.deleteTerms.keySet());
}
queue.tryApplyGlobalSlice();
Set<Term> frozenSet = new HashSet<>();
BytesRefBuilder builder = new BytesRefBuilder();
- TermIterator iter = queue.freezeGlobalBuffer(null).termIterator();
+ TermIterator iter = queue.freezeGlobalBuffer(null).deleteTerms.iterator();
while (iter.next() != null) {
builder.copyBytes(iter.bytes);
frozenSet.add(new Term(iter.field(), builder.toBytesRef()));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
index aa2901c..ca5aba8 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
@@ -74,11 +74,9 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
iwc.setIndexerThreadPool(threadPool);
iwc.setRAMBufferSizeMB(maxRamMB);
iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
- iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
IndexWriter writer = new IndexWriter(dir, iwc);
flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
assertFalse(flushPolicy.flushOnDocCount());
- assertFalse(flushPolicy.flushOnDeleteTerms());
assertTrue(flushPolicy.flushOnRAM());
DocumentsWriter docsWriter = writer.getDocsWriter();
assertNotNull(docsWriter);
@@ -131,11 +129,9 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
iwc.setIndexerThreadPool(threadPool);
iwc.setMaxBufferedDocs(2 + atLeast(10));
iwc.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
- iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
IndexWriter writer = new IndexWriter(dir, iwc);
flushPolicy = (MockDefaultFlushPolicy) writer.getConfig().getFlushPolicy();
assertTrue(flushPolicy.flushOnDocCount());
- assertFalse(flushPolicy.flushOnDeleteTerms());
assertFalse(flushPolicy.flushOnRAM());
DocumentsWriter docsWriter = writer.getDocsWriter();
assertNotNull(docsWriter);
@@ -201,8 +197,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
assertEquals(" all flushes must be due", 0, flushControl.flushBytes());
assertEquals(numDocumentsToIndex, writer.numDocs());
assertEquals(numDocumentsToIndex, writer.maxDoc());
- if (flushPolicy.flushOnRAM() && !flushPolicy.flushOnDocCount()
- && !flushPolicy.flushOnDeleteTerms()) {
+ if (flushPolicy.flushOnRAM() && !flushPolicy.flushOnDocCount()) {
final long maxRAMBytes = (long) (iwc.getRAMBufferSizeMB() * 1024. * 1024.);
assertTrue("peak bytes without flush exceeded watermark",
flushPolicy.peakBytesWithoutFlush <= maxRAMBytes);
@@ -239,7 +234,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
dir.setThrottling(MockDirectoryWrapper.Throttling.SOMETIMES);
IndexWriterConfig iwc = newIndexWriterConfig(analyzer);
iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
- iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
iwc.setFlushPolicy(flushPolicy);
@@ -346,10 +340,6 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
final ThreadState toFlush;
if (state.flushPending) {
toFlush = state;
- } else if (flushOnDeleteTerms()
- && state.dwpt.pendingUpdates.numTermDeletes.get() >= indexWriterConfig
- .getMaxBufferedDeleteTerms()) {
- toFlush = state;
} else {
toFlush = null;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/test/org/apache/lucene/index/TestForceMergeForever.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestForceMergeForever.java b/lucene/core/src/test/org/apache/lucene/index/TestForceMergeForever.java
index 0379395..8626fe5 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestForceMergeForever.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestForceMergeForever.java
@@ -56,8 +56,11 @@ public class TestForceMergeForever extends LuceneTestCase {
final Directory d = newDirectory();
MockAnalyzer analyzer = new MockAnalyzer(random());
analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));
+ IndexWriterConfig iwc = newIndexWriterConfig(analyzer);
+ // SMS (SerialMergeScheduler) can cause this test to run indefinitely:
+ iwc.setMergeScheduler(new ConcurrentMergeScheduler());
- final MyIndexWriter w = new MyIndexWriter(d, newIndexWriterConfig(analyzer));
+ final MyIndexWriter w = new MyIndexWriter(d, iwc);
// Try to make an index that requires merging:
w.getConfig().setMaxBufferedDocs(TestUtil.nextInt(random(), 2, 11));
@@ -85,7 +88,7 @@ public class TestForceMergeForever extends LuceneTestCase {
@Override
public void run() {
try {
- while (!doStop.get()) {
+ while (doStop.get() == false) {
w.updateDocument(new Term("docid", "" + random().nextInt(numStartDocs)),
docs.nextDoc());
// Force deletes to apply
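The comment in the hunk above notes that a SerialMergeScheduler can make this test run without bound, so the config now pins a ConcurrentMergeScheduler explicitly instead of taking whatever newIndexWriterConfig randomizes. A minimal sketch of pinning and lightly tuning the scheduler; the class name, analyzer, and thread counts are illustrative, not from the patch:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriterConfig;

public class MergeSchedulerSketch {
  public static void main(String[] args) {
    IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    // Allow a few merges to run on background threads instead of
    // serializing them on the indexing thread:
    cms.setMaxMergesAndThreads(4, 2);
    iwc.setMergeScheduler(cms);
    System.out.println("scheduler=" + iwc.getMergeScheduler());
  }
}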
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 6897f06..9538e03 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -357,67 +357,6 @@ public class TestIndexWriter extends LuceneTestCase {
dir.close();
}
- public void testChangingRAMBuffer2() throws IOException {
- Directory dir = newDirectory();
- IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())));
- writer.getConfig().setMaxBufferedDocs(10);
- writer.getConfig().setMaxBufferedDeleteTerms(10);
- writer.getConfig().setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
-
- for(int j=1;j<52;j++) {
- Document doc = new Document();
- doc.add(new Field("field", "aaa" + j, storedTextType));
- writer.addDocument(doc);
- }
-
- int lastFlushCount = -1;
- for(int j=1;j<52;j++) {
- writer.deleteDocuments(new Term("field", "aaa" + j));
- TestUtil.syncConcurrentMerges(writer);
- int flushCount = writer.getFlushCount();
-
- if (j == 1)
- lastFlushCount = flushCount;
- else if (j < 10) {
- // No new files should be created
- assertEquals(flushCount, lastFlushCount);
- } else if (10 == j) {
- assertTrue("" + j, flushCount > lastFlushCount);
- lastFlushCount = flushCount;
- writer.getConfig().setRAMBufferSizeMB(0.000001);
- writer.getConfig().setMaxBufferedDeleteTerms(1);
- } else if (j < 20) {
- assertTrue(flushCount > lastFlushCount);
- lastFlushCount = flushCount;
- } else if (20 == j) {
- writer.getConfig().setRAMBufferSizeMB(16);
- writer.getConfig().setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
- lastFlushCount = flushCount;
- } else if (j < 30) {
- assertEquals(flushCount, lastFlushCount);
- } else if (30 == j) {
- writer.getConfig().setRAMBufferSizeMB(0.000001);
- writer.getConfig().setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
- writer.getConfig().setMaxBufferedDeleteTerms(1);
- } else if (j < 40) {
- assertTrue(flushCount> lastFlushCount);
- lastFlushCount = flushCount;
- } else if (40 == j) {
- writer.getConfig().setMaxBufferedDeleteTerms(10);
- writer.getConfig().setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
- lastFlushCount = flushCount;
- } else if (j < 50) {
- assertEquals(flushCount, lastFlushCount);
- writer.getConfig().setMaxBufferedDeleteTerms(10);
- writer.getConfig().setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
- } else if (50 == j) {
- assertTrue(flushCount > lastFlushCount);
- }
- }
- writer.close();
- dir.close();
- }
-
public void testEnablingNorms() throws IOException {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random()))
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index 2014c16..464966a 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -62,7 +62,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertEquals(OpenMode.CREATE_OR_APPEND, conf.getOpenMode());
// we don't need to assert this, it should be unspecified
assertTrue(IndexSearcher.getDefaultSimilarity() == conf.getSimilarity());
- assertEquals(IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS, conf.getMaxBufferedDeleteTerms());
assertEquals(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB, conf.getRAMBufferSizeMB(), 0.0);
assertEquals(IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS, conf.getMaxBufferedDocs());
assertEquals(IndexWriterConfig.DEFAULT_READER_POOLING, conf.getReaderPooling());
@@ -176,7 +175,7 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertEquals(IndexWriterConfig.DISABLE_AUTO_FLUSH, IndexWriterConfig.DEFAULT_MAX_BUFFERED_DELETE_TERMS);
assertEquals(IndexWriterConfig.DISABLE_AUTO_FLUSH, IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS);
assertEquals(16.0, IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB, 0.0);
- assertEquals(false, IndexWriterConfig.DEFAULT_READER_POOLING);
+ assertEquals(true, IndexWriterConfig.DEFAULT_READER_POOLING);
assertEquals(true, IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM);
}
@@ -233,10 +232,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
assertTrue(DocumentsWriterPerThread.defaultIndexingChain == conf.getIndexingChain());
expectThrows(IllegalArgumentException.class, () -> {
- conf.setMaxBufferedDeleteTerms(0);
- });
-
- expectThrows(IllegalArgumentException.class, () -> {
conf.setMaxBufferedDocs(1);
});
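The default asserted above flips: DEFAULT_READER_POOLING is now true, so IndexWriter pools SegmentReaders unless told otherwise. A minimal sketch of a caller opting back out; the class name and analyzer are illustrative, not from the patch:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;

public class ReaderPoolingSketch {
  public static void main(String[] args) {
    IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
    System.out.println("pooled by default: " + iwc.getReaderPooling()); // true after this change
    // Tests that depend on unpooled readers must now request that explicitly:
    iwc.setReaderPooling(false);
  }
}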
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/58105a20/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
index 975f5f0..8bc3f42 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
@@ -43,9 +43,10 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
+import org.junit.Ignore;
@SuppressCodecs("SimpleText") // too slow here
public class TestIndexWriterDelete extends LuceneTestCase {
@@ -59,8 +60,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
String[] text = { "Amsterdam", "Venice" };
Directory dir = newDirectory();
- IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDeleteTerms(1));
+ IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false)));
FieldType custom1 = new FieldType();
custom1.setStored(true);
@@ -99,8 +99,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDocs(2)
- .setMaxBufferedDeleteTerms(2));
+ .setMaxBufferedDocs(2));
int id = 0;
int value = 100;
@@ -129,20 +128,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
dir.close();
}
- public void testMaxBufferedDeletes() throws IOException {
- Directory dir = newDirectory();
- IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDeleteTerms(1));
-
- writer.addDocument(new Document());
- writer.deleteDocuments(new Term("foobar", "1"));
- writer.deleteDocuments(new Term("foobar", "1"));
- writer.deleteDocuments(new Term("foobar", "1"));
- assertEquals(3, writer.getFlushDeletesCount());
- writer.close();
- dir.close();
- }
-
// test when delete terms only apply to ram segments
public void testRAMDeletes() throws IOException {
for(int t=0;t<2;t++) {
@@ -151,8 +136,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
}
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDocs(4)
- .setMaxBufferedDeleteTerms(4));
+ .setMaxBufferedDocs(4));
int id = 0;
int value = 100;
@@ -189,8 +173,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
public void testBothDeletes() throws IOException {
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDocs(100)
- .setMaxBufferedDeleteTerms(100));
+ .setMaxBufferedDocs(100));
int id = 0;
int value = 100;
@@ -223,8 +206,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
public void testBatchDeletes() throws IOException {
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDocs(2)
- .setMaxBufferedDeleteTerms(2));
+ .setMaxBufferedDocs(2));
int id = 0;
int value = 100;
@@ -263,11 +245,13 @@ public class TestIndexWriterDelete extends LuceneTestCase {
}
// test deleteAll()
- public void testDeleteAll() throws IOException {
+ public void testDeleteAllSimple() throws IOException {
+ if (VERBOSE) {
+ System.out.println("TEST: now start");
+ }
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDocs(2)
- .setMaxBufferedDeleteTerms(2));
+ .setMaxBufferedDocs(2));
int id = 0;
int value = 100;
@@ -275,6 +259,9 @@ public class TestIndexWriterDelete extends LuceneTestCase {
for (int i = 0; i < 7; i++) {
addDoc(modifier, ++id, value);
}
+ if (VERBOSE) {
+ System.out.println("TEST: now commit");
+ }
modifier.commit();
IndexReader reader = DirectoryReader.open(dir);
@@ -285,6 +272,9 @@ public class TestIndexWriterDelete extends LuceneTestCase {
addDoc(modifier, 99, value);
// Delete all
+ if (VERBOSE) {
+ System.out.println("TEST: now delete all");
+ }
modifier.deleteAll();
// Delete all shouldn't be on disk yet
@@ -295,6 +285,9 @@ public class TestIndexWriterDelete extends LuceneTestCase {
// Add a doc and update a doc (after the deleteAll, before the commit)
addDoc(modifier, 101, value);
updateDoc(modifier, 102, value);
+ if (VERBOSE) {
+ System.out.println("TEST: now 2nd commit");
+ }
// commit the delete all
modifier.commit();
@@ -308,7 +301,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
dir.close();
}
-
public void testDeleteAllNoDeadLock() throws IOException, InterruptedException {
Directory dir = newDirectory();
final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir);
@@ -350,22 +342,36 @@ public class TestIndexWriterDelete extends LuceneTestCase {
}
latch.countDown();
while(!doneLatch.await(1, TimeUnit.MILLISECONDS)) {
+ if (VERBOSE) {
+ System.out.println("\nTEST: now deleteAll");
+ }
modifier.deleteAll();
if (VERBOSE) {
System.out.println("del all");
}
}
+
+ if (VERBOSE) {
+ System.out.println("\nTEST: now final deleteAll");
+ }
modifier.deleteAll();
for (Thread thread : threads) {
thread.join();
}
+ if (VERBOSE) {
+ System.out.println("\nTEST: now close");
+ }
modifier.close();
+
DirectoryReader reader = DirectoryReader.open(dir);
- assertEquals(reader.maxDoc(), 0);
- assertEquals(reader.numDocs(), 0);
- assertEquals(reader.numDeletedDocs(), 0);
+ if (VERBOSE) {
+ System.out.println("\nTEST: got reader=" + reader);
+ }
+ assertEquals(0, reader.maxDoc());
+ assertEquals(0, reader.numDocs());
+ assertEquals(0, reader.numDeletedDocs());
reader.close();
dir.close();
@@ -375,8 +381,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
public void testDeleteAllRollback() throws IOException {
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDocs(2)
- .setMaxBufferedDeleteTerms(2));
+ .setMaxBufferedDocs(2));
int id = 0;
int value = 100;
@@ -411,8 +416,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
public void testDeleteAllNRT() throws IOException {
Directory dir = newDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDocs(2)
- .setMaxBufferedDeleteTerms(2));
+ .setMaxBufferedDocs(2));
int id = 0;
int value = 100;
@@ -526,7 +530,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
IndexWriter modifier = new IndexWriter(dir,
newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
.setMaxBufferedDocs(1000)
- .setMaxBufferedDeleteTerms(1000)
.setMergeScheduler(new ConcurrentMergeScheduler()));
((ConcurrentMergeScheduler) modifier.getConfig().getMergeScheduler()).setSuppressExceptions();
@@ -701,6 +704,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
startDir.close();
}
+ @Ignore
// This test tests that buffered deletes are cleared when
// an Exception is hit during flush.
public void testErrorAfterApplyDeletes() throws IOException {
@@ -722,6 +726,10 @@ public class TestIndexWriterDelete extends LuceneTestCase {
// don't fail during merging
return;
}
+ if (VERBOSE) {
+ System.out.println("FAIL EVAL:");
+ new Throwable().printStackTrace(System.out);
+ }
if (sawMaybe && !failed) {
boolean seen = false;
StackTraceElement[] trace = new Exception().getStackTrace();
@@ -768,7 +776,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
MockDirectoryWrapper dir = newMockDirectory();
IndexWriter modifier = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false))
- .setMaxBufferedDeleteTerms(2)
.setReaderPooling(false)
.setMergePolicy(newLogMergePolicy()));
@@ -966,8 +973,14 @@ public class TestIndexWriterDelete extends LuceneTestCase {
final int inc = Math.min(left, TestUtil.nextInt(random(), 1, 20));
final int limit = upto + inc;
while(upto < limit) {
+ if (VERBOSE) {
+ System.out.println("TEST: delete id=" + ids.get(upto));
+ }
w.deleteDocuments(new Term("id", ""+ids.get(upto++)));
}
+ if (VERBOSE) {
+ System.out.println("\nTEST: now open reader");
+ }
final IndexReader r = w.getReader();
assertEquals(NUM_DOCS - upto, r.numDocs());
r.close();
@@ -994,9 +1007,8 @@ public class TestIndexWriterDelete extends LuceneTestCase {
}
};
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(analyzer)
- .setRAMBufferSizeMB(1.0)
- .setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH)
- .setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH));
+ .setRAMBufferSizeMB(4.0)
+ .setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH));
Document doc = new Document();
doc.add(newTextField("field", "go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20", Field.Store.NO));
int num = atLeast(3);
@@ -1077,49 +1089,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
dir.close();
}
- // LUCENE-3340: make sure deletes that we don't apply
- // during flush (ie are just pushed into the stream) are
- // in fact later flushed due to their RAM usage:
- public void testFlushPushedDeletesByCount() throws Exception {
- Directory dir = newDirectory();
- // Cannot use RandomIndexWriter because we don't want to
- // ever call commit() for this test:
- final int flushAtDelCount = atLeast(1020);
- IndexWriter w = new IndexWriter(dir,
- newIndexWriterConfig(new MockAnalyzer(random()))
- .setMaxBufferedDeleteTerms(flushAtDelCount)
- .setMaxBufferedDocs(1000)
- .setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH)
- .setMergePolicy(NoMergePolicy.INSTANCE)
- .setReaderPooling(false));
- int count = 0;
- while(true) {
- Document doc = new Document();
- doc.add(new StringField("id", count+"", Field.Store.NO));
- final Term delTerm;
- if (count == 1010) {
- // This is the only delete that applies
- delTerm = new Term("id", ""+0);
- } else {
- // These get buffered, taking up RAM, but delete
- // nothing when applied:
- delTerm = new Term("id", "x" + count);
- }
- w.updateDocument(delTerm, doc);
- // Eventually segment 0 should get a del docs:
- // TODO: fix this test
- if (slowFileExists(dir, "_0_1.del") || slowFileExists(dir, "_0_1.liv")) {
- break;
- }
- count++;
- if (count > flushAtDelCount) {
- fail("delete's were not applied at count=" + flushAtDelCount);
- }
- }
- w.close();
- dir.close();
- }
-
// Make sure buffered (pushed) deletes don't use up so
// much RAM that it forces long tail of tiny segments:
@Nightly
@@ -1298,6 +1267,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
}
// First one triggers, but does not reflect, the merge:
+ System.out.println("TEST: now get reader");
DirectoryReader.open(w).close();
IndexReader r = DirectoryReader.open(w);
assertEquals(1, r.leaves().size());
@@ -1315,25 +1285,33 @@ public class TestIndexWriterDelete extends LuceneTestCase {
mp.setMinMergeDocs(1);
iwc.setMergePolicy(mp);
iwc.setMergeScheduler(new SerialMergeScheduler());
- iwc.setMaxBufferedDeleteTerms(18);
IndexWriter w = new IndexWriter(dir, iwc);
for(int i=0;i<38;i++) {
+ if (VERBOSE) {
+ System.out.println("TEST: add doc " + i);
+ }
Document doc = new Document();
doc.add(newStringField("id", ""+i, Field.Store.NO));
w.addDocument(doc);
}
+ if (VERBOSE) {
+ System.out.println("TEST: commit1");
+ }
w.commit();
+ // Deleting 18 out of the 20 docs in the first segment makes it the same "level" as the other 9, which should cause a merge to kick off:
for(int i=0;i<18;i++) {
w.deleteDocuments(new Term("id", ""+i));
}
- w.commit();
+ if (VERBOSE) {
+ System.out.println("TEST: commit2");
+ }
+ w.close();
DirectoryReader r = DirectoryReader.open(dir);
assertEquals(1, r.leaves().size());
r.close();
- w.close();
dir.close();
}
@@ -1345,7 +1323,6 @@ public class TestIndexWriterDelete extends LuceneTestCase {
mp.setMinMergeDocs(1);
iwc.setMergePolicy(mp);
iwc.setMergeScheduler(new SerialMergeScheduler());
- iwc.setMaxBufferedDeleteTerms(18);
IndexWriter w = new IndexWriter(dir, iwc);
for(int i=0;i<38;i++) {
Document doc = new Document();