You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@lucene.apache.org by bu...@apache.org on 2011/02/25 08:15:29 UTC
svn commit: r1074414 - in /lucene/dev/branches/realtime_search/lucene/src:
java/org/apache/lucene/index/ test/org/apache/lucene/index/
Author: buschmi
Date: Fri Feb 25 07:15:28 2011
New Revision: 1074414
URL: http://svn.apache.org/viewvc?rev=1074414&view=rev
Log:
LUCENE-2324: new doc deletes approach; various bug fixes
Modified:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Feb 25 07:15:28 2011
@@ -27,13 +27,13 @@ import java.util.concurrent.atomic.Atomi
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.BitVector;
/**
* This class accepts multiple added documents and directly
@@ -145,28 +145,24 @@ final class DocumentsWriter {
}
boolean deleteQueries(final Query... queries) throws IOException {
+ synchronized(this) {
+ for (Query query : queries) {
+ pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
+ }
+ }
+
Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
- boolean deleted = false;
while (threadsIterator.hasNext()) {
ThreadState state = threadsIterator.next();
state.lock();
try {
state.perThread.deleteQueries(queries);
- deleted = true;
} finally {
state.unlock();
}
}
- if (!deleted) {
- synchronized(this) {
- for (Query query : queries) {
- pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
- }
- }
- }
-
return false;
}
@@ -175,12 +171,16 @@ final class DocumentsWriter {
}
boolean deleteTerms(final Term... terms) throws IOException {
+ synchronized(this) {
+ for (Term term : terms) {
+ pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
+ }
+ }
+
Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
- boolean deleted = false;
while (threadsIterator.hasNext()) {
ThreadState state = threadsIterator.next();
- deleted = true;
state.lock();
try {
state.perThread.deleteTerms(terms);
@@ -189,14 +189,6 @@ final class DocumentsWriter {
}
}
- if (!deleted) {
- synchronized(this) {
- for (Term term : terms) {
- pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
- }
- }
- }
-
return false;
}
@@ -207,12 +199,14 @@ final class DocumentsWriter {
return deleteTerms(term);
}
- boolean deleteTerm(final Term term, ThreadState exclude) {
+ void deleteTerm(final Term term, ThreadState exclude) {
+ synchronized(this) {
+ pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
+ }
+
Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
- boolean deleted = false;
while (threadsIterator.hasNext()) {
- deleted = true;
ThreadState state = threadsIterator.next();
if (state != exclude) {
state.lock();
@@ -223,8 +217,6 @@ final class DocumentsWriter {
}
}
}
-
- return deleted;
}
/** If non-null, various details of indexing are printed
@@ -303,6 +295,10 @@ final class DocumentsWriter {
synchronized void abort() throws IOException {
boolean success = false;
+ synchronized (this) {
+ pendingDeletes.clear();
+ }
+
try {
if (infoStream != null) {
message("docWriter: abort");
@@ -328,7 +324,7 @@ final class DocumentsWriter {
}
}
- synchronized boolean anyChanges() {
+ boolean anyChanges() {
return numDocsInRAM.get() != 0 || anyDeletions();
}
@@ -355,29 +351,10 @@ final class DocumentsWriter {
return numDeletes;
}
- // TODO: can we improve performance of this method by keeping track
- // here in DW of whether any DWPT has deletions?
- public synchronized boolean anyDeletions() {
- if (pendingDeletes.any()) {
- return true;
+ public boolean anyDeletions() {
+ return pendingDeletes.any();
}
- Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
- while (threadsIterator.hasNext()) {
- ThreadState state = threadsIterator.next();
- state.lock();
- try {
- if (state.perThread.pendingDeletes.any()) {
- return true;
- }
- } finally {
- state.unlock();
- }
- }
-
- return false;
- }
-
void close() {
closed = true;
}
@@ -386,9 +363,7 @@ final class DocumentsWriter {
throws CorruptIndexException, IOException {
ensureOpen();
- SegmentInfo newSegment = null;
- BufferedDeletes segmentDeletes = null;
- BitVector deletedDocs = null;
+ FlushedSegment newSegment = null;
ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
try {
@@ -398,39 +373,38 @@ final class DocumentsWriter {
numDocsInRAM.incrementAndGet();
newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
- if (newSegment != null) {
- deletedDocs = dwpt.flushState.deletedDocs;
- if (dwpt.pendingDeletes.any()) {
- segmentDeletes = dwpt.pendingDeletes;
- dwpt.pendingDeletes = new BufferedDeletes(false);
- }
- }
} finally {
perThread.unlock();
}
- if (segmentDeletes != null) {
- pushDeletes(newSegment, segmentDeletes);
+ // delete term from other DWPTs later, so that this thread
+ // doesn't have to lock multiple DWPTs at the same time
+ if (delTerm != null) {
+ deleteTerm(delTerm, perThread);
+ }
+
+ if (newSegment != null) {
+ finishFlushedSegment(newSegment);
}
if (newSegment != null) {
perThreadPool.clearThreadBindings(perThread);
- indexWriter.addFlushedSegment(newSegment, deletedDocs);
return true;
}
- // delete term from other DWPTs later, so that this thread
- // doesn't have to lock multiple DWPTs at the same time
- if (delTerm != null) {
- deleteTerm(delTerm, perThread);
+ return false;
}
- return false;
+ private void finishFlushedSegment(FlushedSegment newSegment) throws IOException {
+ pushDeletes(newSegment);
+ if (newSegment != null) {
+ indexWriter.addFlushedSegment(newSegment);
+ }
}
- private final SegmentInfo finishAddDocument(DocumentsWriterPerThread perThread,
+ private final FlushedSegment finishAddDocument(DocumentsWriterPerThread perThread,
long perThreadRAMUsedBeforeAdd) throws IOException {
- SegmentInfo newSegment = null;
+ FlushedSegment newSegment = null;
if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
newSegment = perThread.flush();
@@ -445,20 +419,21 @@ final class DocumentsWriter {
return newSegment;
}
- final void substractFlushedNumDocs(int numFlushed) {
+ final void subtractFlushedNumDocs(int numFlushed) {
int oldValue = numDocsInRAM.get();
while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
oldValue = numDocsInRAM.get();
}
}
- private final void pushDeletes(SegmentInfo segmentInfo, BufferedDeletes segmentDeletes) {
- synchronized(indexWriter) {
- // Lock order: DW -> BD
+ private synchronized void pushDeletes(FlushedSegment flushedSegment) {
+ maybePushPendingDeletes();
+ if (flushedSegment != null) {
+ BufferedDeletes deletes = flushedSegment.segmentDeletes;
final long delGen = bufferedDeletesStream.getNextGen();
- if (segmentDeletes.any()) {
- if (indexWriter.segmentInfos.size() > 0 || segmentInfo != null) {
- final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(segmentDeletes, delGen);
+ // Lock order: DW -> BD
+ if (deletes != null && deletes.any()) {
+ final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes, delGen);
if (infoStream != null) {
message("flush: push buffered deletes");
}
@@ -466,40 +441,27 @@ final class DocumentsWriter {
if (infoStream != null) {
message("flush: delGen=" + packet.gen);
}
- if (segmentInfo != null) {
- segmentInfo.setBufferedDeletesGen(packet.gen);
}
- } else {
- if (infoStream != null) {
- message("flush: drop buffered deletes: no segments");
+ flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
}
- // We can safely discard these deletes: since
- // there are no segments, the deletions cannot
- // affect anything.
}
- } else if (segmentInfo != null) {
- segmentInfo.setBufferedDeletesGen(delGen);
+
+ private synchronized final void maybePushPendingDeletes() {
+ final long delGen = bufferedDeletesStream.getNextGen();
+ if (pendingDeletes.any()) {
+ bufferedDeletesStream.push(new FrozenBufferedDeletes(pendingDeletes, delGen));
+ pendingDeletes.clear();
}
}
- }
final boolean flushAllThreads(final boolean flushDeletes)
throws IOException {
- if (flushDeletes) {
- synchronized (this) {
- pushDeletes(null, pendingDeletes);
- pendingDeletes = new BufferedDeletes(false);
- }
- }
-
Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
boolean anythingFlushed = false;
while (threadsIterator.hasNext()) {
- SegmentInfo newSegment = null;
- BufferedDeletes segmentDeletes = null;
- BitVector deletedDocs = null;
+ FlushedSegment newSegment = null;
ThreadState perThread = threadsIterator.next();
perThread.lock();
@@ -520,34 +482,24 @@ final class DocumentsWriter {
newSegment = dwpt.flush();
if (newSegment != null) {
- anythingFlushed = true;
- deletedDocs = dwpt.flushState.deletedDocs;
perThreadPool.clearThreadBindings(perThread);
- if (dwpt.pendingDeletes.any()) {
- segmentDeletes = dwpt.pendingDeletes;
- dwpt.pendingDeletes = new BufferedDeletes(false);
}
}
- } else if (flushDeletes && dwpt.pendingDeletes.any()) {
- segmentDeletes = dwpt.pendingDeletes;
- dwpt.pendingDeletes = new BufferedDeletes(false);
- }
} finally {
perThread.unlock();
}
- if (segmentDeletes != null) {
- pushDeletes(newSegment, segmentDeletes);
- }
-
-
if (newSegment != null) {
- // important do unlock the perThread before finishFlushedSegment
- // is called to prevent deadlock on IndexWriter mutex
- indexWriter.addFlushedSegment(newSegment, deletedDocs);
+ anythingFlushed = true;
+ finishFlushedSegment(newSegment);
}
}
+ if (!anythingFlushed && flushDeletes) {
+ maybePushPendingDeletes();
+ }
+
+
return anythingFlushed;
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Fri Feb 25 07:15:28 2011
@@ -101,6 +101,26 @@ public class DocumentsWriterPerThread {
public boolean testPoint(String name) {
return docWriter.writer.testPoint(name);
}
+
+ public void clear() {
+ // don't hold onto doc nor analyzer, in case it is
+ // largish:
+ doc = null;
+ analyzer = null;
+ }
+ }
+
+ static class FlushedSegment {
+ final SegmentInfo segmentInfo;
+ final BufferedDeletes segmentDeletes;
+ final BitVector deletedDocuments;
+
+ private FlushedSegment(SegmentInfo segmentInfo,
+ BufferedDeletes segmentDeletes, BitVector deletedDocuments) {
+ this.segmentInfo = segmentInfo;
+ this.segmentDeletes = segmentDeletes;
+ this.deletedDocuments = deletedDocuments;
+ }
}
/** Called if we hit an exception at a bad time (when
@@ -136,7 +156,6 @@ public class DocumentsWriterPerThread {
final Directory directory;
final DocState docState;
final DocConsumer consumer;
- private DocFieldProcessor docFieldProcessor;
String segment; // Current segment we are working on
boolean aborting; // True if an abort is pending
@@ -160,10 +179,7 @@ public class DocumentsWriterPerThread {
this.docState.similarityProvider = parent.indexWriter.getConfig().getSimilarityProvider();
consumer = indexingChain.getChain(this);
- if (consumer instanceof DocFieldProcessor) {
- docFieldProcessor = (DocFieldProcessor) consumer;
}
- }
void setAborting() {
aborting = true;
@@ -175,7 +191,7 @@ public class DocumentsWriterPerThread {
docState.analyzer = analyzer;
docState.docID = numDocsInRAM;
if (delTerm != null) {
- pendingDeletes.addTerm(delTerm, docState.docID);
+ pendingDeletes.addTerm(delTerm, numDocsInRAM);
}
if (segment == null) {
@@ -186,7 +202,11 @@ public class DocumentsWriterPerThread {
boolean success = false;
try {
+ try {
consumer.processDocument(fieldInfos);
+ } finally {
+ docState.clear();
+ }
success = true;
} finally {
@@ -230,16 +250,20 @@ public class DocumentsWriterPerThread {
}
void deleteQueries(Query... queries) {
+ if (numDocsInRAM > 0) {
for (Query query : queries) {
pendingDeletes.addQuery(query, numDocsInRAM);
}
}
+ }
void deleteTerms(Term... terms) {
+ if (numDocsInRAM > 0) {
for (Term term : terms) {
pendingDeletes.addTerm(term, numDocsInRAM);
}
}
+ }
int getNumDocsInRAM() {
return numDocsInRAM;
@@ -254,12 +278,12 @@ public class DocumentsWriterPerThread {
segment = null;
consumer.doAfterFlush();
fieldInfos = fieldInfos.newFieldInfosWithGlobalFieldNumberMap();
- parent.substractFlushedNumDocs(numDocsInRAM);
+ parent.subtractFlushedNumDocs(numDocsInRAM);
numDocsInRAM = 0;
}
/** Flush all pending docs to a new segment */
- SegmentInfo flush() throws IOException {
+ FlushedSegment flush() throws IOException {
assert numDocsInRAM > 0;
flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
@@ -295,6 +319,7 @@ public class DocumentsWriterPerThread {
SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, flushState.segmentCodecs, fieldInfos);
consumer.flush(flushState);
+ pendingDeletes.terms.clear();
newSegment.clearFilesCache();
if (infoStream != null) {
@@ -305,11 +330,19 @@ public class DocumentsWriterPerThread {
}
flushedDocCount += flushState.numDocs;
+ BufferedDeletes segmentDeletes = null;
+ if (pendingDeletes.queries.isEmpty()) {
+ pendingDeletes.clear();
+ } else {
+ segmentDeletes = pendingDeletes;
+ pendingDeletes = new BufferedDeletes(false);
+ }
+
doAfterFlush();
success = true;
- return newSegment;
+ return new FlushedSegment(newSegment, segmentDeletes, flushState.deletedDocs);
} finally {
if (!success) {
if (segment != null) {
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Fri Feb 25 07:15:28 2011
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
import org.apache.lucene.index.codecs.CodecProvider;
@@ -2080,8 +2081,10 @@ public class IndexWriter implements Clos
deleter.checkpoint(segmentInfos, false);
}
- void addFlushedSegment(SegmentInfo newSegment, BitVector deletedDocs) throws IOException {
- assert newSegment != null;
+ void addFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+ assert flushedSegment != null;
+
+ SegmentInfo newSegment = flushedSegment.segmentInfo;
setDiagnostics(newSegment, "flush");
@@ -2107,8 +2110,8 @@ public class IndexWriter implements Clos
// Must write deleted docs after the CFS so we don't
// slurp the del file into CFS:
- if (deletedDocs != null) {
- final int delCount = deletedDocs.count();
+ if (flushedSegment.deletedDocuments != null) {
+ final int delCount = flushedSegment.deletedDocuments.count();
assert delCount > 0;
newSegment.setDelCount(delCount);
newSegment.advanceDelGen();
@@ -2123,7 +2126,7 @@ public class IndexWriter implements Clos
// shortly-to-be-opened SegmentReader and let it
// carry the changes; there's no reason to use
// filesystem as intermediary here.
- deletedDocs.write(directory, delFileName);
+ flushedSegment.deletedDocuments.write(directory, delFileName);
success2 = true;
} finally {
if (!success2) {
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java Fri Feb 25 07:15:28 2011
@@ -71,7 +71,6 @@ final class TermVectorsTermsWriterPerFie
if (doVectors) {
termsWriter.hasVectors = true;
- if (termsWriter.tvx != null) {
if (termsHashPerField.bytesHash.size() != 0) {
// Only necessary if previous doc hit a
// non-aborting exception while writing vectors in
@@ -79,7 +78,6 @@ final class TermVectorsTermsWriterPerFie
termsHashPerField.reset();
}
}
- }
// TODO: only if needed for performance
//perThread.postingsCount = 0;
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java?rev=1074414&r1=1074413&r2=1074414&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java Fri Feb 25 07:15:28 2011
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.*;
+import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.*;
import org.apache.lucene.util.*;
import org.junit.Test;
@@ -48,7 +49,21 @@ public class TestRollingUpdates extends
id++;
}
doc.getField("id").setValue(myID);
- w.updateDocument(new Term("id", myID), doc);
+ int mode = docIter % 3;
+ switch (mode) {
+ case 0: {
+ w.deleteDocuments(new Term("id", myID));
+ w.addDocument(doc);
+ break;
+ }
+ case 1: {
+ w.deleteDocuments(new TermQuery(new Term("id", myID)));
+ w.addDocument(doc);
+ break;
+ }
+ default : w.updateDocument(new Term("id", myID), doc);
+ }
+
if (docIter >= SIZE && random.nextInt(50) == 17) {
if (r != null) {