Posted to java-commits@lucene.apache.org by mi...@apache.org on 2008/01/14 18:06:24 UTC
svn commit: r611855 - in /lucene/java/trunk/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/ test/org/apache/lucene/store/
Author: mikemccand
Date: Mon Jan 14 09:06:21 2008
New Revision: 611855
URL: http://svn.apache.org/viewvc?rev=611855&view=rev
Log:
LUCENE-1130: fix thread safety issues when hitting IOExceptions in DocumentsWriter
Modified:
lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java
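
For orientation before the diffs: the heart of this change is a small internal AbortException. Any code path that may have left the in-memory indexing state inconsistent wraps the root cause in an AbortException, which flips an "aborting" flag before the un-synchronized stack unwinds; abort() then discards buffered state and finally re-throws the original cause. A minimal standalone sketch of that pattern follows (DocWriter is a simplified stand-in, not the real DocumentsWriter API):

    import java.io.IOException;

    class DocWriter {
      private boolean aborting;

      synchronized void setAborting() {
        aborting = true;
      }

      // Cleanup always runs first; the original failure is re-thrown after.
      synchronized void abort(AbortException ae) throws IOException {
        assert ae == null || aborting;
        try {
          // ... discard any state buffered since the last flush ...
          if (ae != null) {
            Throwable t = ae.getCause();
            if (t instanceof IOException) throw (IOException) t;
            if (t instanceof RuntimeException) throw (RuntimeException) t;
            if (t instanceof Error) throw (Error) t;
          }
        } finally {
          aborting = false;
          notifyAll();   // wake any thread waiting for the abort to finish
        }
      }
    }

    // Mirrors the AbortException added at the end of DocumentsWriter.java.
    class AbortException extends IOException {
      AbortException(Throwable cause, DocWriter writer) {
        initCause(cause);      // keep the root cause for the re-throw above
        writer.setAborting();  // flag the abort before the stack unwinds
      }
    }
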
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Mon Jan 14 09:06:21 2008
@@ -251,7 +251,7 @@
message(" merge thread: done");
- } catch (IOException exc) {
+ } catch (Throwable exc) {
if (merge != null) {
merge.setException(exc);
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan 14 09:06:21 2008
@@ -145,11 +145,12 @@
private ThreadState[] threadStates = new ThreadState[0];
private final HashMap threadBindings = new HashMap();
private int numWaiting;
- private ThreadState[] waitingThreadStates = new ThreadState[1];
+ private final ThreadState[] waitingThreadStates = new ThreadState[MAX_THREAD_STATE];
private int pauseThreads; // Non-zero when we need all threads to
// pause (eg to flush)
private boolean flushPending; // True when a thread has decided to flush
- private boolean bufferIsFull; // True when it's time to write segment
+ private boolean bufferIsFull; // True when it's time to write segment
+ private boolean aborting; // True while abort is running
private PrintStream infoStream;
@@ -321,74 +322,129 @@
return files;
}
+ synchronized void setAborting() {
+ aborting = true;
+ }
+
/** Called if we hit an exception when adding docs,
* flushing, etc. This resets our state, discarding any
- * docs added since last flush. */
- synchronized void abort() throws IOException {
+ * docs added since last flush. If ae is non-null, it
+ * contains the root cause exception (which we re-throw
+ * after we are done aborting). */
+ synchronized void abort(AbortException ae) throws IOException {
+
+ // Anywhere that throws an AbortException must first
+ // mark aborting to make sure while the exception is
+ // unwinding the un-synchronized stack, no thread grabs
+ // the corrupt ThreadState that hit the aborting
+ // exception:
+ assert ae == null || aborting;
- if (infoStream != null)
- infoStream.println("docWriter: now abort");
+ try {
- // Forcefully remove waiting ThreadStates from line
- for(int i=0;i<numWaiting;i++)
- waitingThreadStates[i].isIdle = true;
- numWaiting = 0;
+ if (infoStream != null)
+ infoStream.println("docWriter: now abort");
- pauseAllThreads();
+ // Forcefully remove waiting ThreadStates from line
+ for(int i=0;i<numWaiting;i++)
+ waitingThreadStates[i].isIdle = true;
+ numWaiting = 0;
- bufferedDeleteTerms.clear();
- bufferedDeleteDocIDs.clear();
- numBufferedDeleteTerms = 0;
+ // Wait for all other threads to finish with DocumentsWriter:
+ pauseAllThreads();
- try {
+ assert 0 == numWaiting;
- abortedFiles = files();
+ try {
- // Discard pending norms:
- final int numField = fieldInfos.size();
- for (int i=0;i<numField;i++) {
- FieldInfo fi = fieldInfos.fieldInfo(i);
- if (fi.isIndexed && !fi.omitNorms) {
- BufferedNorms n = norms[i];
- if (n != null) {
- n.out.reset();
- n.reset();
+ bufferedDeleteTerms.clear();
+ bufferedDeleteDocIDs.clear();
+ numBufferedDeleteTerms = 0;
+
+ abortedFiles = files();
+
+ // Discard pending norms:
+ final int numField = fieldInfos.size();
+ for (int i=0;i<numField;i++) {
+ FieldInfo fi = fieldInfos.fieldInfo(i);
+ if (fi.isIndexed && !fi.omitNorms) {
+ BufferedNorms n = norms[i];
+ if (n != null) {
+ n.out.reset();
+ n.reset();
+ }
}
}
- }
-
- // Reset vectors writer
- if (tvx != null) {
- tvx.close();
- tvf.close();
- tvd.close();
- tvx = null;
- }
- // Reset fields writer
- if (fieldsWriter != null) {
- fieldsWriter.close();
- fieldsWriter = null;
- }
+ // Reset vectors writer
+ if (tvx != null) {
+ try {
+ tvx.close();
+ } catch (IOException ioe) {
+ }
+ tvx = null;
+ }
+ if (tvd != null) {
+ try {
+ tvd.close();
+ } catch (IOException ioe) {
+ }
+ tvd = null;
+ }
+ if (tvf != null) {
+ try {
+ tvf.close();
+ } catch (IOException ioe) {
+ }
+ tvf = null;
+ }
- // Reset all postings data
- resetPostingsData();
+ // Reset fields writer
+ if (fieldsWriter != null) {
+ try {
+ fieldsWriter.close();
+ } catch (IOException ioe) {
+ }
+ fieldsWriter = null;
+ }
- // Clear vectors & fields from ThreadStates
- for(int i=0;i<threadStates.length;i++) {
- ThreadState state = threadStates[i];
- if (state.localFieldsWriter != null) {
- state.localFieldsWriter.close();
- state.localFieldsWriter = null;
+ // Clear vectors & fields from ThreadStates
+ for(int i=0;i<threadStates.length;i++) {
+ ThreadState state = threadStates[i];
+ if (state.localFieldsWriter != null) {
+ state.localFieldsWriter.close();
+ state.localFieldsWriter = null;
+ }
+ state.tvfLocal.reset();
+ state.fdtLocal.reset();
}
- state.tvfLocal.reset();
- state.fdtLocal.reset();
+
+ // Reset all postings data
+ resetPostingsData();
+
+ docStoreSegment = null;
+ files = null;
+
+ } finally {
+ resumeAllThreads();
}
- docStoreSegment = null;
- files = null;
+ // If we have a root cause exception, re-throw it now:
+ if (ae != null) {
+ Throwable t = ae.getCause();
+ if (t instanceof IOException)
+ throw (IOException) t;
+ else if (t instanceof RuntimeException)
+ throw (RuntimeException) t;
+ else if (t instanceof Error)
+ throw (Error) t;
+ else
+ // Should not get here
+ assert false: "unknown exception: " + t;
+ }
} finally {
- resumeAllThreads();
+ aborting = false;
+ notifyAll();
}
}
@@ -412,17 +468,17 @@
files = null;
}
- synchronized void pauseAllThreads() {
+ // Returns true if an abort is in progress
+ synchronized boolean pauseAllThreads() {
pauseThreads++;
- if (1 == pauseThreads) {
- while(!allThreadsIdle()) {
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ while(!allThreadsIdle()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
+ return aborting;
}
synchronized void resumeAllThreads() {
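
pauseAllThreads() now always waits for every indexing thread to go idle and reports whether an abort is in progress, so callers such as IndexWriter.doFlush can back off instead of flushing a half-aborted buffer. A rough sketch of the wait/notify handshake, with made-up names (Coordinator, enter, exit) standing in for the ThreadState bookkeeping:

    class Coordinator {
      private int pauseThreads;  // > 0 while producer threads must stay out
      private int active;        // threads currently busy adding a document
      private boolean aborting;

      // Returns true if an abort is in progress (the new contract here).
      synchronized boolean pauseAllThreads() {
        pauseThreads++;
        while (active > 0) {
          try {
            wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
        return aborting;
      }

      synchronized void resumeAllThreads() {
        pauseThreads--;
        if (pauseThreads == 0)
          notifyAll();           // let waiting producers back in
      }

      synchronized void enter() throws InterruptedException {
        while (pauseThreads > 0)
          wait();                // stay out while paused or aborting
        active++;
      }

      synchronized void exit() {
        active--;
        notifyAll();             // wakes pauseAllThreads() and other waiters
      }
    }
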
@@ -444,7 +500,7 @@
List newFiles;
/** Flush all pending docs to a new segment */
- int flush(boolean closeDocStore) throws IOException {
+ synchronized int flush(boolean closeDocStore) throws IOException {
assert allThreadsIdle();
@@ -456,13 +512,6 @@
docStoreOffset = numDocsInStore;
- if (closeDocStore) {
- assert docStoreSegment != null;
- assert docStoreSegment.equals(segment);
- newFiles.addAll(files());
- closeDocStore();
- }
-
int docCount;
assert numDocsInRAM > 0;
@@ -474,6 +523,13 @@
try {
+ if (closeDocStore) {
+ assert docStoreSegment != null;
+ assert docStoreSegment.equals(segment);
+ newFiles.addAll(files());
+ closeDocStore();
+ }
+
fieldInfos.write(directory, segment + ".fnm");
docCount = numDocsInRAM;
@@ -484,7 +540,7 @@
} finally {
if (!success)
- abort();
+ abort(null);
}
return docCount;
@@ -553,7 +609,6 @@
// doc has one
boolean doFlushAfter;
- boolean abortOnExc;
public ThreadState() {
fieldDataArray = new FieldData[8];
@@ -574,6 +629,7 @@
localFieldsWriter.close();
localFieldsWriter = null;
}
+ fieldGen = 0;
maxPostingsVectors = 0;
doFlushAfter = false;
postingsPool.reset();
@@ -589,52 +645,56 @@
/** Move all per-document state that was accumulated in
* the ThreadState into the "real" stores. */
- public void writeDocument() throws IOException {
+ public void writeDocument() throws IOException, AbortException {
// If we hit an exception while appending to the
// stored fields or term vectors files, we have to
// abort all documents since we last flushed because
// it means those files are possibly inconsistent.
- abortOnExc = true;
-
- // Append stored fields to the real FieldsWriter:
- fieldsWriter.flushDocument(numStoredFields, fdtLocal);
- fdtLocal.reset();
- numStoredFields = 0;
+ try {
- // Append term vectors to the real outputs:
- if (tvx != null) {
- tvx.writeLong(tvd.getFilePointer());
- tvd.writeVInt(numVectorFields);
- if (numVectorFields > 0) {
- for(int i=0;i<numVectorFields;i++)
- tvd.writeVInt(vectorFieldNumbers[i]);
- assert 0 == vectorFieldPointers[0];
- tvd.writeVLong(tvf.getFilePointer());
- long lastPos = vectorFieldPointers[0];
- for(int i=1;i<numVectorFields;i++) {
- long pos = vectorFieldPointers[i];
- tvd.writeVLong(pos-lastPos);
- lastPos = pos;
- }
- tvfLocal.writeTo(tvf);
- tvfLocal.reset();
+ // Append stored fields to the real FieldsWriter:
+ fieldsWriter.flushDocument(numStoredFields, fdtLocal);
+ fdtLocal.reset();
+
+ // Append term vectors to the real outputs:
+ if (tvx != null) {
+ tvx.writeLong(tvd.getFilePointer());
+ tvd.writeVInt(numVectorFields);
+ if (numVectorFields > 0) {
+ for(int i=0;i<numVectorFields;i++)
+ tvd.writeVInt(vectorFieldNumbers[i]);
+ assert 0 == vectorFieldPointers[0];
+ tvd.writeVLong(tvf.getFilePointer());
+ long lastPos = vectorFieldPointers[0];
+ for(int i=1;i<numVectorFields;i++) {
+ long pos = vectorFieldPointers[i];
+ tvd.writeVLong(pos-lastPos);
+ lastPos = pos;
+ }
+ tvfLocal.writeTo(tvf);
+ tvfLocal.reset();
+ }
}
- }
- // Append norms for the fields we saw:
- for(int i=0;i<numFieldData;i++) {
- FieldData fp = fieldDataArray[i];
- if (fp.doNorms) {
- BufferedNorms bn = norms[fp.fieldInfo.number];
- assert bn != null;
- assert bn.upto <= docID;
- bn.fill(docID);
- float norm = fp.boost * writer.getSimilarity().lengthNorm(fp.fieldInfo.name, fp.length);
- bn.add(norm);
- }
+ // Append norms for the fields we saw:
+ for(int i=0;i<numFieldData;i++) {
+ FieldData fp = fieldDataArray[i];
+ if (fp.doNorms) {
+ BufferedNorms bn = norms[fp.fieldInfo.number];
+ assert bn != null;
+ assert bn.upto <= docID;
+ bn.fill(docID);
+ float norm = fp.boost * writer.getSimilarity().lengthNorm(fp.fieldInfo.name, fp.length);
+ bn.add(norm);
+ }
+ }
+ } catch (Throwable t) {
+ // Forcefully idle this threadstate -- its state will
+ // be reset by abort()
+ isIdle = true;
+ throw new AbortException(t, DocumentsWriter.this);
}
- abortOnExc = false;
if (bufferIsFull && !flushPending) {
flushPending = true;
@@ -642,10 +702,13 @@
}
}
+ int fieldGen;
+
/** Initializes shared state for this new document */
- void init(Document doc, int docID) throws IOException {
+ void init(Document doc, int docID) throws IOException, AbortException {
+
+ assert !isIdle;
- abortOnExc = false;
this.docID = docID;
docBoost = doc.getBoost();
numStoredFields = 0;
@@ -654,7 +717,10 @@
maxTermPrefix = null;
assert 0 == fdtLocal.length();
+ assert 0 == fdtLocal.getFilePointer();
assert 0 == tvfLocal.length();
+ assert 0 == tvfLocal.getFilePointer();
+ final int thisFieldGen = fieldGen++;
List docFields = doc.getFields();
final int numDocFields = docFields.size();
@@ -700,36 +766,37 @@
if (numAllFieldData == allFieldDataArray.length) {
int newSize = (int) (allFieldDataArray.length*1.5);
+ int newHashSize = fieldDataHash.length*2;
FieldData newArray[] = new FieldData[newSize];
+ FieldData newHashArray[] = new FieldData[newHashSize];
System.arraycopy(allFieldDataArray, 0, newArray, 0, numAllFieldData);
- allFieldDataArray = newArray;
// Rehash
- newSize = fieldDataHash.length*2;
- newArray = new FieldData[newSize];
fieldDataHashMask = newSize-1;
for(int j=0;j<fieldDataHash.length;j++) {
FieldData fp0 = fieldDataHash[j];
while(fp0 != null) {
hashPos = fp0.fieldInfo.name.hashCode() & fieldDataHashMask;
FieldData nextFP0 = fp0.next;
- fp0.next = newArray[hashPos];
- newArray[hashPos] = fp0;
+ fp0.next = newHashArray[hashPos];
+ newHashArray[hashPos] = fp0;
fp0 = nextFP0;
}
}
- fieldDataHash = newArray;
+
+ allFieldDataArray = newArray;
+ fieldDataHash = newHashArray;
}
allFieldDataArray[numAllFieldData++] = fp;
} else {
assert fp.fieldInfo == fi;
}
- if (docID != fp.lastDocID) {
+ if (thisFieldGen != fp.lastGen) {
// First time we're seeing this field for this doc
- fp.lastDocID = docID;
+ fp.lastGen = thisFieldGen;
fp.fieldCount = 0;
fp.doVectors = fp.doVectorPositions = fp.doVectorOffsets = false;
fp.doNorms = fi.isIndexed && !fi.omitNorms;
@@ -776,7 +843,15 @@
assert docStoreSegment == null;
assert segment != null;
docStoreSegment = segment;
- fieldsWriter = new FieldsWriter(directory, docStoreSegment, fieldInfos);
+ // If we hit an exception while init'ing the
+ // fieldsWriter, we must abort this segment
+ // because those files will be in an unknown
+ // state:
+ try {
+ fieldsWriter = new FieldsWriter(directory, docStoreSegment, fieldInfos);
+ } catch (Throwable t) {
+ throw new AbortException(t, DocumentsWriter.this);
+ }
files = null;
}
localFieldsWriter = new FieldsWriter(null, fdtLocal, fieldInfos);
@@ -787,18 +862,26 @@
if (docHasVectors) {
if (tvx == null) {
assert docStoreSegment != null;
- tvx = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
- tvx.writeInt(TermVectorsReader.FORMAT_VERSION);
- tvd = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
- tvd.writeInt(TermVectorsReader.FORMAT_VERSION);
- tvf = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
- tvf.writeInt(TermVectorsReader.FORMAT_VERSION);
+ // If we hit an exception while init'ing the term
+ // vector output files, we must abort this segment
+ // because those files will be in an unknown
+ // state:
+ try {
+ tvx = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
+ tvx.writeInt(TermVectorsReader.FORMAT_VERSION);
+ tvd = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
+ tvd.writeInt(TermVectorsReader.FORMAT_VERSION);
+ tvf = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
+ tvf.writeInt(TermVectorsReader.FORMAT_VERSION);
+
+ // We must "catch up" for all docIDs that had no
+ // vectors before this one
+ for(int i=0;i<docID;i++)
+ tvx.writeLong(0);
+ } catch (Throwable t) {
+ throw new AbortException(t, DocumentsWriter.this);
+ }
files = null;
-
- // We must "catch up" for all docIDs that had no
- // vectors before this one
- for(int i=0;i<docID;i++)
- tvx.writeLong(0);
}
numVectorFields = 0;
@@ -929,7 +1012,7 @@
int upto = 0;
for(int i=0;i<numAllFieldData;i++) {
FieldData fp = allFieldDataArray[i];
- if (fp.lastDocID == -1) {
+ if (fp.lastGen == -1) {
// This field was not seen since the previous
// flush, so, free up its resources now
@@ -953,7 +1036,7 @@
} else {
// Reset
- fp.lastDocID = -1;
+ fp.lastGen = -1;
allFieldDataArray[upto++] = fp;
if (fp.numPostings > 0 && ((float) fp.numPostings) / fp.postingsHashSize < 0.2) {
@@ -996,7 +1079,7 @@
/** Tokenizes the fields of a document into Postings */
void processDocument(Analyzer analyzer)
- throws IOException {
+ throws IOException, AbortException {
final int numFields = numFieldData;
@@ -1215,7 +1298,7 @@
int fieldCount;
Fieldable[] docFields = new Fieldable[1];
- int lastDocID = -1;
+ int lastGen = -1;
FieldData next;
boolean doNorms;
@@ -1284,7 +1367,7 @@
}
/** Process all occurrences of one field in the document. */
- public void processField(Analyzer analyzer) throws IOException {
+ public void processField(Analyzer analyzer) throws IOException, AbortException {
length = 0;
position = 0;
offset = 0;
@@ -1316,10 +1399,8 @@
// contents of fdtLocal can be corrupt, so
// we must discard all stored fields for
// this document:
- if (!success) {
- numStoredFields = 0;
+ if (!success)
fdtLocal.reset();
- }
}
}
@@ -1354,7 +1435,7 @@
Token localToken = new Token();
/* Invert one occurrence of one field in the document */
- public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException {
+ public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException, AbortException {
if (length>0)
position += analyzer.getPositionIncrementGap(fieldInfo.name);
@@ -1475,7 +1556,7 @@
* for every term of every document. Its job is to *
* update the postings byte stream (Postings hash) *
* based on the occurence of a single term. */
- private void addPosition(Token token) {
+ private void addPosition(Token token) throws AbortException {
final Payload payload = token.getPayload();
@@ -1519,167 +1600,168 @@
// partially written and thus inconsistent if
// flushed, so we have to abort all documents
// since the last flush:
- abortOnExc = true;
- if (p != null) { // term seen since last flush
+ try {
- if (docID != p.lastDocID) { // term not yet seen in this doc
-
- // System.out.println(" seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
+ if (p != null) { // term seen since last flush
- assert p.docFreq > 0;
+ if (docID != p.lastDocID) { // term not yet seen in this doc
+
+ // System.out.println(" seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
- // Now that we know doc freq for previous doc,
- // write it & lastDocCode
- freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
- freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
- if (1 == p.docFreq)
- writeFreqVInt(p.lastDocCode|1);
- else {
- writeFreqVInt(p.lastDocCode);
- writeFreqVInt(p.docFreq);
- }
- p.freqUpto = freqUpto + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
+ assert p.docFreq > 0;
- if (doVectors) {
- vector = addNewVector();
- if (doVectorOffsets) {
- offsetStartCode = offsetStart = offset + token.startOffset();
- offsetEnd = offset + token.endOffset();
+ // Now that we know doc freq for previous doc,
+ // write it & lastDocCode
+ freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
+ freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
+ if (1 == p.docFreq)
+ writeFreqVInt(p.lastDocCode|1);
+ else {
+ writeFreqVInt(p.lastDocCode);
+ writeFreqVInt(p.docFreq);
}
- }
+ p.freqUpto = freqUpto + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
- proxCode = position;
-
- p.docFreq = 1;
-
- // Store code so we can write this after we're
- // done with this new doc
- p.lastDocCode = (docID-p.lastDocID) << 1;
- p.lastDocID = docID;
+ if (doVectors) {
+ vector = addNewVector();
+ if (doVectorOffsets) {
+ offsetStartCode = offsetStart = offset + token.startOffset();
+ offsetEnd = offset + token.endOffset();
+ }
+ }
- } else { // term already seen in this doc
- // System.out.println(" seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
- p.docFreq++;
+ proxCode = position;
- proxCode = position-p.lastPosition;
+ p.docFreq = 1;
- if (doVectors) {
- vector = p.vector;
- if (vector == null)
- vector = addNewVector();
- if (doVectorOffsets) {
- offsetStart = offset + token.startOffset();
- offsetEnd = offset + token.endOffset();
- offsetStartCode = offsetStart-vector.lastOffset;
+ // Store code so we can write this after we're
+ // done with this new doc
+ p.lastDocCode = (docID-p.lastDocID) << 1;
+ p.lastDocID = docID;
+
+ } else { // term already seen in this doc
+ // System.out.println(" seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
+ p.docFreq++;
+
+ proxCode = position-p.lastPosition;
+
+ if (doVectors) {
+ vector = p.vector;
+ if (vector == null)
+ vector = addNewVector();
+ if (doVectorOffsets) {
+ offsetStart = offset + token.startOffset();
+ offsetEnd = offset + token.endOffset();
+ offsetStartCode = offsetStart-vector.lastOffset;
+ }
}
}
- }
- } else { // term not seen before
- // System.out.println(" never seen docID=" + docID);
+ } else { // term not seen before
+ // System.out.println(" never seen docID=" + docID);
- // Refill?
- if (0 == postingsFreeCount) {
- postingsFreeCount = postingsFreeList.length;
- getPostings(postingsFreeList);
- }
-
- final int textLen1 = 1+tokenTextLen;
- if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
- if (textLen1 > CHAR_BLOCK_SIZE) {
- // Just skip this term, to remain as robust as
- // possible during indexing. A TokenFilter
- // can be inserted into the analyzer chain if
- // other behavior is wanted (pruning the term
- // to a prefix, throwing an exception, etc).
- abortOnExc = false;
- if (maxTermPrefix == null)
- maxTermPrefix = new String(tokenText, 0, 30);
-
- // Still increment position:
- position++;
- return;
+ // Refill?
+ if (0 == postingsFreeCount) {
+ postingsFreeCount = postingsFreeList.length;
+ getPostings(postingsFreeList);
}
- charPool.nextBuffer();
- }
- final char[] text = charPool.buffer;
- final int textUpto = charPool.byteUpto;
- // Pull next free Posting from free list
- p = postingsFreeList[--postingsFreeCount];
+ final int textLen1 = 1+tokenTextLen;
+ if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
+ if (textLen1 > CHAR_BLOCK_SIZE) {
+ // Just skip this term, to remain as robust as
+ // possible during indexing. A TokenFilter
+ // can be inserted into the analyzer chain if
+ // other behavior is wanted (pruning the term
+ // to a prefix, throwing an exception, etc).
+ if (maxTermPrefix == null)
+ maxTermPrefix = new String(tokenText, 0, 30);
+
+ // Still increment position:
+ position++;
+ return;
+ }
+ charPool.nextBuffer();
+ }
+ final char[] text = charPool.buffer;
+ final int textUpto = charPool.byteUpto;
+
+ // Pull next free Posting from free list
+ p = postingsFreeList[--postingsFreeCount];
- p.textStart = textUpto + charPool.byteOffset;
- charPool.byteUpto += textLen1;
+ p.textStart = textUpto + charPool.byteOffset;
+ charPool.byteUpto += textLen1;
- System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen);
+ System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen);
- text[textUpto+tokenTextLen] = 0xffff;
+ text[textUpto+tokenTextLen] = 0xffff;
- assert postingsHash[hashPos] == null;
+ assert postingsHash[hashPos] == null;
- postingsHash[hashPos] = p;
- numPostings++;
+ postingsHash[hashPos] = p;
+ numPostings++;
- if (numPostings == postingsHashHalfSize)
- rehashPostings(2*postingsHashSize);
+ if (numPostings == postingsHashHalfSize)
+ rehashPostings(2*postingsHashSize);
- // Init first slice for freq & prox streams
- final int firstSize = levelSizeArray[0];
+ // Init first slice for freq & prox streams
+ final int firstSize = levelSizeArray[0];
- final int upto1 = postingsPool.newSlice(firstSize);
- p.freqStart = p.freqUpto = postingsPool.byteOffset + upto1;
+ final int upto1 = postingsPool.newSlice(firstSize);
+ p.freqStart = p.freqUpto = postingsPool.byteOffset + upto1;
- final int upto2 = postingsPool.newSlice(firstSize);
- p.proxStart = p.proxUpto = postingsPool.byteOffset + upto2;
+ final int upto2 = postingsPool.newSlice(firstSize);
+ p.proxStart = p.proxUpto = postingsPool.byteOffset + upto2;
- p.lastDocCode = docID << 1;
- p.lastDocID = docID;
- p.docFreq = 1;
+ p.lastDocCode = docID << 1;
+ p.lastDocID = docID;
+ p.docFreq = 1;
- if (doVectors) {
- vector = addNewVector();
- if (doVectorOffsets) {
- offsetStart = offsetStartCode = offset + token.startOffset();
- offsetEnd = offset + token.endOffset();
+ if (doVectors) {
+ vector = addNewVector();
+ if (doVectorOffsets) {
+ offsetStart = offsetStartCode = offset + token.startOffset();
+ offsetEnd = offset + token.endOffset();
+ }
}
- }
- proxCode = position;
- }
-
- proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
- prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
- assert prox != null;
+ proxCode = position;
+ }
- if (payload != null && payload.length > 0) {
- writeProxVInt((proxCode<<1)|1);
- writeProxVInt(payload.length);
- writeProxBytes(payload.data, payload.offset, payload.length);
- fieldInfo.storePayloads = true;
- } else
- writeProxVInt(proxCode<<1);
+ proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
+ prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
+ assert prox != null;
+
+ if (payload != null && payload.length > 0) {
+ writeProxVInt((proxCode<<1)|1);
+ writeProxVInt(payload.length);
+ writeProxBytes(payload.data, payload.offset, payload.length);
+ fieldInfo.storePayloads = true;
+ } else
+ writeProxVInt(proxCode<<1);
- p.proxUpto = proxUpto + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
+ p.proxUpto = proxUpto + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
- p.lastPosition = position++;
+ p.lastPosition = position++;
- if (doVectorPositions) {
- posUpto = vector.posUpto & BYTE_BLOCK_MASK;
- pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
- writePosVInt(proxCode);
- vector.posUpto = posUpto + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
- }
+ if (doVectorPositions) {
+ posUpto = vector.posUpto & BYTE_BLOCK_MASK;
+ pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
+ writePosVInt(proxCode);
+ vector.posUpto = posUpto + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
+ }
- if (doVectorOffsets) {
- offsetUpto = vector.offsetUpto & BYTE_BLOCK_MASK;
- offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
- writeOffsetVInt(offsetStartCode);
- writeOffsetVInt(offsetEnd-offsetStart);
- vector.lastOffset = offsetEnd;
- vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
+ if (doVectorOffsets) {
+ offsetUpto = vector.offsetUpto & BYTE_BLOCK_MASK;
+ offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
+ writeOffsetVInt(offsetStartCode);
+ writeOffsetVInt(offsetEnd-offsetStart);
+ vector.lastOffset = offsetEnd;
+ vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
+ }
+ } catch (Throwable t) {
+ throw new AbortException(t, DocumentsWriter.this);
}
-
- abortOnExc = false;
}
/** Called when postings hash is too small (> 50%
@@ -2209,6 +2291,7 @@
synchronized void close() {
closed = true;
+ notifyAll();
}
/** Returns a free (idle) ThreadState that may be used for
@@ -2247,7 +2330,7 @@
// Next, wait until my thread state is idle (in case
// it's shared with other threads) and for threads to
// not be paused nor a flush pending:
- while(!state.isIdle || pauseThreads != 0 || flushPending)
+ while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || aborting))
try {
wait();
} catch (InterruptedException e) {
@@ -2275,28 +2358,31 @@
state.isIdle = false;
- boolean success = false;
try {
- state.init(doc, nextDocID++);
-
- if (delTerm != null) {
- addDeleteTerm(delTerm, state.docID);
- if (!state.doFlushAfter)
- state.doFlushAfter = timeToFlushDeletes();
- }
-
- success = true;
- } finally {
- if (!success) {
- synchronized(this) {
+ boolean success = false;
+ try {
+ state.init(doc, nextDocID);
+ if (delTerm != null) {
+ addDeleteTerm(delTerm, state.docID);
+ if (!state.doFlushAfter)
+ state.doFlushAfter = timeToFlushDeletes();
+ }
+ // Only increment nextDocID on successful init
+ nextDocID++;
+ success = true;
+ } finally {
+ if (!success) {
+ // Forcefully idle this ThreadState:
state.isIdle = true;
+ notifyAll();
if (state.doFlushAfter) {
state.doFlushAfter = false;
flushPending = false;
}
- notifyAll();
}
}
+ } catch (AbortException ae) {
+ abort(ae);
}
return state;
@@ -2319,32 +2405,30 @@
// This call is synchronized but fast
final ThreadState state = getThreadState(doc, delTerm);
- boolean success = false;
try {
+ boolean success = false;
try {
- // This call is not synchronized and does all the work
- state.processDocument(analyzer);
+ try {
+ // This call is not synchronized and does all the work
+ state.processDocument(analyzer);
+ } finally {
+ // This call is synchronized but fast
+ finishDocument(state);
+ }
+ success = true;
} finally {
- // This call is synchronized but fast
- finishDocument(state);
- }
- success = true;
- } finally {
- if (!success) {
- synchronized(this) {
- state.isIdle = true;
- if (state.abortOnExc)
- // Abort all buffered docs since last flush
- abort();
- else
+ if (!success) {
+ synchronized(this) {
// Immediately mark this document as deleted
// since likely it was partially added. This
// keeps indexing as "all or none" (atomic) when
// adding a document:
addDeleteDocID(state.docID);
- notifyAll();
+ }
}
}
+ } catch (AbortException ae) {
+ abort(ae);
}
return state.doFlushAfter || timeToFlushDeletes();
@@ -2467,51 +2551,57 @@
/** Does the synchronized work to finish/flush the
* inverted document. */
- private synchronized void finishDocument(ThreadState state) throws IOException {
+ private synchronized void finishDocument(ThreadState state) throws IOException, AbortException {
+ if (aborting) {
+ // Forcefully idle this threadstate -- its state will
+ // be reset by abort()
+ state.isIdle = true;
+ notifyAll();
+ return;
+ }
// Now write the indexed document to the real files.
-
if (nextWriteDocID == state.docID) {
// It's my turn, so write everything now:
- state.isIdle = true;
nextWriteDocID++;
state.writeDocument();
+ state.isIdle = true;
+ notifyAll();
// If any states were waiting on me, sweep through and
// flush those that are enabled by my write.
if (numWaiting > 0) {
- while(true) {
- int upto = 0;
- for(int i=0;i<numWaiting;i++) {
- ThreadState s = waitingThreadStates[i];
+ boolean any = true;
+ while(any) {
+ any = false;
+ for(int i=0;i<numWaiting;) {
+ final ThreadState s = waitingThreadStates[i];
if (s.docID == nextWriteDocID) {
+ s.writeDocument();
s.isIdle = true;
nextWriteDocID++;
- s.writeDocument();
- } else
- // Compact as we go
- waitingThreadStates[upto++] = waitingThreadStates[i];
+ any = true;
+ if (numWaiting > i+1)
+ // Swap in the last waiting state to fill in
+ // the hole we just created. It's important
+ // to do this as-we-go and not at the end of
+ // the loop, because if we hit an aborting
+ // exception in one of the s.writeDocument
+ // calls (above), it leaves this array in an
+ // inconsistent state:
+ waitingThreadStates[i] = waitingThreadStates[numWaiting-1];
+ numWaiting--;
+ } else {
+ assert !s.isIdle;
+ i++;
+ }
}
- if (upto == numWaiting)
- break;
- numWaiting = upto;
}
}
-
- // Now notify any incoming calls to addDocument
- // (above) that are waiting on our line to
- // shrink
- notifyAll();
-
} else {
// Another thread got a docID before me, but, it
// hasn't finished its processing. So add myself to
// the line but don't hold up this thread.
- if (numWaiting == waitingThreadStates.length) {
- ThreadState[] newWaiting = new ThreadState[2*waitingThreadStates.length];
- System.arraycopy(waitingThreadStates, 0, newWaiting, 0, numWaiting);
- waitingThreadStates = newWaiting;
- }
waitingThreadStates[numWaiting++] = state;
}
}
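
finishDocument() must append each document to the real stored-fields and vectors files strictly in docID order; a thread that finishes out of turn parks its state in waitingThreadStates until its docID comes up, and the sweep above now fills each freed slot immediately (swap in the last element) so the array never holds a stale entry if writeDocument() throws mid-sweep. A hedged sketch of that hand-off in isolation (all names here are placeholders):

    class InOrderSink {
      private int nextWriteID;
      private final int[] waiting = new int[16];  // parked docIDs
      private int numWaiting;

      synchronized void finish(int docID) {
        if (docID == nextWriteID) {
          write(nextWriteID++);
          boolean any = true;
          while (any) {                  // release parked docs now in order
            any = false;
            for (int i = 0; i < numWaiting; ) {
              if (waiting[i] == nextWriteID) {
                write(nextWriteID++);
                waiting[i] = waiting[--numWaiting];  // fill the hole now,
                any = true;                          // not at sweep end
              } else {
                i++;
              }
            }
          }
          notifyAll();
        } else {
          waiting[numWaiting++] = docID; // not my turn: park and return
        }
      }

      private void write(int docID) {
        // append stored fields, term vectors and norms for docID here
      }
    }
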
@@ -3135,5 +3225,14 @@
int offsetUpto; // Next write address for offsets
int posStart; // Address of first slice for positions
int posUpto; // Next write address for positions
+ }
+}
+
+// Used only internally to DW to call abort "up the stack"
+class AbortException extends IOException {
+ public AbortException(Throwable cause, DocumentsWriter docWriter) {
+ super();
+ initCause(cause);
+ docWriter.setAborting();
}
}
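
On the calling side, addDocument/updateDocument keep a success flag so a half-added document is immediately marked deleted ("all or none"), and funnel any AbortException into abort(ae), which resets the buffers and re-throws the root cause to the application. Roughly, with every name below being a placeholder rather than the real API:

    import java.io.IOException;

    class AddDocumentSketch {
      static class AbortException extends IOException {
        AbortException(Throwable cause) { initCause(cause); }
      }

      void addDocument(String doc) throws IOException {
        boolean success = false;
        try {
          try {
            process(doc);            // unsynchronized indexing work
            success = true;
          } finally {
            if (!success)
              rollbackThisDoc();     // keep the add atomic: mark it deleted
          }
        } catch (AbortException ae) {
          abort(ae);                 // discard buffers, re-throw root cause
        }
      }

      void process(String doc) throws AbortException {
        try {
          // ... tokenize, append postings, stored fields, vectors ...
        } catch (Throwable t) {
          throw new AbortException(t);   // corrupt state: force an abort
        }
      }

      void rollbackThisDoc() { /* addDeleteDocID(docID) in the real code */ }

      void abort(AbortException ae) throws IOException {
        Throwable t = ae.getCause();     // real abort() also resets state
        if (t instanceof IOException) throw (IOException) t;
        if (t instanceof RuntimeException) throw (RuntimeException) t;
      }
    }
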
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan 14 09:06:21 2008
@@ -1278,7 +1278,7 @@
if (!success) {
if (infoStream != null)
message("hit exception closing doc store segment");
- docWriter.abort();
+ docWriter.abort(null);
}
}
@@ -1999,7 +1999,7 @@
segmentInfos.clear();
segmentInfos.addAll(rollbackSegmentInfos);
- docWriter.abort();
+ docWriter.abort(null);
// Ask deleter to locate unreferenced files & remove
// them:
@@ -2401,7 +2401,13 @@
private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException {
// Make sure no threads are actively adding a document
- docWriter.pauseAllThreads();
+
+ // Returns true if docWriter is currently aborting, in
+ // which case we skip flushing this segment
+ if (docWriter.pauseAllThreads()) {
+ docWriter.resumeAllThreads();
+ return false;
+ }
try {
@@ -2536,7 +2542,7 @@
segmentInfos.remove(segmentInfos.size()-1);
}
if (flushDocs)
- docWriter.abort();
+ docWriter.abort(null);
deletePartialSegmentsFile();
deleter.checkpoint(segmentInfos, false);
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Mon Jan 14 09:06:21 2008
@@ -1887,6 +1887,11 @@
throw new IOException("I'm experiencing problems");
return input.next(result);
}
+
+ public void reset() throws IOException {
+ super.reset();
+ count = 0;
+ }
}
public void testDocumentsWriterExceptions() throws IOException {
@@ -1969,6 +1974,122 @@
}
}
+ public void testDocumentsWriterExceptionThreads() throws IOException {
+ Analyzer analyzer = new Analyzer() {
+ public TokenStream tokenStream(String fieldName, Reader reader) {
+ return new CrashingFilter(fieldName, new WhitespaceTokenizer(reader));
+ }
+ };
+
+ final int NUM_THREAD = 3;
+ final int NUM_ITER = 100;
+
+ for(int i=0;i<2;i++) {
+ MockRAMDirectory dir = new MockRAMDirectory();
+
+ {
+ final IndexWriter writer = new IndexWriter(dir, analyzer);
+
+ final int finalI = i;
+
+ Thread[] threads = new Thread[NUM_THREAD];
+ for(int t=0;t<NUM_THREAD;t++) {
+ threads[t] = new Thread() {
+ public void run() {
+ try {
+ for(int iter=0;iter<NUM_ITER;iter++) {
+ Document doc = new Document();
+ doc.add(new Field("contents", "here are some contents", Field.Store.YES,
+ Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ writer.addDocument(doc);
+ writer.addDocument(doc);
+ doc.add(new Field("crash", "this should crash after 4 terms", Field.Store.YES,
+ Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ doc.add(new Field("other", "this will not get indexed", Field.Store.YES,
+ Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ try {
+ writer.addDocument(doc);
+ fail("did not hit expected exception");
+ } catch (IOException ioe) {
+ }
+
+ if (0 == finalI) {
+ doc = new Document();
+ doc.add(new Field("contents", "here are some contents", Field.Store.YES,
+ Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ writer.addDocument(doc);
+ writer.addDocument(doc);
+ }
+ }
+ } catch (Throwable t) {
+ synchronized(this) {
+ System.out.println(Thread.currentThread().getName() + ": ERROR: hit unexpected exception");
+ t.printStackTrace(System.out);
+ }
+ fail();
+ }
+ }
+ };
+ threads[t].start();
+ }
+
+ for(int t=0;t<NUM_THREAD;t++)
+ while (true)
+ try {
+ threads[t].join();
+ break;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+
+ writer.close();
+ }
+
+ IndexReader reader = IndexReader.open(dir);
+ int expected = (3+(1-i)*2)*NUM_THREAD*NUM_ITER;
+ assertEquals(expected, reader.docFreq(new Term("contents", "here")));
+ assertEquals(expected, reader.maxDoc());
+ int numDel = 0;
+ for(int j=0;j<reader.maxDoc();j++) {
+ if (reader.isDeleted(j))
+ numDel++;
+ else
+ reader.document(j);
+ reader.getTermFreqVectors(j);
+ }
+ reader.close();
+
+ assertEquals(NUM_THREAD*NUM_ITER, numDel);
+
+ IndexWriter writer = new IndexWriter(dir, analyzer);
+ writer.setMaxBufferedDocs(10);
+ Document doc = new Document();
+ doc.add(new Field("contents", "here are some contents", Field.Store.YES,
+ Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ for(int j=0;j<17;j++)
+ writer.addDocument(doc);
+ writer.optimize();
+ writer.close();
+
+ reader = IndexReader.open(dir);
+ expected += 17-NUM_THREAD*NUM_ITER;
+ assertEquals(expected, reader.docFreq(new Term("contents", "here")));
+ assertEquals(expected, reader.maxDoc());
+ numDel = 0;
+ for(int j=0;j<reader.maxDoc();j++) {
+ if (reader.isDeleted(j))
+ numDel++;
+ else
+ reader.document(j);
+ reader.getTermFreqVectors(j);
+ }
+ reader.close();
+ assertEquals(0, numDel);
+
+ dir.close();
+ }
+ }
+
public void testVariableSchema() throws IOException {
MockRAMDirectory dir = new MockRAMDirectory();
int delID = 0;
@@ -2112,4 +2233,358 @@
directory.close();
}
+ // Used by test cases below
+ private class IndexerThread extends Thread {
+
+ boolean diskFull;
+ Throwable error;
+ AlreadyClosedException ace;
+ IndexWriter writer;
+ boolean noErrors;
+
+ public IndexerThread(IndexWriter writer, boolean noErrors) {
+ this.writer = writer;
+ this.noErrors = noErrors;
+ }
+
+ public void run() {
+
+ final Document doc = new Document();
+ doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+ int idUpto = 0;
+ int fullCount = 0;
+
+ while(true) {
+ try {
+ writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
+ } catch (IOException ioe) {
+ if (ioe.getMessage().startsWith("fake disk full at") ||
+ ioe.getMessage().equals("now failing on purpose")) {
+ diskFull = true;
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ if (fullCount++ >= 5)
+ break;
+ } else {
+ if (noErrors) {
+ System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
+ ioe.printStackTrace(System.out);
+ error = ioe;
+ }
+ break;
+ }
+ } catch (Throwable t) {
+ if (noErrors) {
+ System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
+ t.printStackTrace(System.out);
+ error = t;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ // LUCENE-1130: make sure we can close() even while
+ // threads are trying to add documents. Strictly
+ // speaking, this isn't valid use of Lucene's APIs, but we
+ // still want to be robust to this case:
+ public void testCloseWithThreads() throws IOException {
+ int NUM_THREADS = 3;
+
+ for(int iter=0;iter<50;iter++) {
+ MockRAMDirectory dir = new MockRAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+ ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+
+ writer.setMergeScheduler(cms);
+ writer.setMaxBufferedDocs(10);
+ writer.setMergeFactor(4);
+
+ IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+ boolean diskFull = false;
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i] = new IndexerThread(writer, false);
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i].start();
+
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+
+ writer.close(false);
+
+ // Make sure threads that are adding docs are not hung:
+ for(int i=0;i<NUM_THREADS;i++) {
+ while(true) {
+ try {
+ // Without fix for LUCENE-1130: one of the
+ // threads will hang
+ threads[i].join();
+ break;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (threads[i].isAlive())
+ fail("thread seems to be hung");
+ }
+
+ // Quick test to make sure index is not corrupt:
+ IndexReader reader = IndexReader.open(dir);
+ TermDocs tdocs = reader.termDocs(new Term("field", "aaa"));
+ int count = 0;
+ while(tdocs.next()) {
+ count++;
+ }
+ assertTrue(count > 0);
+ reader.close();
+
+ dir.close();
+ }
+ }
+
+ // LUCENE-1130: make sure immediate disk full on creating
+ // an IndexWriter (hit during DW.ThreadState.init()) is
+ // OK:
+ public void testImmediateDiskFull() throws IOException {
+ MockRAMDirectory dir = new MockRAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+ dir.setMaxSizeInBytes(dir.getRecomputedActualSizeInBytes());
+ writer.setMaxBufferedDocs(2);
+ final Document doc = new Document();
+ doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+ try {
+ writer.addDocument(doc);
+ fail("did not hit disk full");
+ } catch (IOException ioe) {
+ }
+ // Without fix for LUCENE-1130: this call will hang:
+ try {
+ writer.addDocument(doc);
+ fail("did not hit disk full");
+ } catch (IOException ioe) {
+ }
+ try {
+ writer.close(false);
+ fail("did not hit disk full");
+ } catch (IOException ioe) {
+ }
+ }
+
+ // LUCENE-1130: make sure immediate disk full on creating
+ // an IndexWriter (hit during DW.ThreadState.init()), with
+ // multiple threads, is OK:
+ public void testImmediateDiskFullWithThreads() throws IOException {
+
+ int NUM_THREADS = 3;
+
+ for(int iter=0;iter<10;iter++) {
+ MockRAMDirectory dir = new MockRAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+ ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+ // We expect disk full exceptions in the merge threads
+ cms.setSuppressExceptions();
+ writer.setMergeScheduler(cms);
+ writer.setMaxBufferedDocs(2);
+ writer.setMergeFactor(4);
+ dir.setMaxSizeInBytes(4*1024+20*iter);
+
+ IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+ boolean diskFull = false;
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i] = new IndexerThread(writer, true);
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i].start();
+
+ for(int i=0;i<NUM_THREADS;i++) {
+ while(true) {
+ try {
+ // Without fix for LUCENE-1130: one of the
+ // threads will hang
+ threads[i].join();
+ break;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (threads[i].isAlive())
+ fail("thread seems to be hung");
+ else
+ assertTrue("hit unexpected Throwable", threads[i].error == null);
+ }
+
+ try {
+ writer.close(false);
+ } catch (IOException ioe) {
+ }
+
+ dir.close();
+ }
+ }
+
+ // Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
+ private static class FailOnlyOnAbortOrFlush extends MockRAMDirectory.Failure {
+ public void eval(MockRAMDirectory dir) throws IOException {
+ if (doFail) {
+ StackTraceElement[] trace = new Exception().getStackTrace();
+ for (int i = 0; i < trace.length; i++) {
+ if ("abort".equals(trace[i].getMethodName()) ||
+ "flushDocument".equals(trace[i].getMethodName()))
+ throw new IOException("now failing on purpose");
+ }
+ }
+ }
+ }
+
+ // Runs test, with one thread, using the specific failure
+ // to trigger an IOException
+ public void _testSingleThreadFailure(MockRAMDirectory.Failure failure) throws IOException {
+ MockRAMDirectory dir = new MockRAMDirectory();
+
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+ writer.setMaxBufferedDocs(2);
+ final Document doc = new Document();
+ doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+ for(int i=0;i<6;i++)
+ writer.addDocument(doc);
+
+ dir.failOn(failure);
+ failure.setDoFail();
+ try {
+ writer.addDocument(doc);
+ writer.addDocument(doc);
+ fail("did not hit exception");
+ } catch (IOException ioe) {
+ }
+ failure.clearDoFail();
+ writer.addDocument(doc);
+ writer.close(false);
+ }
+
+ // Runs test, with multiple threads, using the specific
+ // failure to trigger an IOException
+ public void _testMultipleThreadsFailure(MockRAMDirectory.Failure failure) throws IOException {
+
+ int NUM_THREADS = 3;
+
+ for(int iter=0;iter<5;iter++) {
+ MockRAMDirectory dir = new MockRAMDirectory();
+ IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+ ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+ // We expect disk full exceptions in the merge threads
+ cms.setSuppressExceptions();
+ writer.setMergeScheduler(cms);
+ writer.setMaxBufferedDocs(2);
+ writer.setMergeFactor(4);
+
+ IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+ boolean diskFull = false;
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i] = new IndexerThread(writer, true);
+
+ for(int i=0;i<NUM_THREADS;i++)
+ threads[i].start();
+
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+
+ dir.failOn(failure);
+ failure.setDoFail();
+
+ for(int i=0;i<NUM_THREADS;i++) {
+ while(true) {
+ try {
+ threads[i].join();
+ break;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (threads[i].isAlive())
+ fail("thread seems to be hung");
+ else
+ assertTrue("hit unexpected Throwable", threads[i].error == null);
+ }
+
+ try {
+ writer.close(false);
+ } catch (IOException ioe) {
+ }
+
+ dir.close();
+ }
+ }
+
+ // LUCENE-1130: make sure initial IOException, and then 2nd
+ // IOException during abort(), is OK:
+ public void testIOExceptionDuringAbort() throws IOException {
+ _testSingleThreadFailure(new FailOnlyOnAbortOrFlush());
+ }
+
+ // LUCENE-1130: make sure initial IOException, and then 2nd
+ // IOException during abort(), with multiple threads, is OK:
+ public void testIOExceptionDuringAbortWithThreads() throws IOException {
+ _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush());
+ }
+
+ // Throws IOException during DocumentsWriter.closeDocStore
+ private static class FailOnlyInCloseDocStore extends MockRAMDirectory.Failure {
+ public void eval(MockRAMDirectory dir) throws IOException {
+ if (doFail) {
+ StackTraceElement[] trace = new Exception().getStackTrace();
+ for (int i = 0; i < trace.length; i++) {
+ if ("closeDocStore".equals(trace[i].getMethodName()))
+ throw new IOException("now failing on purpose");
+ }
+ }
+ }
+ }
+
+ // LUCENE-1130: test IOException in closeDocStore
+ public void testIOExceptionDuringCloseDocStore() throws IOException {
+ _testSingleThreadFailure(new FailOnlyInCloseDocStore());
+ }
+
+ // LUCENE-1130: test IOException in closeDocStore, with threads
+ public void testIOExceptionDuringCloseDocStoreWithThreads() throws IOException {
+ _testMultipleThreadsFailure(new FailOnlyInCloseDocStore());
+ }
+
+ // Throws IOException during DocumentsWriter.writeSegment
+ private static class FailOnlyInWriteSegment extends MockRAMDirectory.Failure {
+ public void eval(MockRAMDirectory dir) throws IOException {
+ if (doFail) {
+ StackTraceElement[] trace = new Exception().getStackTrace();
+ for (int i = 0; i < trace.length; i++) {
+ if ("writeSegment".equals(trace[i].getMethodName()))
+ throw new IOException("now failing on purpose");
+ }
+ }
+ }
+ }
+ // LUCENE-1130: test IOException in writeSegment
+ public void testIOExceptionDuringWriteSegment() throws IOException {
+ _testSingleThreadFailure(new FailOnlyInWriteSegment());
+ }
+
+ // LUCENE-1130: test IOException in writeSegment, with threads
+ public void testIOExceptionDuringWriteSegmentWithThreads() throws IOException {
+ _testMultipleThreadsFailure(new FailOnlyInWriteSegment());
+ }
}
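
The new failure-injection tests all follow one recipe: subclass MockRAMDirectory.Failure, inspect the current stack trace inside eval(), and throw only while a specific method (abort, flushDocument, closeDocStore, writeSegment, ...) is on it. A condensed sketch of wiring such a failure into a test; "myTargetMethod" is a placeholder for whichever method the test wants to break:

    import java.io.IOException;
    import org.apache.lucene.store.MockRAMDirectory;

    // Fails any write that happens while "myTargetMethod" is on the stack.
    class FailInTargetMethod extends MockRAMDirectory.Failure {
      public void eval(MockRAMDirectory dir) throws IOException {
        if (doFail) {
          StackTraceElement[] trace = new Exception().getStackTrace();
          for (int i = 0; i < trace.length; i++)
            if ("myTargetMethod".equals(trace[i].getMethodName()))
              throw new IOException("now failing on purpose");
        }
      }
    }

    // Typical use inside a test:
    //   MockRAMDirectory dir = new MockRAMDirectory();
    //   FailInTargetMethod failure = new FailInTargetMethod();
    //   dir.failOn(failure);     // evaluated at every potential failure point
    //   failure.setDoFail();     // arm it
    //   ... expect an IOException from addDocument()/close() ...
    //   failure.clearDoFail();   // disarm and verify the index is intact
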
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Mon Jan 14 09:06:21 2008
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.util.Arrays;
-import java.lang.StackTraceElement;
import org.apache.lucene.util.LuceneTestCase;
@@ -454,7 +453,7 @@
String[] startFiles = dir.list();
SegmentInfos infos = new SegmentInfos();
infos.read(dir);
- IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
+ new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
String[] endFiles = dir.list();
Arrays.sort(startFiles);
@@ -560,8 +559,19 @@
}
public void eval(MockRAMDirectory dir) throws IOException {
if (sawMaybe && !failed) {
- failed = true;
- throw new IOException("fail after applyDeletes");
+ boolean seen = false;
+ StackTraceElement[] trace = new Exception().getStackTrace();
+ for (int i = 0; i < trace.length; i++) {
+ if ("applyDeletes".equals(trace[i].getMethodName())) {
+ seen = true;
+ break;
+ }
+ }
+ if (!seen) {
+ // Only fail once we are no longer in applyDeletes
+ failed = true;
+ throw new IOException("fail after applyDeletes");
+ }
}
if (!failed) {
StackTraceElement[] trace = new Exception().getStackTrace();
@@ -740,7 +750,7 @@
String[] startFiles = dir.list();
SegmentInfos infos = new SegmentInfos();
infos.read(dir);
- IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
+ new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
String[] endFiles = dir.list();
if (!Arrays.equals(startFiles, endFiles)) {
Modified: lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java Mon Jan 14 09:06:21 2008
@@ -117,7 +117,6 @@
}
void maybeThrowIOException() throws IOException {
- maybeThrowDeterministicException();
if (randomIOExceptionRate > 0.0) {
int number = Math.abs(randomState.nextInt() % 1000);
if (number < randomIOExceptionRate*1000) {
@@ -198,7 +197,7 @@
* RAMOutputStream.BUFFER_SIZE (now 1024) bytes.
*/
- final synchronized long getRecomputedActualSizeInBytes() {
+ public final synchronized long getRecomputedActualSizeInBytes() {
long size = 0;
Iterator it = fileMap.values().iterator();
while (it.hasNext())
@@ -245,6 +244,16 @@
* mock.failOn(failure.reset())
*/
public Failure reset() { return this; }
+
+ protected boolean doFail;
+
+ public void setDoFail() {
+ doFail = true;
+ }
+
+ public void clearDoFail() {
+ doFail = false;
+ }
}
ArrayList failures;
@@ -253,7 +262,7 @@
* add a Failure object to the list of objects to be evaluated
* at every potential failure point
*/
- public void failOn(Failure fail) {
+ synchronized public void failOn(Failure fail) {
if (failures == null) {
failures = new ArrayList();
}
@@ -261,10 +270,10 @@
}
/**
- * Itterate through the failures list, giving each object a
+ * Iterate through the failures list, giving each object a
* chance to throw an IOE
*/
- void maybeThrowDeterministicException() throws IOException {
+ synchronized void maybeThrowDeterministicException() throws IOException {
if (failures != null) {
for(int i = 0; i < failures.size(); i++) {
((Failure)failures.get(i)).eval(this);
Modified: lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java Mon Jan 14 09:06:21 2008
@@ -18,7 +18,6 @@
*/
import java.io.IOException;
-import java.util.Iterator;
/**
* Used by MockRAMDirectory to create an output stream that
@@ -50,6 +49,11 @@
}
}
+ public void flush() throws IOException {
+ dir.maybeThrowDeterministicException();
+ super.flush();
+ }
+
public void writeByte(byte b) throws IOException {
singleByte[0] = b;
writeBytes(singleByte, 0, 1);
@@ -79,6 +83,8 @@
} else {
super.writeBytes(b, offset, len);
}
+
+ dir.maybeThrowDeterministicException();
if (first) {
// Maybe throw random exception; only do this on first