Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC
svn commit: r1534320 [13/39] - in /lucene/dev/branches/lucene4956: ./
dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/
dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/
dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Mon Oct 21 18:58:24 2013
@@ -30,15 +30,19 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.index.FieldInfo.DocValuesType;
import org.apache.lucene.index.FieldInfos.FieldNumbers;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.MergePolicy.MergeTrigger;
import org.apache.lucene.index.MergeState.CheckAbort;
+import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.CompoundFileDirectory;
@@ -182,7 +186,7 @@ import org.apache.lucene.util.ThreadInte
* referenced by the "front" of the index). For this, IndexFileDeleter
* keeps track of the last non commit checkpoint.
*/
-public class IndexWriter implements Closeable, TwoPhaseCommit {
+public class IndexWriter implements Closeable, TwoPhaseCommit{
private static final int UNBOUNDED_MAX_MERGE_SEGMENTS = -1;
@@ -226,7 +230,8 @@ public class IndexWriter implements Clos
final SegmentInfos segmentInfos; // the segments
final FieldNumbers globalFieldNumberMap;
- private DocumentsWriter docWriter;
+ private final DocumentsWriter docWriter;
+ private final Queue<Event> eventQueue;
final IndexFileDeleter deleter;
// used by forceMerge to note those needing merging
@@ -360,7 +365,7 @@ public class IndexWriter implements Clos
synchronized (fullFlushLock) {
boolean success = false;
try {
- anySegmentFlushed = docWriter.flushAllThreads();
+ anySegmentFlushed = docWriter.flushAllThreads(this);
if (!anySegmentFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
@@ -430,6 +435,7 @@ public class IndexWriter implements Clos
final ReadersAndLiveDocs rld = readerMap.get(info);
if (rld != null) {
assert info == rld.info;
+// System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.drop: " + info);
readerMap.remove(info);
rld.dropReaders();
}
@@ -446,6 +452,10 @@ public class IndexWriter implements Clos
}
public synchronized void release(ReadersAndLiveDocs rld) throws IOException {
+ release(rld, true);
+ }
+
+ public synchronized void release(ReadersAndLiveDocs rld, boolean assertInfoLive) throws IOException {
// Matches incRef in get:
rld.decRef();
@@ -456,14 +466,22 @@ public class IndexWriter implements Clos
if (!poolReaders && rld.refCount() == 1) {
// This is the last ref to this RLD, and we're not
// pooling, so remove it:
+// System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: " + rld.info);
if (rld.writeLiveDocs(directory)) {
// Make sure we only write del docs for a live segment:
- assert infoIsLive(rld.info);
- // Must checkpoint w/ deleter, because we just
- // created created new _X_N.del file.
- deleter.checkpoint(segmentInfos, false);
+ assert assertInfoLive == false || infoIsLive(rld.info);
+ // Must checkpoint because we just
+ // created new _X_N.del and field updates files;
+ // don't call IW.checkpoint because that also
+ // increments SIS.version, which we do not want to
+ // do here: it was done previously (after we
+ // invoked BDS.applyDeletes), whereas here all we
+ // did was move the state to disk:
+ checkpointNoSIS();
}
+ //System.out.println("IW: done writeLiveDocs for info=" + rld.info);
+// System.out.println("[" + Thread.currentThread().getName() + "] ReaderPool.release: drop readers " + rld.info);
rld.dropReaders();
readerMap.remove(rld.info);
}
@@ -479,14 +497,21 @@ public class IndexWriter implements Clos
try {
if (doSave && rld.writeLiveDocs(directory)) {
- // Make sure we only write del docs for a live segment:
+ // Make sure we only write del docs and field updates for a live segment:
assert infoIsLive(rld.info);
- // Must checkpoint w/ deleter, because we just
- // created created new _X_N.del file.
- deleter.checkpoint(segmentInfos, false);
+ // Must checkpoint because we just
+ // created new _X_N.del and field updates files;
+ // don't call IW.checkpoint because that also
+ // increments SIS.version, which we do not want to
+ // do here: it was done previously (after we
+ // invoked BDS.applyDeletes), whereas here all we
+ // did was move the state to disk:
+ checkpointNoSIS();
}
} catch (Throwable t) {
- if (priorE != null) {
+ if (doSave) {
+ IOUtils.reThrow(t);
+ } else if (priorE == null) {
priorE = t;
}
}
@@ -504,15 +529,15 @@ public class IndexWriter implements Clos
try {
rld.dropReaders();
} catch (Throwable t) {
- if (priorE != null) {
+ if (doSave) {
+ IOUtils.reThrow(t);
+ } else if (priorE == null) {
priorE = t;
}
}
}
assert readerMap.size() == 0;
- if (priorE != null) {
- throw new RuntimeException(priorE);
- }
+ IOUtils.reThrow(priorE);
}
/**
@@ -529,9 +554,15 @@ public class IndexWriter implements Clos
if (rld.writeLiveDocs(directory)) {
// Make sure we only write del docs for a live segment:
assert infoIsLive(info);
- // Must checkpoint w/ deleter, because we just
- // created created new _X_N.del file.
- deleter.checkpoint(segmentInfos, false);
+
+ // Must checkpoint because we just
+ // created new _X_N.del and field updates files;
+ // don't call IW.checkpoint because that also
+ // increments SIS.version, which we do not want to
+ // do here: it was done previously (after we
+ // invoked BDS.applyDeletes), whereas here all we
+ // did was move the state to disk:
+ checkpointNoSIS();
}
}
}
@@ -730,7 +761,9 @@ public class IndexWriter implements Clos
// start with previous field numbers, but new FieldInfos
globalFieldNumberMap = getFieldNumberMap();
- docWriter = new DocumentsWriter(codec, config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
+ config.getFlushPolicy().init(config);
+ docWriter = new DocumentsWriter(this, config, directory);
+ eventQueue = docWriter.eventQueue();
// Default deleter (for backwards compatibility) is
// KeepOnlyLastCommitDeleter:
@@ -771,27 +804,6 @@ public class IndexWriter implements Clos
}
}
- private FieldInfos getFieldInfos(SegmentInfo info) throws IOException {
- Directory cfsDir = null;
- try {
- if (info.getUseCompoundFile()) {
- cfsDir = new CompoundFileDirectory(info.dir,
- IndexFileNames.segmentFileName(info.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION),
- IOContext.READONCE,
- false);
- } else {
- cfsDir = info.dir;
- }
- return info.getCodec().fieldInfosFormat().getFieldInfosReader().read(cfsDir,
- info.name,
- IOContext.READONCE);
- } finally {
- if (info.getUseCompoundFile() && cfsDir != null) {
- cfsDir.close();
- }
- }
- }
-
/**
* Loads or returns the already loaded global field number map for this {@link SegmentInfos}.
* If this {@link SegmentInfos} has no global field number map the returned instance is empty
@@ -800,7 +812,7 @@ public class IndexWriter implements Clos
final FieldNumbers map = new FieldNumbers();
for(SegmentInfoPerCommit info : segmentInfos) {
- for(FieldInfo fi : getFieldInfos(info.info)) {
+ for(FieldInfo fi : SegmentReader.readFieldInfos(info)) {
map.addOrGet(fi.name, fi.number, fi.getDocValuesType());
}
}
@@ -961,7 +973,7 @@ public class IndexWriter implements Clos
if (doFlush) {
flush(waitForMerges, true);
} else {
- docWriter.abort(); // already closed -- never sync on IW
+ docWriter.abort(this); // already closed -- never sync on IW
}
} finally {
@@ -1015,17 +1027,18 @@ public class IndexWriter implements Clos
commitInternal();
}
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "at close: " + segString());
- }
- // used by assert below
- final DocumentsWriter oldWriter = docWriter;
synchronized(this) {
+ // commitInternal calls ReaderPool.commit, which
+ // writes any pending liveDocs from ReaderPool, so
+ // it's safe to drop all readers now:
readerPool.dropAll(true);
- docWriter = null;
deleter.close();
}
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "at close: " + segString());
+ }
+
if (writeLock != null) {
writeLock.release(); // release write lock
writeLock = null;
@@ -1033,7 +1046,7 @@ public class IndexWriter implements Clos
synchronized(this) {
closed = true;
}
- assert oldWriter.perThreadPool.numDeactivatedThreadStates() == oldWriter.perThreadPool.getMaxThreadStates();
+ assert docWriter.perThreadPool.numDeactivatedThreadStates() == docWriter.perThreadPool.getMaxThreadStates() : "" + docWriter.perThreadPool.numDeactivatedThreadStates() + " " + docWriter.perThreadPool.getMaxThreadStates();
} catch (OutOfMemoryError oom) {
handleOOM(oom, "closeInternal");
} finally {
@@ -1068,14 +1081,7 @@ public class IndexWriter implements Clos
* @see #numDocs */
public synchronized int maxDoc() {
ensureOpen();
- int count;
- if (docWriter != null)
- count = docWriter.getNumDocs();
- else
- count = 0;
-
- count += segmentInfos.totalDocCount();
- return count;
+ return docWriter.getNumDocs() + segmentInfos.totalDocCount();
}
/** Returns total number of docs in this index, including
@@ -1086,12 +1092,7 @@ public class IndexWriter implements Clos
* @see #numDocs */
public synchronized int numDocs() {
ensureOpen();
- int count;
- if (docWriter != null)
- count = docWriter.getNumDocs();
- else
- count = 0;
-
+ int count = docWriter.getNumDocs();
for (final SegmentInfoPerCommit info : segmentInfos) {
count += info.info.getDocCount() - numDeletedDocs(info);
}
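The simplified bodies make the two counts easy to contrast: maxDoc() counts the buffered docs plus every doc in every segment, while numDocs() also subtracts each segment's deletes. For example, with 2 docs buffered in the DocumentsWriter and a single 10-doc segment of which 3 docs are deleted, maxDoc() returns 2 + 10 = 12 while numDocs() returns 2 + (10 - 3) = 9.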
@@ -1280,9 +1281,10 @@ public class IndexWriter implements Clos
ensureOpen();
try {
boolean success = false;
- boolean anySegmentFlushed = false;
try {
- anySegmentFlushed = docWriter.updateDocuments(docs, analyzer, delTerm);
+ if (docWriter.updateDocuments(docs, analyzer, delTerm)) {
+ processEvents(true, false);
+ }
success = true;
} finally {
if (!success) {
@@ -1291,9 +1293,6 @@ public class IndexWriter implements Clos
}
}
}
- if (anySegmentFlushed) {
- maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
- }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocuments");
}
@@ -1313,7 +1312,9 @@ public class IndexWriter implements Clos
public void deleteDocuments(Term term) throws IOException {
ensureOpen();
try {
- docWriter.deleteTerms(term);
+ if (docWriter.deleteTerms(term)) {
+ processEvents(true, false);
+ }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term)");
}
@@ -1412,7 +1413,9 @@ public class IndexWriter implements Clos
public void deleteDocuments(Term... terms) throws IOException {
ensureOpen();
try {
- docWriter.deleteTerms(terms);
+ if (docWriter.deleteTerms(terms)) {
+ processEvents(true, false);
+ }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term..)");
}
@@ -1432,7 +1435,9 @@ public class IndexWriter implements Clos
public void deleteDocuments(Query query) throws IOException {
ensureOpen();
try {
- docWriter.deleteQueries(query);
+ if (docWriter.deleteQueries(query)) {
+ processEvents(true, false);
+ }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query)");
}
@@ -1454,7 +1459,9 @@ public class IndexWriter implements Clos
public void deleteDocuments(Query... queries) throws IOException {
ensureOpen();
try {
- docWriter.deleteQueries(queries);
+ if (docWriter.deleteQueries(queries)) {
+ processEvents(true, false);
+ }
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query..)");
}
@@ -1505,9 +1512,10 @@ public class IndexWriter implements Clos
ensureOpen();
try {
boolean success = false;
- boolean anySegmentFlushed = false;
try {
- anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
+ if (docWriter.updateDocument(doc, analyzer, term)) {
+ processEvents(true, false);
+ }
success = true;
} finally {
if (!success) {
@@ -1516,12 +1524,44 @@ public class IndexWriter implements Clos
}
}
}
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "updateDocument");
+ }
+ }
- if (anySegmentFlushed) {
- maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+ /**
+ * Updates a document's NumericDocValues for <code>field</code> to the given
+ * <code>value</code>. This method can be used to 'unset' a document's value
+ * by passing {@code null} as the new value. Also, you can only update fields
+ * that already exist in the index, not add new fields through this method.
+ *
+ * <p>
+ * <b>NOTE</b>: if this method hits an OutOfMemoryError you should immediately
+ * close the writer. See <a href="#OOME">above</a> for details.
+ * </p>
+ *
+ * @param term
+ * the term to identify the document(s) to be updated
+ * @param field
+ * field name of the NumericDocValues field
+ * @param value
+ * new value for the field
+ * @throws CorruptIndexException
+ * if the index is corrupt
+ * @throws IOException
+ * if there is a low-level IO error
+ */
+ public void updateNumericDocValue(Term term, String field, Long value) throws IOException {
+ ensureOpen();
+ if (!globalFieldNumberMap.contains(field, DocValuesType.NUMERIC)) {
+ throw new IllegalArgumentException("can only update existing numeric-docvalues fields!");
+ }
+ try {
+ if (docWriter.updateNumericDocValue(term, field, value)) {
+ processEvents(true, false);
}
} catch (OutOfMemoryError oom) {
- handleOOM(oom, "updateDocument");
+ handleOOM(oom, "updateNumericDocValue");
}
}
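A minimal usage sketch of the method added above. The "id" and "price" fields are illustrative, and writer is assumed to be an already-open IndexWriter over an index that already contains the numeric doc-values field:

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.NumericDocValuesField;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.Term;

    Document doc = new Document();
    doc.add(new StringField("id", "doc-1", Store.NO));
    doc.add(new NumericDocValuesField("price", 10L));
    writer.addDocument(doc);
    writer.commit();

    // update the value in place; the document is not re-indexed
    writer.updateNumericDocValue(new Term("id", "doc-1"), "price", 20L);

    // per the javadoc above, passing null 'unsets' the value
    writer.updateNumericDocValue(new Term("id", "doc-1"), "price", null);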
@@ -1730,7 +1770,6 @@ public class IndexWriter implements Clos
// complete
ensureOpen();
}
-
// NOTE: in the ConcurrentMergeScheduler case, when
// doWait is false, we can return immediately while
// background threads accomplish the merging
@@ -1909,7 +1948,6 @@ public class IndexWriter implements Clos
merge.maxNumSegments = maxNumSegments;
}
}
-
} else {
spec = mergePolicy.findMerges(trigger, segmentInfos);
}
@@ -2009,8 +2047,9 @@ public class IndexWriter implements Clos
mergeScheduler.close();
bufferedDeletesStream.clear();
+ processEvents(false, true);
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
- docWriter.abort(); // don't sync on IW here
+ docWriter.abort(this); // don't sync on IW here
synchronized(this) {
if (pendingCommit != null) {
@@ -2102,7 +2141,8 @@ public class IndexWriter implements Clos
* sure it's just like a fresh index.
*/
try {
- docWriter.lockAndAbortAll();
+ docWriter.lockAndAbortAll(this);
+ processEvents(false, true);
synchronized (this) {
try {
// Abort any running merges
@@ -2135,7 +2175,7 @@ public class IndexWriter implements Clos
}
}
} finally {
- docWriter.unlockAllAfterAbortAll();
+ docWriter.unlockAllAfterAbortAll(this);
}
}
}
@@ -2226,6 +2266,15 @@ public class IndexWriter implements Clos
deleter.checkpoint(segmentInfos, false);
}
+ /** Checkpoints with IndexFileDeleter, so it's aware of
+ * new files, and increments changeCount, so on
+ * close/commit we will write a new segments file, but
+ * does NOT bump segmentInfos.version. */
+ synchronized void checkpointNoSIS() throws IOException {
+ changeCount++;
+ deleter.checkpoint(segmentInfos, false);
+ }
+
/** Called internally if any index state has changed. */
synchronized void changed() {
changeCount++;
@@ -2243,33 +2292,40 @@ public class IndexWriter implements Clos
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer.
*/
- synchronized void publishFlushedSegment(SegmentInfoPerCommit newSegment,
+ void publishFlushedSegment(SegmentInfoPerCommit newSegment,
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
- // Lock order IW -> BDS
- synchronized (bufferedDeletesStream) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "publishFlushedSegment");
- }
-
- if (globalPacket != null && globalPacket.any()) {
- bufferedDeletesStream.push(globalPacket);
- }
- // Publishing the segment must be synched on IW -> BDS to make the sure
- // that no merge prunes away the seg. private delete packet
- final long nextGen;
- if (packet != null && packet.any()) {
- nextGen = bufferedDeletesStream.push(packet);
- } else {
- // Since we don't have a delete packet to apply we can get a new
- // generation right away
- nextGen = bufferedDeletesStream.getNextGen();
- }
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
+ try {
+ synchronized (this) {
+ // Lock order IW -> BDS
+ synchronized (bufferedDeletesStream) {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "publishFlushedSegment");
+ }
+
+ if (globalPacket != null && globalPacket.any()) {
+ bufferedDeletesStream.push(globalPacket);
+ }
+ // Publishing the segment must be synched on IW -> BDS to make sure
+ // that no merge prunes away the seg. private delete packet
+ final long nextGen;
+ if (packet != null && packet.any()) {
+ nextGen = bufferedDeletesStream.push(packet);
+ } else {
+ // Since we don't have a delete packet to apply we can get a new
+ // generation right away
+ nextGen = bufferedDeletesStream.getNextGen();
+ }
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "publish sets newSegment delGen=" + nextGen + " seg=" + segString(newSegment));
+ }
+ newSegment.setBufferedDeletesGen(nextGen);
+ segmentInfos.add(newSegment);
+ checkpoint();
+ }
}
- newSegment.setBufferedDeletesGen(nextGen);
- segmentInfos.add(newSegment);
- checkpoint();
+ } finally {
+ flushCount.incrementAndGet();
+ doAfterFlush();
}
}
@@ -2367,7 +2423,7 @@ public class IndexWriter implements Clos
IOContext context = new IOContext(new MergeInfo(info.info.getDocCount(), info.sizeInBytes(), true, -1));
- for(FieldInfo fi : getFieldInfos(info.info)) {
+ for(FieldInfo fi : SegmentReader.readFieldInfos(info)) {
globalFieldNumberMap.addOrGet(fi.name, fi.number, fi.getDocValuesType());
}
infos.add(copySegmentAsIs(info, newSegName, context));
@@ -2462,20 +2518,12 @@ public class IndexWriter implements Clos
String mergedName = newSegmentName();
final List<AtomicReader> mergeReaders = new ArrayList<AtomicReader>();
for (IndexReader indexReader : readers) {
- if (indexReader.numDocs() > 0) {
- numDocs += indexReader.numDocs();
- for (AtomicReaderContext ctx : indexReader.leaves()) {
- if (ctx.reader().numDocs() > 0) { // drop empty (or all deleted) segments
- mergeReaders.add(ctx.reader());
- }
- }
+ numDocs += indexReader.numDocs();
+ for (AtomicReaderContext ctx : indexReader.leaves()) {
+ mergeReaders.add(ctx.reader());
}
}
- if (mergeReaders.isEmpty()) { // no segments with documents to add
- return;
- }
-
final IOContext context = new IOContext(new MergeInfo(numDocs, -1, true, -1));
// TODO: somehow we should fix this merge so it's
@@ -2483,10 +2531,14 @@ public class IndexWriter implements Clos
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(directory);
SegmentInfo info = new SegmentInfo(directory, Constants.LUCENE_MAIN_VERSION, mergedName, -1,
- false, codec, null, null);
+ false, codec, null);
SegmentMerger merger = new SegmentMerger(mergeReaders, info, infoStream, trackingDir,
MergeState.CheckAbort.NONE, globalFieldNumberMap, context);
+
+ if (!merger.shouldMerge()) {
+ return;
+ }
MergeState mergeState;
boolean success = false;
@@ -2501,7 +2553,7 @@ public class IndexWriter implements Clos
}
}
- SegmentInfoPerCommit infoPerCommit = new SegmentInfoPerCommit(info, 0, -1L);
+ SegmentInfoPerCommit infoPerCommit = new SegmentInfoPerCommit(info, 0, -1L, -1L);
info.setFiles(new HashSet<String>(trackingDir.getCreatedFiles()));
trackingDir.getCreatedFiles().clear();
@@ -2572,23 +2624,14 @@ public class IndexWriter implements Clos
// note: we don't really need this fis (its copied), but we load it up
// so we don't pass a null value to the si writer
- FieldInfos fis = getFieldInfos(info.info);
+ FieldInfos fis = SegmentReader.readFieldInfos(info);
- final Map<String,String> attributes;
- // copy the attributes map, we might modify it below.
- // also we need to ensure its read-write, since we will invoke the SIwriter (which might want to set something).
- if (info.info.attributes() == null) {
- attributes = new HashMap<String,String>();
- } else {
- attributes = new HashMap<String,String>(info.info.attributes());
- }
-
//System.out.println("copy seg=" + info.info.name + " version=" + info.info.getVersion());
// Same SI as before but we change directory and name
SegmentInfo newInfo = new SegmentInfo(directory, info.info.getVersion(), segName, info.info.getDocCount(),
- info.info.getUseCompoundFile(),
- info.info.getCodec(), info.info.getDiagnostics(), attributes);
- SegmentInfoPerCommit newInfoPerCommit = new SegmentInfoPerCommit(newInfo, info.getDelCount(), info.getDelGen());
+ info.info.getUseCompoundFile(), info.info.getCodec(),
+ info.info.getDiagnostics());
+ SegmentInfoPerCommit newInfoPerCommit = new SegmentInfoPerCommit(newInfo, info.getDelCount(), info.getDelGen(), info.getFieldInfosGen());
Set<String> segFiles = new HashSet<String>();
@@ -2709,12 +2752,13 @@ public class IndexWriter implements Clos
boolean flushSuccess = false;
boolean success = false;
try {
- anySegmentsFlushed = docWriter.flushAllThreads();
+ anySegmentsFlushed = docWriter.flushAllThreads(this);
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
+ processEvents(false, true);
flushSuccess = true;
synchronized(this) {
@@ -2754,7 +2798,7 @@ public class IndexWriter implements Clos
} catch (OutOfMemoryError oom) {
handleOOM(oom, "prepareCommit");
}
-
+
boolean success = false;
try {
if (anySegmentsFlushed) {
@@ -2769,7 +2813,7 @@ public class IndexWriter implements Clos
}
}
}
-
+
startCommit(toCommit);
}
}
@@ -2836,9 +2880,14 @@ public class IndexWriter implements Clos
commitInternal();
}
- /** Returns true if there are changes that have not been committed */
+ /** Returns true if there are changes that have not been
+ * committed. Note that if a merge kicked off as a
+ * result of flushing a new segment during {@link
+ * #commit}, or a concurrent merged finished,
+ * this method may return true right after you
+ * had just called {@link #commit}. */
public final boolean hasUncommittedChanges() {
- return changeCount != lastCommitChangeCount;
+ return changeCount != lastCommitChangeCount || docWriter.anyChanges() || bufferedDeletesStream.any();
}
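A short sketch of the contract described in the new javadoc; writer is assumed to be an open IndexWriter and doc a Document:

    writer.addDocument(doc);
    // the change is buffered in the DocumentsWriter, so:
    assert writer.hasUncommittedChanges();

    writer.commit();
    // usually false now, but a segment flushed during commit() may have
    // kicked off a merge, or a concurrent merge may have just finished,
    // so callers must not assume false right after commit():
    boolean maybeDirty = writer.hasUncommittedChanges();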
private final void commitInternal() throws IOException {
@@ -2883,6 +2932,8 @@ public class IndexWriter implements Clos
segmentInfos.updateGeneration(pendingCommit);
lastCommitChangeCount = pendingCommitChangeCount;
rollbackSegments = pendingCommit.createBackupSegmentInfos();
+ // NOTE: don't use this.checkpoint() here, because
+ // we do not want to increment changeCount:
deleter.checkpoint(pendingCommit, true);
} finally {
// Matches the incRef done in prepareCommit:
@@ -2954,10 +3005,11 @@ public class IndexWriter implements Clos
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
- anySegmentFlushed = docWriter.flushAllThreads();
+ anySegmentFlushed = docWriter.flushAllThreads(this);
flushSuccess = true;
} finally {
docWriter.finishFullFlush(flushSuccess);
+ processEvents(false, true);
}
}
synchronized(this) {
@@ -3050,15 +3102,35 @@ public class IndexWriter implements Clos
}
}
- /** Carefully merges deletes for the segments we just
- * merged. This is tricky because, although merging will
- * clear all deletes (compacts the documents), new
- * deletes may have been flushed to the segments since
- * the merge was started. This method "carries over"
- * such new deletes onto the newly merged segment, and
- * saves the resulting deletes file (incrementing the
- * delete generation for merge.info). If no deletes were
- * flushed, no new deletes file is saved. */
+ private MergePolicy.DocMap getDocMap(MergePolicy.OneMerge merge, MergeState mergeState) {
+ MergePolicy.DocMap docMap = merge.getDocMap(mergeState);
+ assert docMap.isConsistent(merge.info.info.getDocCount());
+ return docMap;
+ }
+
+ private void skipDeletedDoc(UpdatesIterator[] updatesIters, int deletedDoc) {
+ for (UpdatesIterator iter : updatesIters) {
+ if (iter.doc() == deletedDoc) {
+ iter.nextDoc();
+ }
+ // when entering the method, all iterators must already be beyond the
+ // deleted document, or right on it, in which case we advance them above
+ // and they must be beyond it now.
+ assert iter.doc() > deletedDoc : "updateDoc=" + iter.doc() + " deletedDoc=" + deletedDoc;
+ }
+ }
+
+ /**
+ * Carefully merges deletes and updates for the segments we just merged. This
+ * is tricky because, although merging will clear all deletes (compacts the
+ * documents) and compact all the updates, new deletes and updates may have
+ * been flushed to the segments since the merge was started. This method
+ * "carries over" such new deletes and updates onto the newly merged segment,
+ * and saves the resulting deletes and updates files (incrementing the delete
+ * and DV generations for merge.info). If no deletes were flushed, no new
+ * deletes file is saved.
+ */
+ // TODO (DVU_RENAME) to commitMergedDeletesAndUpdates
synchronized private ReadersAndLiveDocs commitMergedDeletes(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
assert testPoint("startCommitMergeDeletes");
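The hunks below implement the carry-over described in the javadoc. Stripped of Lucene internals, the core idea looks like this sketch (single source segment and an identity doc map assumed; none of these names come from the patch):

    // live docs when the merge started vs. now; doc 1 was deleted mid-merge
    boolean[] prevLiveDocs    = { true, true, false, true };
    boolean[] currentLiveDocs = { true, false, false, true };

    // docUpto walks only the docs the merge copied (those live at merge
    // start); with an identity doc map it is also the merged-segment docID
    java.util.List<Integer> carriedOverDeletes = new java.util.ArrayList<Integer>();
    int docUpto = 0;
    for (int j = 0; j < prevLiveDocs.length; j++) {
      if (!prevLiveDocs[j]) {
        continue; // already deleted at merge start; the merge dropped it
      }
      if (!currentLiveDocs[j]) {
        carriedOverDeletes.add(docUpto); // new delete: carry onto merged segment
      }
      docUpto++;
    }
    // carriedOverDeletes == [1]; numeric field updates are carried over the
    // same way, remapping each updated docID through the doc map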
@@ -3075,19 +3147,38 @@ public class IndexWriter implements Clos
long minGen = Long.MAX_VALUE;
// Lazy init (only when we find a delete to carry over):
- ReadersAndLiveDocs mergedDeletes = null;
+ ReadersAndLiveDocs mergedDeletes = null; // TODO (DVU_RENAME) to mergedDeletesAndUpdates
+ boolean initWritableLiveDocs = false;
MergePolicy.DocMap docMap = null;
-
- for(int i=0; i < sourceSegments.size(); i++) {
+ final Map<String,NumericFieldUpdates> mergedFieldUpdates = new HashMap<String,NumericFieldUpdates>();
+
+ for (int i = 0; i < sourceSegments.size(); i++) {
SegmentInfoPerCommit info = sourceSegments.get(i);
minGen = Math.min(info.getBufferedDeletesGen(), minGen);
final int docCount = info.info.getDocCount();
final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs();
- final Bits currentLiveDocs;
final ReadersAndLiveDocs rld = readerPool.get(info, false);
// We hold a ref so it should still be in the pool:
assert rld != null: "seg=" + info.info.name;
- currentLiveDocs = rld.getLiveDocs();
+ final Bits currentLiveDocs = rld.getLiveDocs();
+ final Map<String,NumericFieldUpdates> mergingFieldUpdates = rld.getMergingFieldUpdates();
+ final String[] mergingFields;
+ final UpdatesIterator[] updatesIters;
+ if (mergingFieldUpdates.isEmpty()) {
+ mergingFields = null;
+ updatesIters = null;
+ } else {
+ mergingFields = new String[mergingFieldUpdates.size()];
+ updatesIters = new UpdatesIterator[mergingFieldUpdates.size()];
+ int idx = 0;
+ for (Entry<String,NumericFieldUpdates> e : mergingFieldUpdates.entrySet()) {
+ mergingFields[idx] = e.getKey();
+ updatesIters[idx] = e.getValue().getUpdates();
+ updatesIters[idx].nextDoc(); // advance to first update doc
+ ++idx;
+ }
+ }
+// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: info=" + info + ", mergingUpdates=" + mergingUpdates);
if (prevLiveDocs != null) {
@@ -3110,11 +3201,10 @@ public class IndexWriter implements Clos
// If so, we must carefully merge the liveDocs one
// doc at a time:
if (currentLiveDocs != prevLiveDocs) {
-
// This means this segment received new deletes
// since we started the merge, so we
// must merge them:
- for(int j=0;j<docCount;j++) {
+ for (int j = 0; j < docCount; j++) {
if (!prevLiveDocs.get(j)) {
assert !currentLiveDocs.get(j);
} else {
@@ -3122,14 +3212,82 @@ public class IndexWriter implements Clos
if (mergedDeletes == null) {
mergedDeletes = readerPool.get(merge.info, true);
mergedDeletes.initWritableLiveDocs();
- docMap = merge.getDocMap(mergeState);
- assert docMap.isConsistent(merge.info.info.getDocCount());
+ initWritableLiveDocs = true;
+ docMap = getDocMap(merge, mergeState);
+ } else if (!initWritableLiveDocs) { // mergedDeletes was initialized by field-updates changes
+ mergedDeletes.initWritableLiveDocs();
+ initWritableLiveDocs = true;
}
mergedDeletes.delete(docMap.map(docUpto));
+ if (mergingFields != null) { // advance all iters beyond the deleted document
+ skipDeletedDoc(updatesIters, j);
+ }
+ } else if (mergingFields != null) {
+ // document isn't deleted, check if any of the fields have an update to it
+ int newDoc = -1;
+ for (int idx = 0; idx < mergingFields.length; idx++) {
+ UpdatesIterator updatesIter = updatesIters[idx];
+ if (updatesIter.doc() == j) { // document has an update
+ if (mergedDeletes == null) {
+ mergedDeletes = readerPool.get(merge.info, true);
+ docMap = getDocMap(merge, mergeState);
+ }
+ if (newDoc == -1) { // map once per all field updates, but only if there are any updates
+ newDoc = docMap.map(docUpto);
+ }
+ String field = mergingFields[idx];
+ NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
+ if (fieldUpdates == null) {
+ // an approximation of maxDoc, used to compute best bitsPerValue
+ fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
+ mergedFieldUpdates.put(field, fieldUpdates);
+ }
+ fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
+ updatesIter.nextDoc(); // advance to next document
+ } else {
+ assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j;
+ }
+ }
}
docUpto++;
}
}
+ } else if (mergingFields != null) {
+ // need to check each non-deleted document if it has any updates
+ for (int j = 0; j < docCount; j++) {
+ if (prevLiveDocs.get(j)) {
+ // document isn't deleted, check if any of the fields have an update to it
+ int newDoc = -1;
+ for (int idx = 0; idx < mergingFields.length; idx++) {
+ UpdatesIterator updatesIter = updatesIters[idx];
+ if (updatesIter.doc() == j) { // document has an update
+ if (mergedDeletes == null) {
+ mergedDeletes = readerPool.get(merge.info, true);
+ docMap = getDocMap(merge, mergeState);
+ }
+ if (newDoc == -1) { // map once per all field updates, but only if there are any updates
+ newDoc = docMap.map(docUpto);
+ }
+ String field = mergingFields[idx];
+ NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
+ if (fieldUpdates == null) {
+ // an approximation of maxDoc, used to compute best bitsPerValue
+ fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
+ mergedFieldUpdates.put(field, fieldUpdates);
+ }
+ fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
+ updatesIter.nextDoc(); // advance to next document
+ } else {
+ assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j;
+ }
+ }
+ // advance docUpto for every non-deleted document
+ docUpto++;
+ } else {
+ // advance all iters beyond the deleted document
+ skipDeletedDoc(updatesIters, j);
+ }
+ }
} else {
docUpto += info.info.getDocCount() - info.getDelCount() - rld.getPendingDeleteCount();
}
@@ -3137,31 +3295,118 @@ public class IndexWriter implements Clos
assert currentLiveDocs.length() == docCount;
// This segment had no deletes before but now it
// does:
- for(int j=0; j<docCount; j++) {
+ for (int j = 0; j < docCount; j++) {
if (!currentLiveDocs.get(j)) {
if (mergedDeletes == null) {
mergedDeletes = readerPool.get(merge.info, true);
mergedDeletes.initWritableLiveDocs();
- docMap = merge.getDocMap(mergeState);
- assert docMap.isConsistent(merge.info.info.getDocCount());
+ initWritableLiveDocs = true;
+ docMap = getDocMap(merge, mergeState);
+ } else if (!initWritableLiveDocs) { // mergedDeletes was initialized by field-updates changes
+ mergedDeletes.initWritableLiveDocs();
+ initWritableLiveDocs = true;
}
mergedDeletes.delete(docMap.map(docUpto));
+ if (mergingFields != null) { // advance all iters beyond the deleted document
+ skipDeletedDoc(updatesIters, j);
+ }
+ } else if (mergingFields != null) {
+ // document isn't deleted, check if any of the fields have an update to it
+ int newDoc = -1;
+ for (int idx = 0; idx < mergingFields.length; idx++) {
+ UpdatesIterator updatesIter = updatesIters[idx];
+ if (updatesIter.doc() == j) { // document has an update
+ if (mergedDeletes == null) {
+ mergedDeletes = readerPool.get(merge.info, true);
+ docMap = getDocMap(merge, mergeState);
+ }
+ if (newDoc == -1) { // map once per all field updates, but only if there are any updates
+ newDoc = docMap.map(docUpto);
+ }
+ String field = mergingFields[idx];
+ NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
+ if (fieldUpdates == null) {
+ // an approximation of maxDoc, used to compute best bitsPerValue
+ fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
+ mergedFieldUpdates.put(field, fieldUpdates);
+ }
+ fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
+ updatesIter.nextDoc(); // advance to next document
+ } else {
+ assert updatesIter.doc() > j : "field=" + mergingFields[idx] + " updateDoc=" + updatesIter.doc() + " curDoc=" + j;
+ }
+ }
+ }
+ docUpto++;
+ }
+ } else if (mergingFields != null) {
+ // no deletions before or after, but there were updates
+ for (int j = 0; j < docCount; j++) {
+ int newDoc = -1;
+ for (int idx = 0; idx < mergingFields.length; idx++) {
+ UpdatesIterator updatesIter = updatesIters[idx];
+ if (updatesIter.doc() == j) { // document has an update
+ if (mergedDeletes == null) {
+ mergedDeletes = readerPool.get(merge.info, true);
+ docMap = getDocMap(merge, mergeState);
+ }
+ if (newDoc == -1) { // map once per all field updates, but only if there are any updates
+ newDoc = docMap.map(docUpto);
+ }
+ String field = mergingFields[idx];
+ NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
+ if (fieldUpdates == null) {
+ // an approximation of maxDoc, used to compute best bitsPerValue
+ fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
+ mergedFieldUpdates.put(field, fieldUpdates);
+ }
+ fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
+ updatesIter.nextDoc(); // advance to next document
+ } else {
+ assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j;
+ }
}
+ // advance docUpto for every non-deleted document
docUpto++;
}
} else {
- // No deletes before or after
+ // No deletes or updates before or after
docUpto += info.info.getDocCount();
}
}
assert docUpto == merge.info.info.getDocCount();
+ if (!mergedFieldUpdates.isEmpty()) {
+// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedFieldUpdates=" + mergedFieldUpdates);
+ boolean success = false;
+ try {
+ // if any error occurs while writing the field updates we should release
+ // the info, otherwise it stays in the pool but is considered not "live"
+ // which later causes false exceptions in pool.dropAll().
+ // NOTE: currently this is the only place which throws a true
+ // IOException. If this ever changes, we need to extend that try/finally
+ // block to the rest of the method too.
+ mergedDeletes.writeFieldUpdates(directory, mergedFieldUpdates);
+ success = true;
+ } finally {
+ if (!success) {
+ mergedDeletes.dropChanges();
+ readerPool.drop(merge.info);
+ }
+ }
+ }
+
if (infoStream.isEnabled("IW")) {
if (mergedDeletes == null) {
- infoStream.message("IW", "no new deletes since merge started");
+ infoStream.message("IW", "no new deletes or field updates since merge started");
} else {
- infoStream.message("IW", mergedDeletes.getPendingDeleteCount() + " new deletes since merge started");
+ String msg = mergedDeletes.getPendingDeleteCount() + " new deletes";
+ if (!mergedFieldUpdates.isEmpty()) {
+ msg += " and " + mergedFieldUpdates.size() + " new field updates";
+ }
+ msg += " since merge started";
+ infoStream.message("IW", msg);
}
}
@@ -3198,9 +3443,8 @@ public class IndexWriter implements Clos
return false;
}
- final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState);
-
- assert mergedDeletes == null || mergedDeletes.getPendingDeleteCount() != 0;
+ final ReadersAndLiveDocs mergedDeletes = merge.info.info.getDocCount() == 0 ? null : commitMergedDeletes(merge, mergeState);
+// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMerge: mergedDeletes=" + mergedDeletes);
// If the doc store we are using has been closed and
// is in now compound format (but wasn't when we
@@ -3228,15 +3472,31 @@ public class IndexWriter implements Clos
assert merge.info.info.getDocCount() != 0 || keepFullyDeletedSegments || dropSegment;
- segmentInfos.applyMergeChanges(merge, dropSegment);
-
if (mergedDeletes != null) {
- if (dropSegment) {
- mergedDeletes.dropChanges();
+ boolean success = false;
+ try {
+ if (dropSegment) {
+ mergedDeletes.dropChanges();
+ }
+ // Pass false for assertInfoLive because the merged
+ // segment is not yet live (only below do we commit it
+ // to the segmentInfos):
+ readerPool.release(mergedDeletes, false);
+ success = true;
+ } finally {
+ if (!success) {
+ mergedDeletes.dropChanges();
+ readerPool.drop(merge.info);
+ }
}
- readerPool.release(mergedDeletes);
}
+ // Must do this after readerPool.release, in case an
+ // exception is hit e.g. writing the live docs for the
+ // merge segment, in which case we need to abort the
+ // merge:
+ segmentInfos.applyMergeChanges(merge, dropSegment);
+
if (dropSegment) {
assert !segmentInfos.contains(merge.info);
readerPool.drop(merge.info);
@@ -3300,17 +3560,12 @@ public class IndexWriter implements Clos
// in which case we must throw it so, for example, the
// rollbackTransaction code in addIndexes* is
// executed.
- if (merge.isExternal)
+ if (merge.isExternal) {
throw (MergePolicy.MergeAbortedException) t;
- } else if (t instanceof IOException)
- throw (IOException) t;
- else if (t instanceof RuntimeException)
- throw (RuntimeException) t;
- else if (t instanceof Error)
- throw (Error) t;
- else
- // Should not get here
- throw new RuntimeException(t);
+ }
+ } else {
+ IOUtils.reThrow(t);
+ }
}
/**
@@ -3514,7 +3769,7 @@ public class IndexWriter implements Clos
// Lock order: IW -> BD
final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, merge.segments);
-
+
if (result.anyDeletes) {
checkpoint();
}
@@ -3538,12 +3793,14 @@ public class IndexWriter implements Clos
// ConcurrentMergePolicy we keep deterministic segment
// names.
final String mergeSegmentName = newSegmentName();
- SegmentInfo si = new SegmentInfo(directory, Constants.LUCENE_MAIN_VERSION, mergeSegmentName, -1, false, codec, null, null);
+ SegmentInfo si = new SegmentInfo(directory, Constants.LUCENE_MAIN_VERSION, mergeSegmentName, -1, false, codec, null);
Map<String,String> details = new HashMap<String,String>();
details.put("mergeMaxNumSegments", "" + merge.maxNumSegments);
details.put("mergeFactor", Integer.toString(merge.segments.size()));
setDiagnostics(si, SOURCE_MERGE, details);
- merge.setInfo(new SegmentInfoPerCommit(si, 0, -1L));
+ merge.setInfo(new SegmentInfoPerCommit(si, 0, -1L, -1L));
+
+// System.out.println("[" + Thread.currentThread().getName() + "] IW._mergeInit: " + segString(merge.segments) + " into " + si);
// Lock order: IW -> BD
bufferedDeletesStream.prune(segmentInfos);
@@ -3585,7 +3842,7 @@ public class IndexWriter implements Clos
// exception inside mergeInit
if (merge.registerDone) {
final List<SegmentInfoPerCommit> sourceSegments = merge.segments;
- for(SegmentInfoPerCommit info : sourceSegments) {
+ for (SegmentInfoPerCommit info : sourceSegments) {
mergingSegments.remove(info);
}
merge.registerDone = false;
@@ -3609,6 +3866,8 @@ public class IndexWriter implements Clos
assert rld != null;
if (drop) {
rld.dropChanges();
+ } else {
+ rld.dropMergingUpdates();
}
rld.release(sr);
readerPool.release(rld);
@@ -3625,11 +3884,8 @@ public class IndexWriter implements Clos
}
// If any error occured, throw it.
- if (!suppressExceptions && th != null) {
- if (th instanceof IOException) throw (IOException) th;
- if (th instanceof RuntimeException) throw (RuntimeException) th;
- if (th instanceof Error) throw (Error) th;
- throw new RuntimeException(th);
+ if (!suppressExceptions) {
+ IOUtils.reThrow(th);
}
}
@@ -3667,20 +3923,20 @@ public class IndexWriter implements Clos
// Hold onto the "live" reader; we will use this to
// commit merged deletes
final ReadersAndLiveDocs rld = readerPool.get(info, true);
- SegmentReader reader = rld.getReader(context);
- assert reader != null;
- // Carefully pull the most recent live docs:
+ // Carefully pull the most recent live docs and reader
+ SegmentReader reader;
final Bits liveDocs;
final int delCount;
- synchronized(this) {
- // Must sync to ensure BufferedDeletesStream
- // cannot change liveDocs/pendingDeleteCount while
- // we pull a copy:
+ synchronized (this) {
+ // Must sync to ensure BufferedDeletesStream cannot change liveDocs,
+ // pendingDeleteCount and field updates while we pull a copy:
+ reader = rld.getReaderForMerge(context);
liveDocs = rld.getReadOnlyLiveDocs();
delCount = rld.getPendingDeleteCount() + info.getDelCount();
+ assert reader != null;
assert rld.verifyDocCounts();
if (infoStream.isEnabled("IW")) {
@@ -3702,7 +3958,7 @@ public class IndexWriter implements Clos
// fix the reader's live docs and del count
assert delCount > reader.numDeletedDocs(); // beware of zombies
- SegmentReader newReader = new SegmentReader(info, reader.core, liveDocs, info.info.getDocCount() - delCount);
+ SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.getDocCount() - delCount);
boolean released = false;
try {
rld.release(reader);
@@ -3721,6 +3977,8 @@ public class IndexWriter implements Clos
segUpto++;
}
+// System.out.println("[" + Thread.currentThread().getName() + "] IW.mergeMiddle: merging " + merge.getMergeReaders());
+
// we pass merge.getMergeReaders() instead of merge.readers to allow the
// OneMerge to return a view over the actual segments to merge
final SegmentMerger merger = new SegmentMerger(merge.getMergeReaders(),
@@ -3733,7 +3991,12 @@ public class IndexWriter implements Clos
MergeState mergeState;
boolean success3 = false;
try {
- mergeState = merger.merge();
+ if (!merger.shouldMerge()) {
+ // would result in a 0 document segment: nothing to merge!
+ mergeState = new MergeState(new ArrayList<AtomicReader>(), merge.info.info, infoStream, checkAbort);
+ } else {
+ mergeState = merger.merge();
+ }
success3 = true;
} finally {
if (!success3) {
@@ -3748,12 +4011,16 @@ public class IndexWriter implements Clos
// Record which codec was used to write the segment
if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "merge codec=" + codec + " docCount=" + merge.info.info.getDocCount() + "; merged segment has " +
+ if (merge.info.info.getDocCount() == 0) {
+ infoStream.message("IW", "merge away fully deleted segments");
+ } else {
+ infoStream.message("IW", "merge codec=" + codec + " docCount=" + merge.info.info.getDocCount() + "; merged segment has " +
(mergeState.fieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
(mergeState.fieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
(mergeState.fieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
(mergeState.fieldInfos.hasProx() ? "prox" : "no prox") + "; " +
(mergeState.fieldInfos.hasProx() ? "freqs" : "no freqs"));
+ }
}
// Very important to do this before opening the reader
@@ -3958,8 +4225,8 @@ public class IndexWriter implements Clos
/** Only for testing.
*
* @lucene.internal */
- void keepFullyDeletedSegments() {
- keepFullyDeletedSegments = true;
+ void setKeepFullyDeletedSegments(boolean v) {
+ keepFullyDeletedSegments = v;
}
boolean getKeepFullyDeletedSegments() {
@@ -4302,4 +4569,65 @@ public class IndexWriter implements Clos
synchronized final void flushFailed(SegmentInfo info) throws IOException {
deleter.refresh(info.name);
}
+
+ final int purge(boolean forced) throws IOException {
+ return docWriter.purgeBuffer(this, forced);
+ }
+
+ final void applyDeletesAndPurge(boolean forcePurge) throws IOException {
+ try {
+ purge(forcePurge);
+ } finally {
+ applyAllDeletes();
+ flushCount.incrementAndGet();
+ }
+ }
+ final void doAfterSegmentFlushed(boolean triggerMerge, boolean forcePurge) throws IOException {
+ try {
+ purge(forcePurge);
+ } finally {
+ if (triggerMerge) {
+ maybeMerge(MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+ }
+ }
+
+ }
+
+ private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
+ return processEvents(eventQueue, triggerMerge, forcePurge);
+ }
+
+ private boolean processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
+ Event event;
+ boolean processed = false;
+ while((event = queue.poll()) != null) {
+ processed = true;
+ event.process(this, triggerMerge, forcePurge);
+ }
+ return processed;
+ }
+
+ /**
+ * Interface for internal atomic events. See {@link DocumentsWriter} for details. Events are executed concurrently and no order is guaranteed.
+ * Each event should only rely on the serializability within its process method. All actions that must happen before or after a certain action must be
+ * encoded inside the {@link #process(IndexWriter, boolean, boolean)} method.
+ *
+ */
+ static interface Event {
+
+ /**
+ * Processes the event. This method is called by the {@link IndexWriter}
+ * passed as the first argument.
+ *
+ * @param writer
+ * the {@link IndexWriter} that executes the event.
+ * @param triggerMerge
+ * <code>false</code> iff this event should not trigger any segment merges
+ * @param clearBuffers
+ * <code>true</code> iff this event should clear all buffers associated with the event.
+ * @throws IOException
+ * if an {@link IOException} occurs
+ */
+ void process(IndexWriter writer, boolean triggerMerge, boolean clearBuffers) throws IOException;
+ }
}
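The new Event interface and the processEvents() loop above decouple follow-up work (applying deletes, purging flushed segments, triggering merges) from the IndexWriter monitor. A self-contained sketch of the same queue-and-drain pattern, not the actual DocumentsWriter wiring (all names here are illustrative):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class EventQueueSketch {
      interface Event {
        void process(boolean triggerMerge, boolean clearBuffers);
      }

      public static void main(String[] args) {
        final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();

        // producers (indexing threads) enqueue follow-up work instead of
        // blocking on a shared lock:
        eventQueue.add(new Event() {
          @Override
          public void process(boolean triggerMerge, boolean clearBuffers) {
            System.out.println("apply deletes / purge buffers / maybe merge");
          }
        });

        // a consumer later drains the queue, mirroring processEvents():
        // poll until empty, processing each event exactly once
        Event event;
        while ((event = eventQueue.poll()) != null) {
          event.process(true, false);
        }
      }
    }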
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java Mon Oct 21 18:58:24 2013
@@ -497,9 +497,11 @@ public final class IndexWriterConfig ext
return super.getRAMBufferSizeMB();
}
- /** If non-null, information about merges, deletes and a
+ /**
+ * Information about merges, deletes and a
* message when maxFieldLength is reached will be printed
- * to this.
+ * to this. Must not be null, but {@link InfoStream#NO_OUTPUT}
+ * may be used to suppress output.
*/
public IndexWriterConfig setInfoStream(InfoStream infoStream) {
if (infoStream == null) {
@@ -510,7 +512,9 @@ public final class IndexWriterConfig ext
return this;
}
- /** Convenience method that uses {@link PrintStreamInfoStream} */
+ /**
+ * Convenience method that uses {@link PrintStreamInfoStream}. The stream must not be null.
+ */
public IndexWriterConfig setInfoStream(PrintStream printStream) {
if (printStream == null) {
throw new IllegalArgumentException("printStream must not be null");
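Under the tightened contract, callers silence the output by passing the sentinel rather than null. A sketch (the Version constant and analyzer are placeholders):

    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_45, analyzer);
    conf.setInfoStream(InfoStream.NO_OUTPUT);  // valid: suppresses all output
    // conf.setInfoStream((InfoStream) null);  // rejected: IllegalArgumentException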
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MergeState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MergeState.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MergeState.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MergeState.java Mon Oct 21 18:58:24 2013
@@ -105,11 +105,11 @@ public class MergeState {
}
- private static class NoDelDocMap extends DocMap {
+ private static final class NoDelDocMap extends DocMap {
private final int maxDoc;
- private NoDelDocMap(int maxDoc) {
+ NoDelDocMap(int maxDoc) {
this.maxDoc = maxDoc;
}
@@ -151,6 +151,10 @@ public class MergeState {
/** InfoStream for debugging messages. */
public final InfoStream infoStream;
+ /** Counter used for periodic calls to checkAbort
+ * @lucene.internal */
+ public int checkAbortCount;
+
// TODO: get rid of this? it tells you which segments are 'aligned' (e.g. for bulk merging)
// but is this really so expensive to compute again in different components, versus once in SM?
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiDocValues.java Mon Oct 21 18:58:24 2013
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.lucene.index.MultiTermsEnum.TermsEnumIndex;
import org.apache.lucene.index.MultiTermsEnum.TermsEnumWithSlice;
+import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.packed.AppendingPackedLongBuffer;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
@@ -135,6 +136,51 @@ public class MultiDocValues {
};
}
}
+
+ /** Returns a Bits for a reader's docsWithField (potentially merging on-the-fly)
+ * <p>
+ * This is a slow way to access this bitset. Instead, access them per-segment
+ * with {@link AtomicReader#getDocsWithField(String)}
+ * </p>
+ * */
+ public static Bits getDocsWithField(final IndexReader r, final String field) throws IOException {
+ final List<AtomicReaderContext> leaves = r.leaves();
+ final int size = leaves.size();
+ if (size == 0) {
+ return null;
+ } else if (size == 1) {
+ return leaves.get(0).reader().getDocsWithField(field);
+ }
+
+ boolean anyReal = false;
+ boolean anyMissing = false;
+ final Bits[] values = new Bits[size];
+ final int[] starts = new int[size+1];
+ for (int i = 0; i < size; i++) {
+ AtomicReaderContext context = leaves.get(i);
+ Bits v = context.reader().getDocsWithField(field);
+ if (v == null) {
+ v = new Bits.MatchNoBits(context.reader().maxDoc());
+ anyMissing = true;
+ } else {
+ anyReal = true;
+ if (v instanceof Bits.MatchAllBits == false) {
+ anyMissing = true;
+ }
+ }
+ values[i] = v;
+ starts[i] = context.docBase;
+ }
+ starts[size] = r.maxDoc();
+
+ if (!anyReal) {
+ return null;
+ } else if (!anyMissing) {
+ return new Bits.MatchAllBits(r.maxDoc());
+ } else {
+ return new MultiBits(values, starts, false);
+ }
+ }
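A usage sketch for the method just added; as its javadoc notes, this merged view is slow and per-segment access should be preferred. Here reader is assumed to be an open DirectoryReader and "price" an illustrative field:

    Bits docsWithPrice = MultiDocValues.getDocsWithField(reader, "price");
    if (docsWithPrice != null) {  // null means no segment has the field
      for (int docID = 0; docID < reader.maxDoc(); docID++) {
        if (docsWithPrice.get(docID)) {
          // this document has a value for "price"
        }
      }
    }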
/** Returns a BinaryDocValues for a reader's docvalues (potentially merging on-the-fly)
* <p>
@@ -404,7 +450,7 @@ public class MultiDocValues {
public int getOrd(int docID) {
int subIndex = ReaderUtil.subIndex(docID, docStarts);
int segmentOrd = values[subIndex].getOrd(docID - docStarts[subIndex]);
- return (int) mapping.getGlobalOrd(subIndex, segmentOrd);
+ return segmentOrd == -1 ? segmentOrd : (int) mapping.getGlobalOrd(subIndex, segmentOrd);
}
@Override
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiFields.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiFields.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiFields.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiFields.java Mon Oct 21 18:58:24 2013
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.MergedIterator;
/**
* Exposes flex API, merged from flex API of sub-segments.
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTerms.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTerms.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTerms.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTerms.java Mon Oct 21 18:58:24 2013
@@ -19,7 +19,6 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
import org.apache.lucene.util.BytesRef;
@@ -36,7 +35,7 @@ import org.apache.lucene.util.automaton.
public final class MultiTerms extends Terms {
private final Terms[] subs;
private final ReaderSlice[] subSlices;
- private final Comparator<BytesRef> termComp;
+ private final boolean hasFreqs;
private final boolean hasOffsets;
private final boolean hasPositions;
private final boolean hasPayloads;
@@ -51,28 +50,19 @@ public final class MultiTerms extends Te
this.subs = subs;
this.subSlices = subSlices;
- Comparator<BytesRef> _termComp = null;
assert subs.length > 0 : "inefficient: don't use MultiTerms over one sub";
+ boolean _hasFreqs = true;
boolean _hasOffsets = true;
boolean _hasPositions = true;
boolean _hasPayloads = false;
for(int i=0;i<subs.length;i++) {
- if (_termComp == null) {
- _termComp = subs[i].getComparator();
- } else {
- // We cannot merge sub-readers that have
- // different TermComps
- final Comparator<BytesRef> subTermComp = subs[i].getComparator();
- if (subTermComp != null && !subTermComp.equals(_termComp)) {
- throw new IllegalStateException("sub-readers have different BytesRef.Comparators; cannot merge");
- }
- }
+ _hasFreqs &= subs[i].hasFreqs();
_hasOffsets &= subs[i].hasOffsets();
_hasPositions &= subs[i].hasPositions();
_hasPayloads |= subs[i].hasPayloads();
}
- termComp = _termComp;
+ hasFreqs = _hasFreqs;
hasOffsets = _hasOffsets;
hasPositions = _hasPositions;
hasPayloads = hasPositions && _hasPayloads; // if all subs have pos, and at least one has payloads.
@@ -158,8 +148,8 @@ public final class MultiTerms extends Te
}
@Override
- public Comparator<BytesRef> getComparator() {
- return termComp;
+ public boolean hasFreqs() {
+ return hasFreqs;
}
@Override
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/MultiTermsEnum.java Mon Oct 21 18:58:24 2013
@@ -23,7 +23,6 @@ import org.apache.lucene.util.Bits;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Comparator;
/**
* Exposes {@link TermsEnum} API, merged from {@link TermsEnum} API of sub-segments.
@@ -47,7 +46,6 @@ public final class MultiTermsEnum extend
private int numTop;
private int numSubs;
private BytesRef current;
- private Comparator<BytesRef> termComp;
static class TermsEnumIndex {
public final static TermsEnumIndex[] EMPTY_ARRAY = new TermsEnumIndex[0];
@@ -95,36 +93,18 @@ public final class MultiTermsEnum extend
return current;
}
- @Override
- public Comparator<BytesRef> getComparator() {
- return termComp;
- }
-
/** The terms array must contain newly created TermsEnums, i.e.
* {@link TermsEnum#next} has not yet been called. */
public TermsEnum reset(TermsEnumIndex[] termsEnumsIndex) throws IOException {
assert termsEnumsIndex.length <= top.length;
numSubs = 0;
numTop = 0;
- termComp = null;
queue.clear();
for(int i=0;i<termsEnumsIndex.length;i++) {
final TermsEnumIndex termsEnumIndex = termsEnumsIndex[i];
assert termsEnumIndex != null;
- // init our term comp
- if (termComp == null) {
- queue.termComp = termComp = termsEnumIndex.termsEnum.getComparator();
- } else {
- // We cannot merge sub-readers that have
- // different TermComps
- final Comparator<BytesRef> subTermComp = termsEnumIndex.termsEnum.getComparator();
- if (subTermComp != null && !subTermComp.equals(termComp)) {
- throw new IllegalStateException("sub-readers have different BytesRef.Comparators: " + subTermComp + " vs " + termComp + "; cannot merge");
- }
- }
-
final BytesRef term = termsEnumIndex.termsEnum.next();
if (term != null) {
final TermsEnumWithSlice entry = subs[termsEnumIndex.subIndex];
@@ -149,7 +129,7 @@ public final class MultiTermsEnum extend
numTop = 0;
boolean seekOpt = false;
- if (lastSeek != null && termComp.compare(lastSeek, term) <= 0) {
+ if (lastSeek != null && lastSeek.compareTo(term) <= 0) {
seekOpt = true;
}
@@ -167,7 +147,7 @@ public final class MultiTermsEnum extend
if (seekOpt) {
final BytesRef curTerm = currentSubs[i].current;
if (curTerm != null) {
- final int cmp = termComp.compare(term, curTerm);
+ final int cmp = term.compareTo(curTerm);
if (cmp == 0) {
status = true;
} else if (cmp < 0) {
@@ -201,7 +181,7 @@ public final class MultiTermsEnum extend
lastSeekExact = false;
boolean seekOpt = false;
- if (lastSeek != null && termComp.compare(lastSeek, term) <= 0) {
+ if (lastSeek != null && lastSeek.compareTo(term) <= 0) {
seekOpt = true;
}
@@ -219,7 +199,7 @@ public final class MultiTermsEnum extend
if (seekOpt) {
final BytesRef curTerm = currentSubs[i].current;
if (curTerm != null) {
- final int cmp = termComp.compare(term, curTerm);
+ final int cmp = term.compareTo(curTerm);
if (cmp == 0) {
status = SeekStatus.FOUND;
} else if (cmp < 0) {
@@ -519,14 +499,13 @@ public final class MultiTermsEnum extend
}
private final static class TermMergeQueue extends PriorityQueue<TermsEnumWithSlice> {
- Comparator<BytesRef> termComp;
TermMergeQueue(int size) {
super(size);
}
@Override
protected boolean lessThan(TermsEnumWithSlice termsA, TermsEnumWithSlice termsB) {
- final int cmp = termComp.compare(termsA.current, termsB.current);
+ final int cmp = termsA.current.compareTo(termsB.current);
if (cmp != 0) {
return cmp < 0;
} else {
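With a single fixed term order, MultiTermsEnum no longer threads a comparator through its merge queue; every comparison is a direct BytesRef.compareTo. A self-contained sketch of the underlying k-way merge over sorted per-segment term streams, using String in place of BytesRef so it runs standalone (class and method names hypothetical):

import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch: k-way merge of already-sorted term streams, mirroring how
// MultiTermsEnum now orders terms with compareTo() instead of a
// pluggable Comparator.
class TermMergeSketch {

  private static final class Entry {
    String term;                 // current term of this sub
    final Iterator<String> sub;  // remaining terms, already sorted
    Entry(String term, Iterator<String> sub) { this.term = term; this.sub = sub; }
  }

  static void merge(List<Iterator<String>> subs) {
    // Smallest current term on top -- the fixed, natural ordering.
    PriorityQueue<Entry> queue =
        new PriorityQueue<>((a, b) -> a.term.compareTo(b.term));
    for (Iterator<String> sub : subs) {
      if (sub.hasNext()) {
        queue.add(new Entry(sub.next(), sub));
      }
    }
    while (!queue.isEmpty()) {
      Entry top = queue.poll();
      System.out.println(top.term);
      if (top.sub.hasNext()) {   // advance this sub and re-queue it
        top.term = top.sub.next();
        queue.add(top);
      }
    }
  }
}

The real MultiTermsEnum additionally pops every sub whose current term equals the top of the queue, so a term present in several segments is surfaced once with all matching sub-enums attached.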
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NormsConsumerPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NormsConsumerPerField.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NormsConsumerPerField.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NormsConsumerPerField.java Mon Oct 21 18:58:24 2013
@@ -44,7 +44,7 @@ final class NormsConsumerPerField extend
if (fieldInfo.isIndexed() && !fieldInfo.omitsNorms()) {
if (consumer == null) {
fieldInfo.setNormValueType(FieldInfo.DocValuesType.NUMERIC);
- consumer = new NumericDocValuesWriter(fieldInfo, docState.docWriter.bytesUsed);
+ consumer = new NumericDocValuesWriter(fieldInfo, docState.docWriter.bytesUsed, false);
}
consumer.addValue(docState.docID, similarity.computeNorm(fieldState));
}
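The norms path above passes false for the new trackDocsWithField flag: every indexed document receives a norm, so there is nothing sparse to track. A hedged sketch of the two call shapes side by side, taken from this commit's constructor signature (the wrapper class is hypothetical and would have to live in org.apache.lucene.index, since the writer is package-private):

package org.apache.lucene.index; // NumericDocValuesWriter is package-private

import org.apache.lucene.util.Counter;

// Sketch (not committed code): the dense norms path opts out of
// docs-with-field tracking, the sparse doc-values path opts in.
class WriterChoiceSketch {
  static NumericDocValuesWriter forNorms(FieldInfo fi, Counter bytes) {
    return new NumericDocValuesWriter(fi, bytes, false); // every doc has a norm
  }
  static NumericDocValuesWriter forDocValues(FieldInfo fi, Counter bytes) {
    return new NumericDocValuesWriter(fi, bytes, true);  // values may be missing
  }
}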
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesWriter.java Mon Oct 21 18:58:24 2013
@@ -23,6 +23,8 @@ import java.util.NoSuchElementException;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.util.Counter;
+import org.apache.lucene.util.OpenBitSet;
+import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.packed.AppendingDeltaPackedLongBuffer;
import org.apache.lucene.util.packed.PackedInts;
@@ -35,14 +37,18 @@ class NumericDocValuesWriter extends Doc
private AppendingDeltaPackedLongBuffer pending;
private final Counter iwBytesUsed;
private long bytesUsed;
+ private final OpenBitSet docsWithField;
private final FieldInfo fieldInfo;
+ private final boolean trackDocsWithField;
- public NumericDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed) {
+ public NumericDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed, boolean trackDocsWithField) {
pending = new AppendingDeltaPackedLongBuffer(PackedInts.COMPACT);
- bytesUsed = pending.ramBytesUsed();
+ docsWithField = new OpenBitSet();
+ bytesUsed = pending.ramBytesUsed() + docsWithFieldBytesUsed();
this.fieldInfo = fieldInfo;
this.iwBytesUsed = iwBytesUsed;
iwBytesUsed.addAndGet(bytesUsed);
+ this.trackDocsWithField = trackDocsWithField;
}
public void addValue(int docID, long value) {
@@ -56,12 +62,20 @@ class NumericDocValuesWriter extends Doc
}
pending.add(value);
+ if (trackDocsWithField) {
+ docsWithField.set(docID);
+ }
updateBytesUsed();
}
+
+ private long docsWithFieldBytesUsed() {
+ // size of the long[] + some overhead
+ return RamUsageEstimator.sizeOf(docsWithField.getBits()) + 64;
+ }
private void updateBytesUsed() {
- final long newBytesUsed = pending.ramBytesUsed();
+ final long newBytesUsed = pending.ramBytesUsed() + docsWithFieldBytesUsed();
iwBytesUsed.addAndGet(newBytesUsed - bytesUsed);
bytesUsed = newBytesUsed;
}
@@ -109,14 +123,18 @@ class NumericDocValuesWriter extends Doc
if (!hasNext()) {
throw new NoSuchElementException();
}
- long value;
+ Long value;
if (upto < size) {
- value = iter.next();
+ long v = iter.next();
+ if (!trackDocsWithField || docsWithField.get(upto)) {
+ value = v;
+ } else {
+ value = null;
+ }
} else {
- value = 0;
+ value = trackDocsWithField ? null : MISSING;
}
upto++;
- // TODO: make reusable Number
return value;
}
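NumericDocValuesWriter now keeps an OpenBitSet of the documents that actually received a value, folds that bitset into its RAM accounting, and hands back null for absent documents (or the pre-existing MISSING sentinel when tracking is off). A self-contained sketch of the same pattern using java.util.BitSet (hypothetical class, simplified from the committed code):

import java.util.BitSet;

// Sketch: sparse per-document numeric values, mirroring the docsWithField
// tracking added to NumericDocValuesWriter above.
class SparseLongWriter {
  private final long[] values;
  private final BitSet docsWithField = new BitSet();
  private final boolean trackDocsWithField;

  SparseLongWriter(int maxDoc, boolean trackDocsWithField) {
    values = new long[maxDoc];
    this.trackDocsWithField = trackDocsWithField;
  }

  void addValue(int docID, long value) {
    values[docID] = value;
    if (trackDocsWithField) {
      docsWithField.set(docID); // remember this doc really has a value
    }
  }

  // Returns null for docs with no value when tracking, matching the
  // Long-valued iterator in the diff above.
  Long get(int docID) {
    if (!trackDocsWithField || docsWithField.get(docID)) {
      return values[docID];
    }
    return null;
  }
}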
Modified: lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java (original)
+++ lucene/dev/branches/lucene4956/lucene/core/src/java/org/apache/lucene/index/ParallelAtomicReader.java Mon Oct 21 18:58:24 2013
@@ -286,6 +286,13 @@ public class ParallelAtomicReader extend
}
@Override
+ public Bits getDocsWithField(String field) throws IOException {
+ ensureOpen();
+ AtomicReader reader = fieldToReader.get(field);
+ return reader == null ? null : reader.getDocsWithField(field);
+ }
+
+ @Override
public NumericDocValues getNormValues(String field) throws IOException {
ensureOpen();
AtomicReader reader = fieldToReader.get(field);
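ParallelAtomicReader routes the new getDocsWithField to whichever sub-reader owns the field, mirroring the getNormValues lookup it lands next to. A hedged usage sketch of what the API buys a caller, namely telling a stored 0 apart from a missing value (the field name and dump method are hypothetical):

import java.io.IOException;

import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.util.Bits;

class DocsWithFieldUsage {
  static void dump(AtomicReader reader) throws IOException {
    NumericDocValues values = reader.getNumericDocValues("price");
    if (values == null) {
      return; // no numeric doc values indexed for this field
    }
    Bits docsWithField = reader.getDocsWithField("price");
    for (int doc = 0; doc < reader.maxDoc(); doc++) {
      if (docsWithField == null || docsWithField.get(doc)) {
        System.out.println(doc + " -> " + values.get(doc));
      } else {
        // without the bitset, values.get(doc) == 0 here would be ambiguous
        System.out.println(doc + " -> (no value)");
      }
    }
  }
}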