Posted to commits@lucene.apache.org by mi...@apache.org on 2011/05/25 01:56:50 UTC
svn commit: r1127335 - in /lucene/dev/branches/branch_3x/lucene: ./
contrib/misc/src/java/org/apache/lucene/index/
src/java/org/apache/lucene/index/ src/test-framework/org/apache/lucene/index/
src/test/org/apache/lucene/index/
Author: mikemccand
Date: Tue May 24 23:56:49 2011
New Revision: 1127335
URL: http://svn.apache.org/viewvc?rev=1127335&view=rev
Log:
LUCENE-3112: add IW.add/updateDocuments, to atomically add a block of docs with guaranteed sequential docIDs (backport to 3.x, take 2!)
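For anyone picking this up from the issue: a minimal, self-contained usage sketch of the new calls. The "child"/"packID" field names are hypothetical and Version.LUCENE_31 is a stand-in for whatever 3.x release you build against; note the backport takes Collection<Document> (see the TODOs in the patch about switching to Iterable<Document> if DWPT is backported):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;
    import org.apache.lucene.util.Version;

    public class AddDocumentsSketch {
      public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir,
            new IndexWriterConfig(Version.LUCENE_31,
                                  new WhitespaceAnalyzer(Version.LUCENE_31)));

        // Build a block: two child docs followed by their parent doc.
        List<Document> block = new ArrayList<Document>();
        for(int i=0;i<2;i++) {
          Document child = new Document();
          child.add(new Field("child", "" + i, Field.Store.YES, Field.Index.NOT_ANALYZED));
          block.add(child);
        }
        Document parent = new Document();
        parent.add(new Field("packID", "42", Field.Store.YES, Field.Index.NOT_ANALYZED));
        block.add(parent);

        // All three docs get adjacent, sequential docIDs; a reader sees
        // either all of them or none:
        writer.addDocuments(block);

        // Atomically delete-then-re-add the whole block by a shared key:
        writer.updateDocuments(new Term("packID", "42"), block);

        writer.close();
        dir.close();
      }
    }

Because the assigned docIDs are guaranteed adjacent and in order, search-time code can rely on the block layout (the TestNRTThreads changes below assert exactly that).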
Modified:
lucene/dev/branches/branch_3x/lucene/CHANGES.txt
lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSorter.java
lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java
lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java
lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
lucene/dev/branches/branch_3x/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java
lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
Modified: lucene/dev/branches/branch_3x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/CHANGES.txt?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_3x/lucene/CHANGES.txt Tue May 24 23:56:49 2011
@@ -59,6 +59,10 @@ New features
document IDs and scores encountered during the search, and "replay" them to
another Collector. (Mike McCandless, Shai Erera)
+* LUCENE-3112: Added experimental IndexWriter.add/updateDocuments,
+ enabling a block of documents to be indexed, atomically, with
+ guaranteed sequential docIDs. (Mike McCandless)
+
API Changes
* LUCENE-3061: IndexWriter's getNextMerge() and merge(OneMerge) are now public
Modified: lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSorter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSorter.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSorter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSorter.java Tue May 24 23:56:49 2011
@@ -24,6 +24,7 @@ import java.util.logging.Logger;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.*;
+import org.apache.lucene.index.IndexWriter; // javadocs
import org.apache.lucene.store.*;
import org.apache.lucene.util.Version;
@@ -32,6 +33,11 @@ import org.apache.lucene.util.Version;
* specified field, which has to be single-valued and stored, with string value
* that represents a float number. Stored fields in the output index remain
* consistent, i.e. both stored fields and postings are renumbered in sync.
+ *
+ * <p><b>NOTE</b>: this tool is unaware of documents added
+ * atomically via {@link IndexWriter#addDocuments} or {@link
+ * IndexWriter#updateDocuments}, which means it can easily
+ * break up such document groups.
*/
public class IndexSorter {
private static final Logger LOG = Logger.getLogger(IndexSorter.class.getName());
Modified: lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/IndexSplitter.java Tue May 24 23:56:49 2011
@@ -26,6 +26,7 @@ import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
+import org.apache.lucene.index.IndexWriter; // javadocs
import org.apache.lucene.store.FSDirectory;
/**
@@ -44,6 +45,11 @@ import org.apache.lucene.store.FSDirecto
* @lucene.experimental You can easily
* accidentally remove segments from your index so be
* careful!
+ *
+ * <p><b>NOTE</b>: this tool is unaware of documents added
+ * atomically via {@link IndexWriter#addDocuments} or {@link
+ * IndexWriter#updateDocuments}, which means it can easily
+ * break up such document groups.
*/
public class IndexSplitter {
public SegmentInfos infos;
Modified: lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/contrib/misc/src/java/org/apache/lucene/index/MultiPassIndexSplitter.java Tue May 24 23:56:49 2011
@@ -39,6 +39,11 @@ import org.apache.lucene.util.Version;
* <p>Note 2: the disadvantage of this tool is that source index needs to be
* read as many times as there are parts to be created, hence the name of this
* tool.
+ *
+ * <p><b>NOTE</b>: this tool is unaware of documents added
+ * atomically via {@link IndexWriter#addDocuments} or {@link
+ * IndexWriter#updateDocuments}, which means it can easily
+ * break up such document groups.
*/
public class MultiPassIndexSplitter {
Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 24 23:56:49 2011
@@ -539,8 +539,9 @@ final class DocumentsWriter {
SegmentInfo newSegment;
try {
+ //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
- assert waitQueue.numWaiting == 0;
+ assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
assert waitQueue.waitingBytes == 0;
if (infoStream != null) {
@@ -674,7 +675,7 @@ final class DocumentsWriter {
* flush is pending. If delTerm is non-null then we
* buffer this deleted term after the thread state has
* been acquired. */
- synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException {
+ synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {
final Thread currentThread = Thread.currentThread();
assert !Thread.holdsLock(writer);
@@ -721,13 +722,14 @@ final class DocumentsWriter {
assert numDocs == 0;
}
- state.docState.docID = nextDocID++;
+ state.docState.docID = nextDocID;
+ nextDocID += docCount;
if (delTerm != null) {
pendingDeletes.addTerm(delTerm, state.docState.docID);
}
- numDocs++;
+ numDocs += docCount;
state.isIdle = false;
return state;
}
@@ -743,7 +745,7 @@ final class DocumentsWriter {
boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
// This call is synchronized but fast
- final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
+ final DocumentsWriterThreadState state = getThreadState(delTerm, 1);
final DocState docState = state.docState;
docState.doc = doc;
@@ -812,6 +814,160 @@ final class DocumentsWriter {
return doFlush;
}
+ boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
+ throws CorruptIndexException, IOException {
+
+ // Possibly trigger a flush, or wait until any running flush completes:
+ boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);
+
+ final int docCount = docs.size();
+
+ // This call is synchronized but fast -- we allocate the
+ // N docIDs up front:
+ final DocumentsWriterThreadState state = getThreadState(null, docCount);
+ final DocState docState = state.docState;
+
+ final int startDocID = docState.docID;
+ int docID = startDocID;
+
+ //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
+ for(Document doc : docs) {
+ docState.doc = doc;
+ docState.analyzer = analyzer;
+ // Assign next docID from our block:
+ docState.docID = docID++;
+
+ boolean success = false;
+ try {
+ // This call is not synchronized and does all the
+ // work
+ final DocWriter perDoc;
+ try {
+ perDoc = state.consumer.processDocument();
+ } finally {
+ docState.clear();
+ }
+
+ // Must call this w/o holding synchronized(this) else
+ // we'll hit deadlock:
+ balanceRAM();
+
+ // Synchronized but fast
+ synchronized(this) {
+ if (aborting) {
+ break;
+ }
+ assert perDoc == null || perDoc.docID == docState.docID;
+ final boolean doPause;
+ if (perDoc != null) {
+ doPause = waitQueue.add(perDoc);
+ } else {
+ skipDocWriter.docID = docState.docID;
+ doPause = waitQueue.add(skipDocWriter);
+ }
+ if (doPause) {
+ waitForWaitQueue();
+ }
+ }
+
+ success = true;
+ } finally {
+ if (!success) {
+ //System.out.println(Thread.currentThread().getName() + ": E");
+
+ // If this thread state had decided to flush, we
+ // must clear it so another thread can flush
+ if (doFlush) {
+ message("clearFlushPending!");
+ flushControl.clearFlushPending();
+ }
+
+ if (infoStream != null) {
+ message("exception in updateDocuments aborting=" + aborting);
+ }
+
+ synchronized(this) {
+
+ state.isIdle = true;
+ notifyAll();
+
+ if (aborting) {
+ abort();
+ } else {
+
+ // Fill hole in the doc stores for all
+ // docIDs we pre-allocated
+ //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
+ final int endDocID = startDocID + docCount;
+ docID = docState.docID;
+ while(docID < endDocID) {
+ skipDocWriter.docID = docID++;
+ boolean success2 = false;
+ try {
+ waitQueue.add(skipDocWriter);
+ success2 = true;
+ } finally {
+ if (!success2) {
+ abort();
+ return false;
+ }
+ }
+ }
+ //System.out.println(Thread.currentThread().getName() + ": F " + docCount + " done");
+
+ // Mark all pre-allocated docIDs as deleted:
+ docID = startDocID;
+ while(docID < startDocID + docs.size()) {
+ deleteDocID(docID++);
+ }
+ }
+ }
+ }
+ }
+ }
+ //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
+
+ synchronized(this) {
+ if (aborting) {
+
+ // We are currently aborting, and another thread is
+ // waiting for me to become idle. We just forcefully
+ // idle this threadState; it will be fully reset by
+ // abort()
+ state.isIdle = true;
+
+ // wakes up any threads waiting on the wait queue
+ notifyAll();
+
+ abort();
+
+ if (doFlush) {
+ message("clearFlushPending!");
+ flushControl.clearFlushPending();
+ }
+
+ return false;
+ }
+
+ // Apply delTerm only after all indexing has
+ // succeeded, but apply it only to docs prior to when
+ // this batch started:
+ if (delTerm != null) {
+ pendingDeletes.addTerm(delTerm, startDocID);
+ }
+
+ state.isIdle = true;
+
+ // wakes up any threads waiting on the wait queue
+ notifyAll();
+ }
+
+ doFlush |= flushControl.flushByRAMUsage("new document");
+
+ //System.out.println(Thread.currentThread().getName() + ": B " + docCount);
+ return doFlush;
+ }
+
public synchronized void waitIdle() {
while (!allThreadsIdle()) {
try {
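Stepping back from the diff: the recovery code above leans on one invariant. getThreadState(delTerm, docCount) reserves docCount sequential docIDs before any document is processed; if processing fails part-way, the unused tail of the range is filled via skipDocWriter and then every reserved ID is deleted, so a reader can never observe a partial block. A standalone sketch of that invariant (hypothetical class, not code from this patch):

    import java.util.BitSet;

    class BlockDocIDSketch {
      private int nextDocID;
      private final BitSet deleted = new BitSet();

      // Reserve a contiguous range up front, as getThreadState(delTerm,
      // docCount) does; IDs start .. start+docCount-1 belong to this block.
      synchronized int reserve(int docCount) {
        final int start = nextDocID;
        nextDocID += docCount;
        return start;
      }

      // On failure, tombstone the whole range (mirrors the fill-hole +
      // deleteDocID loop in updateDocuments above).
      synchronized void rollback(int start, int docCount) {
        deleted.set(start, start + docCount);
      }
    }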
Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue May 24 23:56:49 2011
@@ -2064,6 +2064,115 @@ public class IndexWriter implements Clos
}
/**
+ * Atomically adds a block of documents with sequentially
+ * assigned document IDs, such that an external reader
+ * will see all or none of the documents.
+ *
+ * <p><b>WARNING</b>: the index does not currently record
+ * which documents were added as a block. Today this is
+ * fine, because merging will preserve the block (as long
+ * as none of them were deleted). But it's possible in the
+ * future that Lucene may more aggressively re-order
+ * documents (for example, perhaps to obtain better index
+ * compression), in which case you may need to fully
+ * re-index your documents at that time.
+ *
+ * <p>See {@link #addDocument(Document)} for details on
+ * index and IndexWriter state after an Exception, and
+ * flushing/merging temporary free space requirements.</p>
+ *
+ * <p><b>NOTE</b>: tools that do offline splitting of an index
+ * (for example, IndexSplitter in contrib) or
+ * re-sorting of documents (for example, IndexSorter in
+ * contrib) are not aware of these atomically added documents
+ * and will likely break them up. Use such tools at your
+ * own risk!
+ *
+ * <p><b>NOTE</b>: if this method hits an OutOfMemoryError
+ * you should immediately close the writer. See <a
+ * href="#OOME">above</a> for details.</p>
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void addDocuments(Collection<Document> docs) throws CorruptIndexException, IOException {
+ // TODO: if we backport DWPT we should change arg to Iterable<Document>
+ addDocuments(docs, analyzer);
+ }
+
+ /**
+ * Atomically adds a block of documents, analyzed using the
+ * provided analyzer, with sequentially assigned document
+ * IDs, such that an external reader will see all or none
+ * of the documents.
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void addDocuments(Collection<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException {
+ // TODO: if we backport DWPT we should change arg to Iterable<Document>
+ updateDocuments(null, docs, analyzer);
+ }
+
+ /**
+ * Atomically deletes documents matching the provided
+ * delTerm and adds a block of documents with sequentially
+ * assigned document IDs, such that an external reader
+ * will see all or none of the documents.
+ *
+ * See {@link #addDocuments(Collection)}.
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void updateDocuments(Term delTerm, Collection<Document> docs) throws CorruptIndexException, IOException {
+ // TODO: if we backport DWPT we should change arg to Iterable<Document>
+ updateDocuments(delTerm, docs, analyzer);
+ }
+
+ /**
+ * Atomically deletes documents matching the provided
+ * delTerm and adds a block of documents, analyzed using
+ * the provided analyzer, with sequentially
+ * assigned document IDs, such that an external reader
+ * will see all or none of the documents.
+ *
+ * See {@link #addDocuments(Collection)}.
+ *
+ * @throws CorruptIndexException if the index is corrupt
+ * @throws IOException if there is a low-level IO error
+ *
+ * @lucene.experimental
+ */
+ public void updateDocuments(Term delTerm, Collection<Document> docs, Analyzer analyzer) throws CorruptIndexException, IOException {
+ // TODO: if we backport DWPT we should change arg to Iterable<Document>
+ ensureOpen();
+ try {
+ boolean success = false;
+ boolean doFlush = false;
+ try {
+ doFlush = docWriter.updateDocuments(docs, analyzer, delTerm);
+ success = true;
+ } finally {
+ if (!success && infoStream != null) {
+ message("hit exception updating document");
+ }
+ }
+ if (doFlush) {
+ flush(true, false);
+ }
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "updateDocuments");
+ }
+ }
+
+ /**
* Deletes the document(s) containing <code>term</code>.
*
* <p><b>NOTE</b>: if this method hits an OutOfMemoryError
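From the caller's side the contract is all-or-none: if addDocuments/updateDocuments throws a non-aborting exception (for example, a TokenStream in one of the docs fails), none of the block's documents become visible and the writer remains usable, as the new tests below verify. A hedged calling-side sketch (the helper class is hypothetical):

    import java.io.IOException;
    import java.util.List;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.Term;

    class AtomicBlockHelper {
      // Returns true if the whole block was indexed; on a non-aborting
      // exception none of the block's docs are visible, so the caller
      // may fix the docs and simply call again.
      static boolean tryReplaceBlock(IndexWriter writer, Term key,
                                     List<Document> block) {
        try {
          writer.updateDocuments(key, block);
          return true;
        } catch (IOException ioe) {
          return false;
        }
      }
    }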
Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Tue May 24 23:56:49 2011
@@ -159,7 +159,7 @@ final class TermVectorsTermsWriter exten
perDoc.numVectorFields = 0;
}
- assert lastDocID == perDoc.docID;
+ assert lastDocID == perDoc.docID: "lastDocID=" + lastDocID + " perDoc.docID=" + perDoc.docID;
lastDocID++;
Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java Tue May 24 23:56:49 2011
@@ -160,7 +160,7 @@ final class TermsHashPerField extends In
}
}
- assert upto == numPostings;
+ assert upto == numPostings: "upto=" + upto + " numPostings=" + numPostings;
postingsCompacted = true;
}
Modified: lucene/dev/branches/branch_3x/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/test-framework/org/apache/lucene/index/RandomIndexWriter.java Tue May 24 23:56:49 2011
@@ -19,6 +19,9 @@ package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import org.apache.lucene.analysis.Analyzer;
@@ -95,11 +98,33 @@ public class RandomIndexWriter implement
* Adds a Document.
* @see IndexWriter#addDocument(Document)
*/
- public void addDocument(Document doc) throws IOException {
- w.addDocument(doc);
+ public void addDocument(final Document doc) throws IOException {
+ if (r.nextInt(5) == 3) {
+ // TODO: maybe, we should simply buffer up added docs
+ // (but we need to clone them), and only when
+ // getReader, commit, etc. are called, we do an
+ // addDocuments? Would be better testing.
+ w.addDocuments(Collections.singletonList(doc));
+ } else {
+ w.addDocument(doc);
+ }
+ maybeCommit();
+ }
+
+ public void addDocuments(Collection<Document> docs) throws IOException {
+ w.addDocuments(docs);
+ maybeCommit();
+ }
+
+ public void updateDocuments(Term delTerm, Collection<Document> docs) throws IOException {
+ w.updateDocuments(delTerm, docs);
+ maybeCommit();
+ }
+
+ private void maybeCommit() throws IOException {
if (docCount++ == flushAt) {
if (LuceneTestCase.VERBOSE) {
- System.out.println("RIW.addDocument: now doing a commit at docCount=" + docCount);
+ System.out.println("RIW.add/updateDocument: now doing a commit at docCount=" + docCount);
}
w.commit();
flushAt += _TestUtil.nextInt(r, (int) (flushAtFactor * 10), (int) (flushAtFactor * 1000));
@@ -109,16 +134,18 @@ public class RandomIndexWriter implement
}
}
}
-
- public void updateDocument(Term t, Document doc) throws IOException {
- w.updateDocument(t, doc);
- if (docCount++ == flushAt) {
- if (LuceneTestCase.VERBOSE) {
- System.out.println("RIW.updateDocument: now doing a commit");
- }
- w.commit();
- flushAt += _TestUtil.nextInt(r, 10, 1000);
+
+ /**
+ * Updates a document.
+ * @see IndexWriter#updateDocument(Term, Document)
+ */
+ public void updateDocument(Term t, final Document doc) throws IOException {
+ if (r.nextInt(5) == 3) {
+ w.updateDocuments(t, Collections.singletonList(doc));
+ } else {
+ w.updateDocument(t, doc);
}
+ maybeCommit();
}
public void addIndexes(Directory... dirs) throws CorruptIndexException, IOException {
Modified: lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Tue May 24 23:56:49 2011
@@ -17,13 +17,14 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
@@ -33,15 +34,15 @@ import org.apache.lucene.store.IndexOutp
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.LowerCaseTokenizer;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.analysis.MockTokenizer;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.WhitespaceTokenizer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.PhraseQuery;
public class TestIndexWriterExceptions extends LuceneTestCase {
@@ -86,7 +87,16 @@ public class TestIndexWriterExceptions e
idField.setValue(id);
Term idTerm = new Term("id", id);
try {
- writer.updateDocument(idTerm, doc);
+ if (r.nextBoolean()) {
+ final List<Document> docs = new ArrayList<Document>();
+ final int count = _TestUtil.nextInt(r, 1, 20);
+ for(int c=0;c<count;c++) {
+ docs.add(doc);
+ }
+ writer.updateDocuments(idTerm, docs);
+ } else {
+ writer.updateDocument(idTerm, doc);
+ }
} catch (RuntimeException re) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": EXC: ");
@@ -135,7 +145,7 @@ public class TestIndexWriterExceptions e
@Override
boolean testPoint(String name) {
- if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(20) == 17) {
+ if (doFail.get() != null && !name.equals("startDoFlush") && r.nextInt(40) == 17) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": NOW FAIL: " + name);
new Throwable().printStackTrace(System.out);
@@ -265,6 +275,8 @@ public class TestIndexWriterExceptions e
}
}
+ private static final String CRASH_FAIL_MESSAGE = "I'm experiencing problems";
+
private class CrashingFilter extends TokenFilter {
String fieldName;
int count;
@@ -277,7 +289,7 @@ public class TestIndexWriterExceptions e
@Override
public boolean incrementToken() throws IOException {
if (this.fieldName.equals("crash") && count++ >= 4)
- throw new IOException("I'm experiencing problems");
+ throw new IOException(CRASH_FAIL_MESSAGE);
return input.incrementToken();
}
@@ -1146,4 +1158,141 @@ public class TestIndexWriterExceptions e
writer.close();
dir.close();
}
+
+ public void testAddDocsNonAbortingException() throws Exception {
+ final Directory dir = newDirectory();
+ final RandomIndexWriter w = new RandomIndexWriter(random, dir);
+ final int numDocs1 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs1;docCount++) {
+ Document doc = new Document();
+ doc.add(newField("content", "good content", Field.Index.ANALYZED));
+ w.addDocument(doc);
+ }
+
+ final List<Document> docs = new ArrayList<Document>();
+ for(int docCount=0;docCount<7;docCount++) {
+ Document doc = new Document();
+ docs.add(doc);
+ doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED));
+ doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED));
+ if (docCount == 4) {
+ Field f = newField("crash", "", Field.Index.ANALYZED);
+ doc.add(f);
+ MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false);
+ tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases.
+ f.setTokenStream(new CrashingFilter("crash", tokenizer));
+ }
+ }
+ try {
+ w.addDocuments(docs);
+ // CrashingFilter should have thrown an exception by now
+ fail("did not hit expected exception");
+ } catch (IOException ioe) {
+ // expected
+ assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage());
+ }
+
+ final int numDocs2 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs2;docCount++) {
+ Document doc = new Document();
+ doc.add(newField("content", "good content", Field.Index.ANALYZED));
+ w.addDocument(doc);
+ }
+
+ final IndexReader r = w.getReader();
+ w.close();
+
+ final IndexSearcher s = new IndexSearcher(r);
+ PhraseQuery pq = new PhraseQuery();
+ pq.add(new Term("content", "silly"));
+ pq.add(new Term("content", "content"));
+ assertEquals(0, s.search(pq, 1).totalHits);
+
+ pq = new PhraseQuery();
+ pq.add(new Term("content", "good"));
+ pq.add(new Term("content", "content"));
+ assertEquals(numDocs1+numDocs2, s.search(pq, 1).totalHits);
+ r.close();
+ dir.close();
+ }
+
+
+ public void testUpdateDocsNonAbortingException() throws Exception {
+ final Directory dir = newDirectory();
+ final RandomIndexWriter w = new RandomIndexWriter(random, dir);
+ final int numDocs1 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs1;docCount++) {
+ Document doc = new Document();
+ doc.add(newField("content", "good content", Field.Index.ANALYZED));
+ w.addDocument(doc);
+ }
+
+ // Use addDocs (no exception) to get docs in the index:
+ final List<Document> docs = new ArrayList<Document>();
+ final int numDocs2 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs2;docCount++) {
+ Document doc = new Document();
+ docs.add(doc);
+ doc.add(newField("subid", "subs", Field.Index.NOT_ANALYZED));
+ doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED));
+ doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED));
+ }
+ w.addDocuments(docs);
+
+ final int numDocs3 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs3;docCount++) {
+ Document doc = new Document();
+ doc.add(newField("content", "good content", Field.Index.ANALYZED));
+ w.addDocument(doc);
+ }
+
+ docs.clear();
+ final int limit = _TestUtil.nextInt(random, 2, 25);
+ final int crashAt = random.nextInt(limit);
+ for(int docCount=0;docCount<limit;docCount++) {
+ Document doc = new Document();
+ docs.add(doc);
+ doc.add(newField("id", docCount+"", Field.Index.NOT_ANALYZED));
+ doc.add(newField("content", "silly content " + docCount, Field.Index.ANALYZED));
+ if (docCount == crashAt) {
+ Field f = newField("crash", "", Field.Index.ANALYZED);
+ doc.add(f);
+ MockTokenizer tokenizer = new MockTokenizer(new StringReader("crash me on the 4th token"), MockTokenizer.WHITESPACE, false);
+ tokenizer.setEnableChecks(false); // disable workflow checking as we forcefully close() in exceptional cases.
+ f.setTokenStream(new CrashingFilter("crash", tokenizer));
+ }
+ }
+
+ try {
+ w.updateDocuments(new Term("subid", "subs"), docs);
+ // CrashingFilter should have thrown an exception by now
+ fail("did not hit expected exception");
+ } catch (IOException ioe) {
+ // expected
+ assertEquals(CRASH_FAIL_MESSAGE, ioe.getMessage());
+ }
+
+ final int numDocs4 = random.nextInt(25);
+ for(int docCount=0;docCount<numDocs4;docCount++) {
+ Document doc = new Document();
+ doc.add(newField("content", "good content", Field.Index.ANALYZED));
+ w.addDocument(doc);
+ }
+
+ final IndexReader r = w.getReader();
+ w.close();
+
+ final IndexSearcher s = new IndexSearcher(r);
+ PhraseQuery pq = new PhraseQuery();
+ pq.add(new Term("content", "silly"));
+ pq.add(new Term("content", "content"));
+ assertEquals(numDocs2, s.search(pq, 1).totalHits);
+
+ pq = new PhraseQuery();
+ pq.add(new Term("content", "good"));
+ pq.add(new Term("content", "content"));
+ assertEquals(numDocs1+numDocs3+numDocs4, s.search(pq, 1).totalHits);
+ r.close();
+ dir.close();
+ }
}
Modified: lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java?rev=1127335&r1=1127334&r2=1127335&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java Tue May 24 23:56:49 2011
@@ -21,25 +21,27 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Fieldable;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.PhraseQuery;
import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.document.Field;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LineFileDocs;
@@ -53,6 +55,39 @@ import org.junit.Test;
public class TestNRTThreads extends LuceneTestCase {
+ private static class SubDocs {
+ public final String packID;
+ public final List<String> subIDs;
+ public boolean deleted;
+
+ public SubDocs(String packID, List<String> subIDs) {
+ this.packID = packID;
+ this.subIDs = subIDs;
+ }
+ }
+
+ // TODO: is there a pre-existing way to do this!!!
+ private Document cloneDoc(Document doc1) {
+ final Document doc2 = new Document();
+ for(Fieldable f : doc1.getFields()) {
+ Field field1 = (Field) f;
+
+ Field field2 = new Field(field1.name(),
+ field1.stringValue(),
+ field1.isStored() ? Field.Store.YES : Field.Store.NO,
+ field1.isIndexed() ? (field1.isTokenized() ? Field.Index.ANALYZED : Field.Index.NOT_ANALYZED) : Field.Index.NO);
+ if (field1.getOmitNorms()) {
+ field2.setOmitNorms(true);
+ }
+ if (field1.getOmitTermFreqAndPositions()) {
+ field2.setOmitTermFreqAndPositions(true);
+ }
+ doc2.add(field2);
+ }
+
+ return doc2;
+ }
+
@Test
public void testNRTThreads() throws Exception {
@@ -96,13 +131,16 @@ public class TestNRTThreads extends Luce
final int NUM_INDEX_THREADS = 2;
final int NUM_SEARCH_THREADS = 3;
+
final int RUN_TIME_SEC = LuceneTestCase.TEST_NIGHTLY ? 300 : 5;
final AtomicBoolean failed = new AtomicBoolean();
final AtomicInteger addCount = new AtomicInteger();
final AtomicInteger delCount = new AtomicInteger();
+ final AtomicInteger packCount = new AtomicInteger();
final Set<String> delIDs = Collections.synchronizedSet(new HashSet<String>());
+ final List<SubDocs> allSubDocs = Collections.synchronizedList(new ArrayList<SubDocs>());
final long stopTime = System.currentTimeMillis() + RUN_TIME_SEC*1000;
Thread[] threads = new Thread[NUM_INDEX_THREADS];
@@ -110,7 +148,9 @@ public class TestNRTThreads extends Luce
threads[thread] = new Thread() {
@Override
public void run() {
+ // TODO: would be better if this were cross-thread, so that we make sure one thread deleting another's added docs works:
final List<String> toDeleteIDs = new ArrayList<String>();
+ final List<SubDocs> toDeleteSubDocs = new ArrayList<SubDocs>();
while(System.currentTimeMillis() < stopTime && !failed.get()) {
try {
Document doc = docs.nextDoc();
@@ -128,7 +168,92 @@ public class TestNRTThreads extends Luce
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": add doc id:" + doc.get("docid"));
}
- writer.addDocument(doc);
+
+ if (random.nextBoolean()) {
+ // Add a pack of adjacent sub-docs
+ final String packID;
+ final SubDocs delSubDocs;
+ if (toDeleteSubDocs.size() > 0 && random.nextBoolean()) {
+ delSubDocs = toDeleteSubDocs.get(random.nextInt(toDeleteSubDocs.size()));
+ assert !delSubDocs.deleted;
+ toDeleteSubDocs.remove(delSubDocs);
+ // reuse prior packID
+ packID = delSubDocs.packID;
+ } else {
+ delSubDocs = null;
+ // make new packID
+ packID = packCount.getAndIncrement() + "";
+ }
+
+ final Field packIDField = newField("packID", packID, Field.Store.YES, Field.Index.NOT_ANALYZED);
+ final List<String> docIDs = new ArrayList<String>();
+ final SubDocs subDocs = new SubDocs(packID, docIDs);
+ final List<Document> docsList = new ArrayList<Document>();
+
+ allSubDocs.add(subDocs);
+ doc.add(packIDField);
+ docsList.add(cloneDoc(doc));
+ docIDs.add(doc.get("docid"));
+
+ final int maxDocCount = _TestUtil.nextInt(random, 1, 10);
+ while(docsList.size() < maxDocCount) {
+ doc = docs.nextDoc();
+ if (doc == null) {
+ break;
+ }
+ docsList.add(cloneDoc(doc));
+ docIDs.add(doc.get("docid"));
+ }
+ addCount.addAndGet(docsList.size());
+
+ if (delSubDocs != null) {
+ delSubDocs.deleted = true;
+ delIDs.addAll(delSubDocs.subIDs);
+ delCount.addAndGet(delSubDocs.subIDs.size());
+ if (VERBOSE) {
+ System.out.println("TEST: update pack packID=" + delSubDocs.packID + " count=" + docsList.size() + " docs=" + docIDs);
+ }
+ writer.updateDocuments(new Term("packID", delSubDocs.packID), docsList);
+ /*
+ // non-atomic:
+ writer.deleteDocuments(new Term("packID", delSubDocs.packID));
+ for(Document subDoc : docsList) {
+ writer.addDocument(subDoc);
+ }
+ */
+ } else {
+ if (VERBOSE) {
+ System.out.println("TEST: add pack packID=" + packID + " count=" + docsList.size() + " docs=" + docIDs);
+ }
+ writer.addDocuments(docsList);
+
+ /*
+ // non-atomic:
+ for(Document subDoc : docsList) {
+ writer.addDocument(subDoc);
+ }
+ */
+ }
+ doc.removeField("packID");
+
+ if (random.nextInt(5) == 2) {
+ if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + packID);
+ }
+ toDeleteSubDocs.add(subDocs);
+ }
+
+ } else {
+ writer.addDocument(doc);
+ addCount.getAndIncrement();
+
+ if (random.nextInt(5) == 3) {
+ if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ }
+ toDeleteIDs.add(doc.get("docid"));
+ }
+ }
} else {
// we use update but it never replaces a
// prior doc
@@ -136,14 +261,17 @@ public class TestNRTThreads extends Luce
System.out.println(Thread.currentThread().getName() + ": update doc id:" + doc.get("docid"));
}
writer.updateDocument(new Term("docid", doc.get("docid")), doc);
- }
- if (random.nextInt(5) == 3) {
- if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ addCount.getAndIncrement();
+
+ if (random.nextInt(5) == 3) {
+ if (VERBOSE) {
+ //System.out.println(Thread.currentThread().getName() + ": buffer del id:" + doc.get("docid"));
+ }
+ toDeleteIDs.add(doc.get("docid"));
}
- toDeleteIDs.add(doc.get("docid"));
}
- if (random.nextInt(50) == 17) {
+
+ if (random.nextInt(30) == 17) {
if (VERBOSE) {
System.out.println(Thread.currentThread().getName() + ": apply " + toDeleteIDs.size() + " deletes");
}
@@ -159,8 +287,19 @@ public class TestNRTThreads extends Luce
}
delIDs.addAll(toDeleteIDs);
toDeleteIDs.clear();
+
+ for(SubDocs subDocs : toDeleteSubDocs) {
+ assert !subDocs.deleted;
+ writer.deleteDocuments(new Term("packID", subDocs.packID));
+ subDocs.deleted = true;
+ if (VERBOSE) {
+ System.out.println(" del subs: " + subDocs.subIDs + " packID=" + subDocs.packID);
+ }
+ delIDs.addAll(subDocs.subIDs);
+ delCount.addAndGet(subDocs.subIDs.size());
+ }
+ toDeleteSubDocs.clear();
}
- addCount.getAndIncrement();
if (addedField != null) {
doc.removeField(addedField);
}
@@ -331,7 +470,7 @@ public class TestNRTThreads extends Luce
if (VERBOSE) {
System.out.println("TEST: done join [" + (System.currentTimeMillis()-t0) + " ms]; addCount=" + addCount + " delCount=" + delCount);
}
-
+
final IndexReader r2 = writer.getReader();
final IndexSearcher s = newSearcher(r2);
boolean doFail = false;
@@ -342,6 +481,43 @@ public class TestNRTThreads extends Luce
doFail = true;
}
}
+
+ // Make sure each group of sub-docs are still in docID order:
+ for(SubDocs subDocs : allSubDocs) {
+ if (!subDocs.deleted) {
+ // We sort by relevance but the scores should be identical so the sort falls back to docID order:
+ TopDocs hits = s.search(new TermQuery(new Term("packID", subDocs.packID)), 20);
+ assertEquals(subDocs.subIDs.size(), hits.totalHits);
+ int lastDocID = -1;
+ int startDocID = -1;
+ for(ScoreDoc scoreDoc : hits.scoreDocs) {
+ final int docID = scoreDoc.doc;
+ if (lastDocID != -1) {
+ assertEquals(1+lastDocID, docID);
+ } else {
+ startDocID = docID;
+ }
+ lastDocID = docID;
+ final Document doc = s.doc(docID);
+ assertEquals(subDocs.packID, doc.get("packID"));
+ }
+
+ lastDocID = startDocID - 1;
+ for(String subID : subDocs.subIDs) {
+ hits = s.search(new TermQuery(new Term("docid", subID)), 1);
+ assertEquals(1, hits.totalHits);
+ final int docID = hits.scoreDocs[0].doc;
+ if (lastDocID != -1) {
+ assertEquals(1+lastDocID, docID);
+ }
+ lastDocID = docID;
+ }
+ } else {
+ for(String subID : subDocs.subIDs) {
+ assertEquals(0, s.search(new TermQuery(new Term("docid", subID)), 1).totalHits);
+ }
+ }
+ }
final int endID = Integer.parseInt(docs.nextDoc().get("docid"));
for(int id=0;id<endID;id++) {