Posted to java-commits@lucene.apache.org by mi...@apache.org on 2007/12/04 23:29:43 UTC
svn commit: r601121 - in /lucene/java/trunk/src:
java/org/apache/lucene/index/DocumentsWriter.java
test/org/apache/lucene/index/TestIndexWriter.java
Author: mikemccand
Date: Tue Dec 4 14:29:42 2007
New Revision: 601121
URL: http://svn.apache.org/viewvc?rev=601121&view=rev
Log:
LUCENE-1072: make sure an exception raised in Tokenizer.next() leaves DocumentsWriter in OK state
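
The regression test added in this commit reproduces the problem with an analyzer whose TokenStream throws partway through a document. A condensed sketch of that scenario follows (the class name is invented for illustration; only its shape follows the anonymous analyzer in the new testExceptionFromTokenStream test below):

    import java.io.IOException;
    import java.io.Reader;

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.Token;
    import org.apache.lucene.analysis.TokenFilter;
    import org.apache.lucene.analysis.TokenStream;
    import org.apache.lucene.analysis.standard.StandardTokenizer;

    // Analyzer that fails after a few tokens, simulating a broken
    // Tokenizer.next(); modeled on the anonymous analyzer used by the
    // new test in TestIndexWriter.java.
    class FailAfterFiveTokensAnalyzer extends Analyzer {
      public TokenStream tokenStream(String fieldName, Reader reader) {
        return new TokenFilter(new StandardTokenizer(reader)) {
          private int count = 0;
          public Token next() throws IOException {
            if (count++ == 5)
              throw new IOException("simulated failure in next()");
            return input.next();
          }
        };
      }
    }

With this fix, an addDocument call made through such an analyzer still throws, but subsequent addDocument calls on the same IndexWriter succeed and the documents buffered before and after the failure remain intact.
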
Modified:
lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=601121&r1=601120&r2=601121&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Dec 4 14:29:42 2007
@@ -389,6 +389,7 @@
try {
wait();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
}
@@ -521,6 +522,7 @@
int maxTermHit; // Set to > 0 if this doc has a too-large term
boolean doFlushAfter;
+ boolean abortOnExc;
public ThreadState() {
fieldDataArray = new FieldData[8];
@@ -558,6 +560,12 @@
* the ThreadState into the "real" stores. */
public void writeDocument() throws IOException {
+ // If we hit an exception while appending to the
+ // stored fields or term vectors files, we have to
+ // abort because it means those files are possibly
+ // inconsistent.
+ abortOnExc = true;
+
// Append stored fields to the real FieldsWriter:
fieldsWriter.flushDocument(fdtLocal);
fdtLocal.reset();
@@ -581,6 +589,7 @@
tvfLocal.reset();
}
}
+ abortOnExc = false;
// Append norms for the fields we saw:
for(int i=0;i<numFieldData;i++) {
@@ -604,6 +613,7 @@
/** Initializes shared state for this new document */
void init(Document doc, int docID) throws IOException {
+ abortOnExc = false;
this.docID = docID;
docBoost = doc.getBoost();
numStoredFields = 0;
@@ -1177,6 +1187,7 @@
boolean doVectors;
boolean doVectorPositions;
boolean doVectorOffsets;
+ boolean postingsCompacted;
int numPostings;
@@ -1197,8 +1208,11 @@
}
void resetPostingArrays() {
+ if (!postingsCompacted)
+ compactPostings();
recyclePostings(this.postingsHash, numPostings);
Arrays.fill(postingsHash, 0, postingsHash.length, null);
+ postingsCompacted = false;
numPostings = 0;
}
@@ -1217,15 +1231,20 @@
return fieldInfo.name.compareTo(((FieldData) o).fieldInfo.name);
}
- /** Collapse the hash table & sort in-place. */
- public Posting[] sortPostings() {
+ private void compactPostings() {
int upto = 0;
for(int i=0;i<postingsHashSize;i++)
if (postingsHash[i] != null)
postingsHash[upto++] = postingsHash[i];
assert upto == numPostings;
- doPostingSort(postingsHash, upto);
+ postingsCompacted = true;
+ }
+
+ /** Collapse the hash table & sort in-place. */
+ public Posting[] sortPostings() {
+ compactPostings();
+ doPostingSort(postingsHash, numPostings);
return postingsHash;
}
@@ -1241,26 +1260,29 @@
final int limit = fieldCount;
final Fieldable[] docFieldsFinal = docFields;
- // Walk through all occurrences in this doc for this field:
- for(int j=0;j<limit;j++) {
- Fieldable field = docFieldsFinal[j];
-
- if (field.isIndexed())
- invertField(field, analyzer, maxFieldLength);
-
- if (field.isStored())
- localFieldsWriter.writeField(fieldInfo, field);
-
- docFieldsFinal[j] = null;
- }
-
- if (postingsVectorsUpto > 0) {
- // Add term vectors for this field
- writeVectors(fieldInfo);
- if (postingsVectorsUpto > maxPostingsVectors)
- maxPostingsVectors = postingsVectorsUpto;
- postingsVectorsUpto = 0;
- vectorsPool.reset();
+ // Walk through all occurrences in this doc for this
+ // field:
+ try {
+ for(int j=0;j<limit;j++) {
+ Fieldable field = docFieldsFinal[j];
+
+ if (field.isIndexed())
+ invertField(field, analyzer, maxFieldLength);
+
+ if (field.isStored())
+ localFieldsWriter.writeField(fieldInfo, field);
+
+ docFieldsFinal[j] = null;
+ }
+ } finally {
+ if (postingsVectorsUpto > 0) {
+ // Add term vectors for this field
+ writeVectors(fieldInfo);
+ if (postingsVectorsUpto > maxPostingsVectors)
+ maxPostingsVectors = postingsVectorsUpto;
+ postingsVectorsUpto = 0;
+ vectorsPool.reset();
+ }
}
}
@@ -1406,6 +1428,8 @@
int hashPos = code & postingsHashMask;
+ assert !postingsCompacted;
+
// Locate Posting in hash
p = postingsHash[hashPos];
@@ -1422,6 +1446,12 @@
final int proxCode;
+ // If we hit an exception below, it's possible the
+ // posting list or term vectors data will be
+ // partially written and thus inconsistent if
+ // flushed, so we have to abort:
+ abortOnExc = true;
+
if (p != null) { // term seen since last flush
if (docID != p.lastDocID) { // term not yet seen in this doc
@@ -1492,6 +1522,7 @@
// Just skip this term; we will throw an
// exception after processing all accepted
// terms in the doc
+ abortOnExc = false;
return;
}
charPool.nextBuffer();
@@ -1572,6 +1603,8 @@
vector.lastOffset = offsetEnd;
vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
}
+
+ abortOnExc = false;
}
/** Called when postings hash is too small (> 50%
@@ -2142,7 +2175,9 @@
while(!state.isIdle || pauseThreads != 0 || flushPending)
try {
wait();
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
if (segment == null)
segment = writer.newSegmentName();
@@ -2165,22 +2200,24 @@
boolean success = false;
try {
state.init(doc, nextDocID++);
+
+ if (delTerm != null) {
+ addDeleteTerm(delTerm, state.docID);
+ if (!state.doFlushAfter)
+ state.doFlushAfter = timeToFlushDeletes();
+ }
+
success = true;
} finally {
if (!success) {
- state.isIdle = true;
- if (state.doFlushAfter) {
- state.doFlushAfter = false;
- flushPending = false;
+ synchronized(this) {
+ state.isIdle = true;
+ if (state.doFlushAfter) {
+ state.doFlushAfter = false;
+ flushPending = false;
+ }
+ notifyAll();
}
- abort();
- }
- }
-
- if (delTerm != null) {
- addDeleteTerm(delTerm, state.docID);
- if (!state.doFlushAfter) {
- state.doFlushAfter = timeToFlushDeletes();
}
}
@@ -2208,15 +2245,22 @@
int maxTermHit;
try {
// This call is not synchronized and does all the work
- state.processDocument(analyzer);
- // This call synchronized but fast
- maxTermHit = state.maxTermHit;
- finishDocument(state);
+ try {
+ state.processDocument(analyzer);
+ } finally {
+ maxTermHit = state.maxTermHit;
+ // This call synchronized but fast
+ finishDocument(state);
+ }
success = true;
} finally {
if (!success) {
- state.isIdle = true;
- abort();
+ synchronized(this) {
+ state.isIdle = true;
+ if (state.abortOnExc)
+ abort();
+ notifyAll();
+ }
}
}
@@ -2246,7 +2290,9 @@
while(pauseThreads != 0 || flushPending)
try {
wait();
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
for (int i = 0; i < terms.length; i++)
addDeleteTerm(terms[i], numDocsInRAM);
return timeToFlushDeletes();
@@ -2256,7 +2302,9 @@
while(pauseThreads != 0 || flushPending)
try {
wait();
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
addDeleteTerm(term, numDocsInRAM);
return timeToFlushDeletes();
}
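
The wait loops changed above stop swallowing InterruptedException and instead re-assert the thread's interrupt status. A standalone sketch of that idiom, outside of Lucene (the class and field names here are made up for illustration):

    // Minimal wait loop showing the interrupt-status-restore idiom used
    // in the hunks above: the exception is not rethrown, but the flag is
    // re-set so callers higher up the stack can still observe it.
    class IdleGate {
      private boolean busy = true;   // hypothetical condition

      synchronized void waitUntilIdle() {
        while (busy) {
          try {
            wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }

      synchronized void markIdle() {
        busy = false;
        notifyAll();
      }
    }

The interruption is no longer silently discarded, at the cost of the loop continuing to wait for its condition rather than aborting indexing when a thread is interrupted.
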
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=601121&r1=601120&r2=601121&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Tue Dec 4 14:29:42 2007
@@ -18,6 +18,7 @@
*/
import java.io.IOException;
+import java.io.Reader;
import java.io.File;
import java.util.Arrays;
import java.util.Random;
@@ -25,7 +26,12 @@
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenFilter;
+import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.analysis.standard.StandardTokenizer;
+import org.apache.lucene.analysis.Token;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.search.IndexSearcher;
@@ -1735,5 +1741,109 @@
for(int i=0;i<177;i++)
iw.addDocument(document);
iw.close();
+ }
+
+ // LUCENE-1072
+ public void testExceptionFromTokenStream() throws IOException {
+ RAMDirectory dir = new MockRAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new Analyzer() {
+
+ public TokenStream tokenStream(String fieldName, Reader reader) {
+ return new TokenFilter(new StandardTokenizer(reader)) {
+ private int count = 0;
+
+ public Token next() throws IOException {
+ if (count++ == 5) {
+ throw new IOException();
+ }
+ return input.next();
+ }
+ };
+ }
+
+ }, true);
+
+ Document doc = new Document();
+ String contents = "aa bb cc dd ee ff gg hh ii jj kk";
+ doc.add(new Field("content", contents, Field.Store.NO,
+ Field.Index.TOKENIZED));
+ try {
+ writer.addDocument(doc);
+ fail("did not hit expected exception");
+ } catch (Exception e) {
+ }
+
+ // Make sure we can add another normal document
+ doc = new Document();
+ doc.add(new Field("content", "aa bb cc dd", Field.Store.NO,
+ Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+
+ // Make sure we can add another normal document
+ doc = new Document();
+ doc.add(new Field("content", "aa bb cc dd", Field.Store.NO,
+ Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+
+ writer.close();
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(reader.docFreq(new Term("content", "aa")), 3);
+ assertEquals(reader.docFreq(new Term("content", "gg")), 0);
+ reader.close();
+ dir.close();
+ }
+
+ private static class FailOnlyOnFlush extends MockRAMDirectory.Failure {
+ boolean doFail = false;
+ int count;
+
+ public void setDoFail() {
+ this.doFail = true;
+ }
+ public void clearDoFail() {
+ this.doFail = false;
+ }
+
+ public void eval(MockRAMDirectory dir) throws IOException {
+ if (doFail) {
+ StackTraceElement[] trace = new Exception().getStackTrace();
+ for (int i = 0; i < trace.length; i++) {
+ if ("appendPostings".equals(trace[i].getMethodName()) && count++ == 30) {
+ doFail = false;
+ throw new IOException("now failing during flush");
+ }
+ }
+ }
+ }
+ }
+
+ // LUCENE-1072: make sure an errant exception on flushing
+ // one segment only takes out those docs in that one flush
+ public void testDocumentsWriterAbort() throws IOException {
+ MockRAMDirectory dir = new MockRAMDirectory();
+ FailOnlyOnFlush failure = new FailOnlyOnFlush();
+ failure.setDoFail();
+ dir.failOn(failure);
+
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+ writer.setMaxBufferedDocs(2);
+ Document doc = new Document();
+ String contents = "aa bb cc dd ee ff gg hh ii jj kk";
+ doc.add(new Field("content", contents, Field.Store.NO,
+ Field.Index.TOKENIZED));
+ boolean hitError = false;
+ for(int i=0;i<200;i++) {
+ try {
+ writer.addDocument(doc);
+ } catch (IOException ioe) {
+ // only one flush should fail:
+ assertFalse(hitError);
+ hitError = true;
+ }
+ }
+ assertTrue(hitError);
+ writer.close();
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(198, reader.docFreq(new Term("content", "aa")));
}
}
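
The new abortOnExc flag in ThreadState, exercised by the tests above, follows a "mark the dangerous region" pattern: the flag is raised just before writes that could leave the shared per-segment buffers half-written (stored fields, term vectors, postings), cleared once those buffers are consistent again, and checked in the enclosing finally block to decide whether the buffered state must be discarded via abort(). A simplified standalone sketch of the same idea (not the actual DocumentsWriter code; the class and methods are invented for illustration):

    import java.util.ArrayList;
    import java.util.List;

    // Simplified illustration of the abort-on-exception pattern: only a
    // failure that strikes while the shared buffer may be inconsistent
    // causes the buffered state to be thrown away.
    class BufferedState {
      private final List pending = new ArrayList();
      private boolean abortOnExc = false;

      void addRecord(String record) {
        boolean success = false;
        try {
          // From here on, a failure leaves 'pending' partially updated.
          abortOnExc = true;
          pending.add(encode(record));     // may throw
          abortOnExc = false;              // consistent again
          success = true;
        } finally {
          if (!success && abortOnExc) {
            pending.clear();               // abort: drop inconsistent data
            abortOnExc = false;
          }
        }
      }

      private String encode(String record) {
        if (record == null)
          throw new IllegalArgumentException("null record");
        return record.trim();
      }
    }

In the commit itself this distinction is what lets a failure inside Tokenizer.next(), where nothing inconsistent has been written yet, merely drop the offending document, while a failure while appending stored fields, term vectors or postings still triggers a full abort of the buffered segment.
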