Posted to java-commits@lucene.apache.org by mi...@apache.org on 2008/01/03 16:49:50 UTC
svn commit: r608534 - in /lucene/java/trunk/src:
java/org/apache/lucene/index/DocumentsWriter.java
java/org/apache/lucene/index/IndexWriter.java
test/org/apache/lucene/index/TestIndexWriter.java
Author: mikemccand
Date: Thu Jan 3 07:49:50 2008
New Revision: 608534
URL: http://svn.apache.org/viewvc?rev=608534&view=rev
Log:
LUCENE-1112: skip immense terms and mark a doc for deletion if it hits a non-aborting exception
Modified:
lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
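In user-visible terms: before this change a term longer than the hard limit caused
addDocument/updateDocument to throw IllegalArgumentException; after it, the over-long
term is skipped, a warning is printed to infoStream if one is set, and the document is
still added atomically (or marked deleted if a non-aborting exception is hit part-way
through). A minimal sketch of the post-patch behavior (illustrative only, not part of
the patch; the ImmenseTermSketch class is made up, only the Lucene calls are real API,
and it assumes a RAMDirectory and WhitespaceAnalyzer as in the updated test):

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.RAMDirectory;

    // Hypothetical illustration of the new behavior; not committed code.
    public class ImmenseTermSketch {
      public static void main(String[] args) throws IOException {
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
        writer.setInfoStream(System.out);  // the immense-term warning appears here

        char[] huge = new char[IndexWriter.MAX_TERM_LENGTH + 1];
        Arrays.fill(huge, 'x');
        Document doc = new Document();
        doc.add(new Field("content", "abc " + new String(huge) + " xyz",
                          Field.Store.NO, Field.Index.TOKENIZED));

        writer.addDocument(doc);  // no longer throws; the huge term is skipped
        writer.close();           // "abc" and "xyz" are indexed, the document is kept
      }
    }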
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=608534&r1=608533&r2=608534&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Jan 3 07:49:50 2008
@@ -97,6 +97,28 @@
* threads and flush only once they are all idle. This
* means you can call flush with a given thread even while
* other threads are actively adding/deleting documents.
+ *
+ *
+ * Exceptions:
+ *
+ * Because this class directly updates in-memory posting
+ * lists, and flushes stored fields and term vectors
+ * directly to files in the directory, there are certain
+ * limited times when an exception can corrupt this state.
+ * For example, a disk-full error while flushing stored
+ * fields leaves that file in a corrupt state. Or, an OOM
+ * exception while appending to the in-memory posting lists
+ * can corrupt that posting list. We call such exceptions
+ * "aborting exceptions". In these cases we must call
+ * abort() to discard all docs added since the last flush.
+ *
+ * All other exceptions ("non-aborting exceptions") can
+ * still partially update the index structures. These
+ * updates are consistent, but they represent only the
+ * portion of the document indexed up to the point where
+ * the exception was hit.
+ * When this happens, we immediately mark the document as
+ * deleted so that the document is always atomically ("all
+ * or none") added to the index.
*/
final class DocumentsWriter {
@@ -137,6 +159,9 @@
private HashMap bufferedDeleteTerms = new HashMap();
private int numBufferedDeleteTerms = 0;
+ // Currently used only for deleting a doc on hitting a non-aborting exception
+ private List bufferedDeleteDocIDs = new ArrayList();
+
// The max number of delete terms that can be buffered before
// they must be flushed to disk.
private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
@@ -155,6 +180,7 @@
private static int OBJECT_HEADER_BYTES = 12;
private static int OBJECT_POINTER_BYTES = 4; // TODO: should be 8 on 64-bit platform
private static int BYTES_PER_CHAR = 2;
+ private static int BYTES_PER_INT = 4;
private BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush
@@ -311,6 +337,7 @@
pauseAllThreads();
bufferedDeleteTerms.clear();
+ bufferedDeleteDocIDs.clear();
numBufferedDeleteTerms = 0;
try {
@@ -522,7 +549,8 @@
int numAllFieldData;
FieldData[] fieldDataHash; // Hash FieldData instances by field name
int fieldDataHashMask;
- int maxTermHit; // Set to > 0 if this doc has a too-large term
+ String maxTermPrefix; // Non-null prefix of a too-large term if this
+ // doc has one
boolean doFlushAfter;
boolean abortOnExc;
@@ -623,7 +651,7 @@
numStoredFields = 0;
numFieldData = 0;
numVectorFields = 0;
- maxTermHit = 0;
+ maxTermPrefix = null;
assert 0 == fdtLocal.length();
assert 0 == tvfLocal.length();
@@ -717,12 +745,10 @@
}
if (field.isTermVectorStored()) {
- if (!fp.doVectors) {
- if (numVectorFields++ == vectorFieldPointers.length) {
- final int newSize = (int) (numVectorFields*1.5);
- vectorFieldPointers = new long[newSize];
- vectorFieldNumbers = new int[newSize];
- }
+ if (!fp.doVectors && numVectorFields++ == vectorFieldPointers.length) {
+ final int newSize = (int) (numVectorFields*1.5);
+ vectorFieldPointers = new long[newSize];
+ vectorFieldNumbers = new int[newSize];
}
fp.doVectors = true;
docHasVectors = true;
@@ -989,6 +1015,9 @@
for(int i=0;i<numFields;i++)
fieldDataArray[i].processField(analyzer);
+ if (maxTermPrefix != null && infoStream != null)
+ infoStream.println("WARNING: document contains at least one immense term (longer than the max length " + MAX_TERM_LENGTH + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + maxTermPrefix + "...'");
+
if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
&& numBytesUsed > 0.95 * ramBufferSize)
balanceRAM();
@@ -1553,11 +1582,17 @@
final int textLen1 = 1+tokenTextLen;
if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
if (textLen1 > CHAR_BLOCK_SIZE) {
- maxTermHit = tokenTextLen;
- // Just skip this term; we will throw an
- // exception after processing all accepted
- // terms in the doc
+ // Just skip this term, to remain as robust as
+ // possible during indexing. A TokenFilter
+ // can be inserted into the analyzer chain if
+ // other behavior is wanted (pruning the term
+ // to a prefix, throwing an exception, etc).
abortOnExc = false;
+ if (maxTermPrefix == null)
+ maxTermPrefix = new String(tokenText, 0, 30);
+
+ // Still increment position:
+ position++;
return;
}
charPool.nextBuffer();
@@ -2267,29 +2302,27 @@
/** Returns true if the caller (IndexWriter) should now
* flush. */
- int addDocument(Document doc, Analyzer analyzer)
+ boolean addDocument(Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
return updateDocument(doc, analyzer, null);
}
- int updateDocument(Term t, Document doc, Analyzer analyzer)
+ boolean updateDocument(Term t, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
return updateDocument(doc, analyzer, t);
}
- int updateDocument(Document doc, Analyzer analyzer, Term delTerm)
+ boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
throws CorruptIndexException, IOException {
// This call is synchronized but fast
final ThreadState state = getThreadState(doc, delTerm);
boolean success = false;
- int maxTermHit;
try {
try {
// This call is not synchronized and does all the work
state.processDocument(analyzer);
} finally {
- maxTermHit = state.maxTermHit;
// This call is synchronized but fast
finishDocument(state);
}
@@ -2299,16 +2332,20 @@
synchronized(this) {
state.isIdle = true;
if (state.abortOnExc)
+ // Abort all buffered docs since last flush
abort();
+ else
+ // Immediately mark this document as deleted
+ // since likely it was partially added. This
+ // keeps indexing as "all or none" (atomic) when
+ // adding a document:
+ addDeleteDocID(state.docID);
notifyAll();
}
}
}
- int status = maxTermHit<<1;
- if (state.doFlushAfter || timeToFlushDeletes())
- status += 1;
- return status;
+ return state.doFlushAfter || timeToFlushDeletes();
}
synchronized int getNumBufferedDeleteTerms() {
@@ -2319,9 +2356,14 @@
return bufferedDeleteTerms;
}
+ synchronized List getBufferedDeleteDocIDs() {
+ return bufferedDeleteDocIDs;
+ }
+
// Reset buffered deletes.
- synchronized void clearBufferedDeleteTerms() throws IOException {
+ synchronized void clearBufferedDeletes() throws IOException {
bufferedDeleteTerms.clear();
+ bufferedDeleteDocIDs.clear();
numBufferedDeleteTerms = 0;
if (numBytesUsed > 0)
resetPostingsData();
@@ -2366,7 +2408,7 @@
}
synchronized boolean hasDeletes() {
- return bufferedDeleteTerms.size() > 0;
+ return bufferedDeleteTerms.size() > 0 || bufferedDeleteDocIDs.size() > 0;
}
// Number of documents a delete term applies to.
@@ -2414,6 +2456,13 @@
numBufferedDeleteTerms++;
}
+ // Buffer a specific docID for deletion. Currently only
+ // used when we hit an exception while adding a document
+ synchronized private void addDeleteDocID(int docId) {
+ bufferedDeleteDocIDs.add(new Integer(docId));
+ numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES;
+ }
+
/** Does the synchronized work to finish/flush the
* inverted document. */
private synchronized void finishDocument(ThreadState state) throws IOException {
@@ -2909,6 +2958,8 @@
final static int CHAR_BLOCK_SHIFT = 14;
final static int CHAR_BLOCK_SIZE = (int) Math.pow(2.0, CHAR_BLOCK_SHIFT);
final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
+
+ final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
private ArrayList freeCharBlocks = new ArrayList();
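The new buffered-docID machinery above amounts to a small amount of bookkeeping:
remember the docID of any document that hit a non-aborting exception, account for its
RAM cost, and hand the list to IndexWriter at flush time. A simplified, self-contained
sketch (not the committed code; the class is hypothetical, though the names and byte
constants mirror DocumentsWriter):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the buffered-docID bookkeeping, pre-generics style as in the
    // 2.3 codebase.  Hypothetical standalone form, not part of the patch.
    class DeletedDocIDBuffer {
      private static final int OBJECT_HEADER_BYTES = 12;
      private static final int OBJECT_POINTER_BYTES = 4;
      private static final int BYTES_PER_INT = 4;

      private final List docIDs = new ArrayList();  // holds Integer docIDs
      private long numBytesUsed;

      // Called when a document hits a non-aborting exception while being added:
      synchronized void addDeleteDocID(int docID) {
        docIDs.add(new Integer(docID));
        numBytesUsed += OBJECT_HEADER_BYTES + BYTES_PER_INT + OBJECT_POINTER_BYTES;
      }

      synchronized List getBufferedDeleteDocIDs() {
        return docIDs;
      }

      // Analogous to clearBufferedDeletes(): drop everything after a flush or abort.
      synchronized void clear() {
        docIDs.clear();
        numBytesUsed = 0;
      }
    }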
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=608534&r1=608533&r2=608534&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Thu Jan 3 07:49:50 2008
@@ -241,6 +241,14 @@
* Default value is 128. Change using {@link #setTermIndexInterval(int)}.
*/
public final static int DEFAULT_TERM_INDEX_INTERVAL = 128;
+
+ /**
+ * Absolute hard maximum length for a term. If a term
+ * arrives from the analyzer longer than this length, it
+ * is skipped and a message is printed to infoStream, if
+ * set (see {@link #setInfoStream}).
+ */
+ public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH;
// The normal read buffer size defaults to 1024, but
// increasing this during merging seems to yield
@@ -1431,10 +1439,10 @@
*/
public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
ensureOpen();
- int status = 0;
+ boolean doFlush = false;
boolean success = false;
try {
- status = docWriter.addDocument(doc, analyzer);
+ doFlush = docWriter.addDocument(doc, analyzer);
success = true;
} finally {
if (!success) {
@@ -1453,9 +1461,8 @@
}
}
}
- if ((status & 1) != 0)
+ if (doFlush)
flush(true, false);
- checkMaxTermLength(status);
}
/**
@@ -1519,10 +1526,10 @@
public void updateDocument(Term term, Document doc, Analyzer analyzer)
throws CorruptIndexException, IOException {
ensureOpen();
- int status = 0;
+ boolean doFlush = false;
boolean success = false;
try {
- status = docWriter.updateDocument(term, doc, analyzer);
+ doFlush = docWriter.updateDocument(term, doc, analyzer);
success = true;
} finally {
if (!success) {
@@ -1539,17 +1546,8 @@
}
}
}
- if ((status & 1) != 0)
+ if (doFlush)
flush(true, false);
- checkMaxTermLength(status);
- }
-
- /** Throws IllegalArgumentException if the return status
- * from DocumentsWriter.{add,update}Document indicates
- * that a too-long term was encountered */
- final private void checkMaxTermLength(int status) {
- if (status > 1)
- throw new IllegalArgumentException("at least one term (length " + (status>>1) + ") exceeds max term length " + (DocumentsWriter.CHAR_BLOCK_SIZE-1) + "; these terms were skipped");
}
// for test purpose
@@ -2500,9 +2498,7 @@
// buffer deletes longer and then flush them to
// multiple flushed segments, when
// autoCommit=false
- int delCount = applyDeletes(flushDocs);
- if (infoStream != null)
- infoStream.println("flushed " + delCount + " deleted documents");
+ applyDeletes(flushDocs);
doAfterFlush();
}
@@ -3220,68 +3216,65 @@
// flushedNewSegment is true then a new segment was just
// created and flushed from the ram segments, so we will
// selectively apply the deletes to that new segment.
- private final int applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
+ private final void applyDeletes(boolean flushedNewSegment) throws CorruptIndexException, IOException {
final HashMap bufferedDeleteTerms = docWriter.getBufferedDeleteTerms();
+ final List bufferedDeleteDocIDs = docWriter.getBufferedDeleteDocIDs();
- int delCount = 0;
- if (bufferedDeleteTerms.size() > 0) {
- if (infoStream != null)
- message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms on "
- + segmentInfos.size() + " segments.");
+ if (infoStream != null)
+ message("flush " + docWriter.getNumBufferedDeleteTerms() + " buffered deleted terms and " +
+ bufferedDeleteDocIDs.size() + " deleted docIDs on "
+ + segmentInfos.size() + " segments.");
- if (flushedNewSegment) {
- IndexReader reader = null;
- try {
- // Open readers w/o opening the stored fields /
- // vectors because these files may still be held
- // open for writing by docWriter
- reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1), false);
-
- // Apply delete terms to the segment just flushed from ram
- // apply appropriately so that a delete term is only applied to
- // the documents buffered before it, not those buffered after it.
- delCount += applyDeletesSelectively(bufferedDeleteTerms, reader);
- } finally {
- if (reader != null) {
- try {
- reader.doCommit();
- } finally {
- reader.doClose();
- }
+ if (flushedNewSegment) {
+ IndexReader reader = null;
+ try {
+ // Open readers w/o opening the stored fields /
+ // vectors because these files may still be held
+ // open for writing by docWriter
+ reader = SegmentReader.get(segmentInfos.info(segmentInfos.size() - 1), false);
+
+ // Apply delete terms to the segment just flushed from ram
+ // apply appropriately so that a delete term is only applied to
+ // the documents buffered before it, not those buffered after it.
+ applyDeletesSelectively(bufferedDeleteTerms, bufferedDeleteDocIDs, reader);
+ } finally {
+ if (reader != null) {
+ try {
+ reader.doCommit();
+ } finally {
+ reader.doClose();
}
}
}
+ }
- int infosEnd = segmentInfos.size();
- if (flushedNewSegment) {
- infosEnd--;
- }
+ int infosEnd = segmentInfos.size();
+ if (flushedNewSegment) {
+ infosEnd--;
+ }
- for (int i = 0; i < infosEnd; i++) {
- IndexReader reader = null;
- try {
- reader = SegmentReader.get(segmentInfos.info(i), false);
+ for (int i = 0; i < infosEnd; i++) {
+ IndexReader reader = null;
+ try {
+ reader = SegmentReader.get(segmentInfos.info(i), false);
- // Apply delete terms to disk segments
- // except the one just flushed from ram.
- delCount += applyDeletes(bufferedDeleteTerms, reader);
- } finally {
- if (reader != null) {
- try {
- reader.doCommit();
- } finally {
- reader.doClose();
- }
+ // Apply delete terms to disk segments
+ // except the one just flushed from ram.
+ applyDeletes(bufferedDeleteTerms, reader);
+ } finally {
+ if (reader != null) {
+ try {
+ reader.doCommit();
+ } finally {
+ reader.doClose();
}
}
}
-
- // Clean up bufferedDeleteTerms.
- docWriter.clearBufferedDeleteTerms();
}
- return delCount;
+ // Clean up bufferedDeleteTerms.
+ docWriter.clearBufferedDeletes();
}
// For test purposes.
@@ -3297,10 +3290,10 @@
// Apply buffered delete terms to the segment just flushed from ram
// apply appropriately so that a delete term is only applied to
// the documents buffered before it, not those buffered after it.
- private final int applyDeletesSelectively(HashMap deleteTerms,
- IndexReader reader) throws CorruptIndexException, IOException {
+ private final void applyDeletesSelectively(HashMap deleteTerms, List deleteIds,
+ IndexReader reader)
+ throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
- int delCount = 0;
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
Term term = (Term) entry.getKey();
@@ -3315,26 +3308,28 @@
break;
}
reader.deleteDocument(doc);
- delCount++;
}
} finally {
docs.close();
}
}
}
- return delCount;
+
+ if (deleteIds.size() > 0) {
+ iter = deleteIds.iterator();
+ while(iter.hasNext())
+ reader.deleteDocument(((Integer) iter.next()).intValue());
+ }
}
// Apply buffered delete terms to this reader.
- private final int applyDeletes(HashMap deleteTerms, IndexReader reader)
+ private final void applyDeletes(HashMap deleteTerms, IndexReader reader)
throws CorruptIndexException, IOException {
Iterator iter = deleteTerms.entrySet().iterator();
- int delCount = 0;
while (iter.hasNext()) {
Entry entry = (Entry) iter.next();
- delCount += reader.deleteDocuments((Term) entry.getKey());
+ reader.deleteDocuments((Term) entry.getKey());
}
- return delCount;
}
// utility routines for tests
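Stripped of the refactoring noise, the reworked applyDeletes path now applies two kinds
of buffered deletes to each segment reader: the existing term-based deletes and the new
docID-based deletes recorded for documents that hit a non-aborting exception. A
simplified sketch of that core loop (illustrative only, not the committed code; error
handling, the per-term docID cutoff, and the reader commit/close steps are omitted):

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.Term;

    // Hypothetical helper showing the shape of the new delete application.
    class ApplyDeletesSketch {
      static void apply(Map deleteTerms, List deleteDocIDs, IndexReader reader)
          throws IOException {
        // Deletes buffered by term (from deleteDocuments/updateDocument):
        Iterator iter = deleteTerms.keySet().iterator();
        while (iter.hasNext())
          reader.deleteDocuments((Term) iter.next());

        // Deletes buffered by docID (documents that hit a non-aborting exception):
        iter = deleteDocIDs.iterator();
        while (iter.hasNext())
          reader.deleteDocument(((Integer) iter.next()).intValue());
      }
    }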
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=608534&r1=608533&r2=608534&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Thu Jan 3 07:49:50 2008
@@ -538,8 +538,7 @@
}
/**
- * Make sure we get a friendly exception for a wicked
- * long term.
+ * Make sure we skip wicked long terms.
*/
public void testWickedLongTerm() throws IOException {
RAMDirectory dir = new RAMDirectory();
@@ -552,13 +551,9 @@
// Max length term is 16383, so this content produces
// a too-long term:
- String contents = "abc xyz x" + bigTerm;
+ String contents = "abc xyz x" + bigTerm + " another term";
doc.add(new Field("content", contents, Field.Store.NO, Field.Index.TOKENIZED));
- try {
- writer.addDocument(doc);
- fail("did not hit expected exception");
- } catch (IllegalArgumentException e) {
- }
+ writer.addDocument(doc);
// Make sure we can add another normal document
doc = new Document();
@@ -567,9 +562,24 @@
writer.close();
IndexReader reader = IndexReader.open(dir);
+
// Make sure all terms < max size were indexed
assertEquals(2, reader.docFreq(new Term("content", "abc")));
assertEquals(1, reader.docFreq(new Term("content", "bbb")));
+ assertEquals(1, reader.docFreq(new Term("content", "term")));
+ assertEquals(1, reader.docFreq(new Term("content", "another")));
+
+ // Make sure position is still incremented when
+ // massive term is skipped:
+ TermPositions tps = reader.termPositions(new Term("content", "another"));
+ assertTrue(tps.next());
+ assertEquals(1, tps.freq());
+ assertEquals(3, tps.nextPosition());
+
+ // Make sure the doc that has the massive term is in
+ // the index:
+ assertEquals("document with wicked long term should is not in the index!", 2, reader.numDocs());
+
reader.close();
// Make sure we can add a document with exactly the
@@ -1789,7 +1799,18 @@
writer.close();
IndexReader reader = IndexReader.open(dir);
- assertEquals(reader.docFreq(new Term("content", "aa")), 3);
+ final Term t = new Term("content", "aa");
+ assertEquals(reader.docFreq(t), 3);
+
+ // Make sure the doc that hit the exception was marked
+ // as deleted:
+ TermDocs tdocs = reader.termDocs(t);
+ int count = 0;
+ while(tdocs.next()) {
+ count++;
+ }
+ assertEquals(2, count);
+
assertEquals(reader.docFreq(new Term("content", "gg")), 0);
reader.close();
dir.close();
@@ -1905,12 +1926,18 @@
int expected = 3+(1-i)*2;
assertEquals(expected, reader.docFreq(new Term("contents", "here")));
assertEquals(expected, reader.maxDoc());
+ int numDel = 0;
for(int j=0;j<reader.maxDoc();j++) {
- reader.document(j);
+ if (reader.isDeleted(j))
+ numDel++;
+ else
+ reader.document(j);
reader.getTermFreqVectors(j);
}
reader.close();
+ assertEquals(1, numDel);
+
writer = new IndexWriter(dir, analyzer);
writer.setMaxBufferedDocs(10);
doc = new Document();
@@ -1922,14 +1949,19 @@
writer.close();
reader = IndexReader.open(dir);
- expected = 20+(1-i)*2;
+ expected = 19+(1-i)*2;
assertEquals(expected, reader.docFreq(new Term("contents", "here")));
assertEquals(expected, reader.maxDoc());
+ numDel = 0;
for(int j=0;j<reader.maxDoc();j++) {
- reader.document(j);
+ if (reader.isDeleted(j))
+ numDel++;
+ else
+ reader.document(j);
reader.getTermFreqVectors(j);
}
reader.close();
+ assertEquals(0, numDel);
dir.close();
}
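As the comment added to DocumentsWriter notes, applications that prefer a different
policy for over-long terms (truncating them, or failing loudly as before) can enforce
it in the analyzer chain rather than relying on the silent skip. One way to drop such
terms up front, sketched here with LengthFilter from org.apache.lucene.analysis (the
LengthLimitedAnalyzer class itself is made up for illustration):

    import java.io.Reader;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.LengthFilter;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.WhitespaceTokenizer;
    import org.apache.lucene.index.IndexWriter;

    // Hypothetical analyzer that removes over-long terms before they reach
    // DocumentsWriter, so no immense-term warning is ever triggered.
    class LengthLimitedAnalyzer extends Analyzer {
      public TokenStream tokenStream(String fieldName, Reader reader) {
        // Keep terms of 1..MAX_TERM_LENGTH chars; anything longer is dropped
        // by LengthFilter instead of being skipped inside the writer.
        return new LengthFilter(new WhitespaceTokenizer(reader), 1,
                                IndexWriter.MAX_TERM_LENGTH);
      }
    }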