Posted to java-commits@lucene.apache.org by mi...@apache.org on 2008/01/14 18:06:24 UTC

svn commit: r611855 - in /lucene/java/trunk/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/ test/org/apache/lucene/store/

Author: mikemccand
Date: Mon Jan 14 09:06:21 2008
New Revision: 611855

URL: http://svn.apache.org/viewvc?rev=611855&view=rev
Log:
LUCENE-1130: fix thread safety issues when hitting IOExceptions in DocumentsWriter
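
For orientation, here is a minimal sketch of the abort-and-rethrow pattern this change introduces in DocumentsWriter.  It is an illustration only, not part of the patch: appendToSharedFiles() is a hypothetical placeholder, while AbortException, setAborting(), abort() and processDocument() are the actual names touched by the diffs below.

    // Per-thread, unsynchronized code wraps any failure that may have left
    // shared files or a ThreadState corrupt.  The AbortException constructor
    // calls docWriter.setAborting() *before* the stack unwinds, so no other
    // thread can grab the corrupt ThreadState in the meantime:
    try {
      appendToSharedFiles();                    // hypothetical placeholder
    } catch (Throwable t) {
      throw new AbortException(t, docWriter);
    }

    // The synchronized entry point catches it, discards all docs buffered
    // since the last flush, resumes paused threads, and re-throws the
    // original root cause (IOException, RuntimeException or Error):
    try {
      state.processDocument(analyzer);
    } catch (AbortException ae) {
      abort(ae);
    }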

Modified:
    lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
    lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
    lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Mon Jan 14 09:06:21 2008
@@ -251,7 +251,7 @@
 
         message("  merge thread: done");
 
-      } catch (IOException exc) {
+      } catch (Throwable exc) {
 
         if (merge != null) {
           merge.setException(exc);

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan 14 09:06:21 2008
@@ -145,11 +145,12 @@
   private ThreadState[] threadStates = new ThreadState[0];
   private final HashMap threadBindings = new HashMap();
   private int numWaiting;
-  private ThreadState[] waitingThreadStates = new ThreadState[1];
+  private final ThreadState[] waitingThreadStates = new ThreadState[MAX_THREAD_STATE];
   private int pauseThreads;                       // Non-zero when we need all threads to
                                                   // pause (eg to flush)
   private boolean flushPending;                   // True when a thread has decided to flush
-  private boolean bufferIsFull;                 // True when it's time to write segment
+  private boolean bufferIsFull;                   // True when it's time to write segment
+  private boolean aborting;                       // True while abort is running
 
   private PrintStream infoStream;
 
@@ -321,74 +322,129 @@
     return files;
   }
 
+  synchronized void setAborting() {
+    aborting = true;
+  }
+
   /** Called if we hit an exception when adding docs,
    *  flushing, etc.  This resets our state, discarding any
-   *  docs added since last flush. */
-  synchronized void abort() throws IOException {
+   *  docs added since last flush.  If ae is non-null, it
+   *  contains the root cause exception (which we re-throw
+   *  after we are done aborting). */
+  synchronized void abort(AbortException ae) throws IOException {
+
+    // Anywhere that throws an AbortException must first
+    // mark aborting to make sure while the exception is
+    // unwinding the un-synchronized stack, no thread grabs
+    // the corrupt ThreadState that hit the aborting
+    // exception:
+    assert ae == null || aborting;
 
-    if (infoStream != null)
-      infoStream.println("docWriter: now abort");
+    try {
 
-    // Forcefully remove waiting ThreadStates from line
-    for(int i=0;i<numWaiting;i++)
-      waitingThreadStates[i].isIdle = true;
-    numWaiting = 0;
+      if (infoStream != null)
+        infoStream.println("docWriter: now abort");
 
-    pauseAllThreads();
+      // Forcefully remove waiting ThreadStates from line
+      for(int i=0;i<numWaiting;i++)
+        waitingThreadStates[i].isIdle = true;
+      numWaiting = 0;
 
-    bufferedDeleteTerms.clear();
-    bufferedDeleteDocIDs.clear();
-    numBufferedDeleteTerms = 0;
+      // Wait for all other threads to finish with DocumentsWriter:
+      pauseAllThreads();
 
-    try {
+      assert 0 == numWaiting;
 
-      abortedFiles = files();
+      try {
 
-      // Discard pending norms:
-      final int numField = fieldInfos.size();
-      for (int i=0;i<numField;i++) {
-        FieldInfo fi = fieldInfos.fieldInfo(i);
-        if (fi.isIndexed && !fi.omitNorms) {
-          BufferedNorms n = norms[i];
-          if (n != null) {
-            n.out.reset();
-            n.reset();
+        bufferedDeleteTerms.clear();
+        bufferedDeleteDocIDs.clear();
+        numBufferedDeleteTerms = 0;
+
+        abortedFiles = files();
+
+        // Discard pending norms:
+        final int numField = fieldInfos.size();
+        for (int i=0;i<numField;i++) {
+          FieldInfo fi = fieldInfos.fieldInfo(i);
+          if (fi.isIndexed && !fi.omitNorms) {
+            BufferedNorms n = norms[i];
+            if (n != null) {
+              n.out.reset();
+              n.reset();
+            }
           }
         }
-      }
-
-      // Reset vectors writer
-      if (tvx != null) {
-        tvx.close();
-        tvf.close();
-        tvd.close();
-        tvx = null;
-      }
 
-      // Reset fields writer
-      if (fieldsWriter != null) {
-        fieldsWriter.close();
-        fieldsWriter = null;
-      }
+        // Reset vectors writer
+        if (tvx != null) {
+          try {
+            tvx.close();
+          } catch (IOException ioe) {
+          }
+          tvx = null;
+        }
+        if (tvd != null) {
+          try {
+            tvd.close();
+          } catch (IOException ioe) {
+          }
+          tvd = null;
+        }
+        if (tvf != null) {
+          try {
+            tvf.close();
+          } catch (IOException ioe) {
+          }
+          tvf = null;
+        }
 
-      // Reset all postings data
-      resetPostingsData();
+        // Reset fields writer
+        if (fieldsWriter != null) {
+          try {
+            fieldsWriter.close();
+          } catch (IOException ioe) {
+          }
+          fieldsWriter = null;
+        }
 
-      // Clear vectors & fields from ThreadStates
-      for(int i=0;i<threadStates.length;i++) {
-        ThreadState state = threadStates[i];
-        if (state.localFieldsWriter != null) {
-          state.localFieldsWriter.close();
-          state.localFieldsWriter = null;
+        // Clear vectors & fields from ThreadStates
+        for(int i=0;i<threadStates.length;i++) {
+          ThreadState state = threadStates[i];
+          if (state.localFieldsWriter != null) {
+            state.localFieldsWriter.close();
+            state.localFieldsWriter = null;
+          }
+          state.tvfLocal.reset();
+          state.fdtLocal.reset();
         }
-        state.tvfLocal.reset();
-        state.fdtLocal.reset();
+
+        // Reset all postings data
+        resetPostingsData();
+
+        docStoreSegment = null;
+        files = null;
+
+      } finally {
+        resumeAllThreads();
       }
-      docStoreSegment = null;
-      files = null;
 
+      // If we have a root cause exception, re-throw it now:
+      if (ae != null) {
+        Throwable t = ae.getCause();
+        if (t instanceof IOException)
+          throw (IOException) t;
+        else if (t instanceof RuntimeException)
+          throw (RuntimeException) t;
+        else if (t instanceof Error)
+          throw (Error) t;
+        else
+          // Should not get here
+          assert false: "unknown exception: " + t;
+      }
     } finally {
-      resumeAllThreads();
+      aborting = false;
+      notifyAll();
     }
   }
 
@@ -412,17 +468,17 @@
     files = null;
   }
 
-  synchronized void pauseAllThreads() {
+  // Returns true if an abort is in progress
+  synchronized boolean pauseAllThreads() {
     pauseThreads++;
-    if (1 == pauseThreads) {
-      while(!allThreadsIdle()) {
-        try {
-          wait();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
+    while(!allThreadsIdle()) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
       }
     }
+    return aborting;
   }
 
   synchronized void resumeAllThreads() {
@@ -444,7 +500,7 @@
   List newFiles;
 
   /** Flush all pending docs to a new segment */
-  int flush(boolean closeDocStore) throws IOException {
+  synchronized int flush(boolean closeDocStore) throws IOException {
 
     assert allThreadsIdle();
 
@@ -456,13 +512,6 @@
 
     docStoreOffset = numDocsInStore;
 
-    if (closeDocStore) {
-      assert docStoreSegment != null;
-      assert docStoreSegment.equals(segment);
-      newFiles.addAll(files());
-      closeDocStore();
-    }
-    
     int docCount;
 
     assert numDocsInRAM > 0;
@@ -474,6 +523,13 @@
 
     try {
 
+      if (closeDocStore) {
+        assert docStoreSegment != null;
+        assert docStoreSegment.equals(segment);
+        newFiles.addAll(files());
+        closeDocStore();
+      }
+    
       fieldInfos.write(directory, segment + ".fnm");
 
       docCount = numDocsInRAM;
@@ -484,7 +540,7 @@
 
     } finally {
       if (!success)
-        abort();
+        abort(null);
     }
 
     return docCount;
@@ -553,7 +609,6 @@
                                           // doc has one
 
     boolean doFlushAfter;
-    boolean abortOnExc;
 
     public ThreadState() {
       fieldDataArray = new FieldData[8];
@@ -574,6 +629,7 @@
         localFieldsWriter.close();
         localFieldsWriter = null;
       }
+      fieldGen = 0;
       maxPostingsVectors = 0;
       doFlushAfter = false;
       postingsPool.reset();
@@ -589,52 +645,56 @@
 
     /** Move all per-document state that was accumulated in
      *  the ThreadState into the "real" stores. */
-    public void writeDocument() throws IOException {
+    public void writeDocument() throws IOException, AbortException {
 
       // If we hit an exception while appending to the
       // stored fields or term vectors files, we have to
       // abort all documents since we last flushed because
       // it means those files are possibly inconsistent.
-      abortOnExc = true;
-
-      // Append stored fields to the real FieldsWriter:
-      fieldsWriter.flushDocument(numStoredFields, fdtLocal);
-      fdtLocal.reset();
-      numStoredFields = 0;
+      try {
 
-      // Append term vectors to the real outputs:
-      if (tvx != null) {
-        tvx.writeLong(tvd.getFilePointer());
-        tvd.writeVInt(numVectorFields);
-        if (numVectorFields > 0) {
-          for(int i=0;i<numVectorFields;i++)
-            tvd.writeVInt(vectorFieldNumbers[i]);
-          assert 0 == vectorFieldPointers[0];
-          tvd.writeVLong(tvf.getFilePointer());
-          long lastPos = vectorFieldPointers[0];
-          for(int i=1;i<numVectorFields;i++) {
-            long pos = vectorFieldPointers[i];
-            tvd.writeVLong(pos-lastPos);
-            lastPos = pos;
-          }
-          tvfLocal.writeTo(tvf);
-          tvfLocal.reset();
+        // Append stored fields to the real FieldsWriter:
+        fieldsWriter.flushDocument(numStoredFields, fdtLocal);
+        fdtLocal.reset();
+
+        // Append term vectors to the real outputs:
+        if (tvx != null) {
+          tvx.writeLong(tvd.getFilePointer());
+          tvd.writeVInt(numVectorFields);
+          if (numVectorFields > 0) {
+            for(int i=0;i<numVectorFields;i++)
+              tvd.writeVInt(vectorFieldNumbers[i]);
+            assert 0 == vectorFieldPointers[0];
+            tvd.writeVLong(tvf.getFilePointer());
+            long lastPos = vectorFieldPointers[0];
+            for(int i=1;i<numVectorFields;i++) {
+              long pos = vectorFieldPointers[i];
+              tvd.writeVLong(pos-lastPos);
+              lastPos = pos;
+            }
+            tvfLocal.writeTo(tvf);
+            tvfLocal.reset();
+          }
         }
-      }
 
-      // Append norms for the fields we saw:
-      for(int i=0;i<numFieldData;i++) {
-        FieldData fp = fieldDataArray[i];
-        if (fp.doNorms) {
-          BufferedNorms bn = norms[fp.fieldInfo.number];
-          assert bn != null;
-          assert bn.upto <= docID;
-          bn.fill(docID);
-          float norm = fp.boost * writer.getSimilarity().lengthNorm(fp.fieldInfo.name, fp.length);
-          bn.add(norm);
-        }
+        // Append norms for the fields we saw:
+        for(int i=0;i<numFieldData;i++) {
+          FieldData fp = fieldDataArray[i];
+          if (fp.doNorms) {
+            BufferedNorms bn = norms[fp.fieldInfo.number];
+            assert bn != null;
+            assert bn.upto <= docID;
+            bn.fill(docID);
+            float norm = fp.boost * writer.getSimilarity().lengthNorm(fp.fieldInfo.name, fp.length);
+            bn.add(norm);
+          }
+        }
+      } catch (Throwable t) {
+        // Forcefully idle this threadstate -- its state will
+        // be reset by abort()
+        isIdle = true;
+        throw new AbortException(t, DocumentsWriter.this);
       }
-      abortOnExc = false;
 
       if (bufferIsFull && !flushPending) {
         flushPending = true;
@@ -642,10 +702,13 @@
       }
     }
 
+    int fieldGen;
+
     /** Initializes shared state for this new document */
-    void init(Document doc, int docID) throws IOException {
+    void init(Document doc, int docID) throws IOException, AbortException {
+
+      assert !isIdle;
 
-      abortOnExc = false;
       this.docID = docID;
       docBoost = doc.getBoost();
       numStoredFields = 0;
@@ -654,7 +717,10 @@
       maxTermPrefix = null;
 
       assert 0 == fdtLocal.length();
+      assert 0 == fdtLocal.getFilePointer();
       assert 0 == tvfLocal.length();
+      assert 0 == tvfLocal.getFilePointer();
+      final int thisFieldGen = fieldGen++;
 
       List docFields = doc.getFields();
       final int numDocFields = docFields.size();
@@ -700,36 +766,37 @@
 
           if (numAllFieldData == allFieldDataArray.length) {
             int newSize = (int) (allFieldDataArray.length*1.5);
+            int newHashSize = fieldDataHash.length*2;
 
             FieldData newArray[] = new FieldData[newSize];
+            FieldData newHashArray[] = new FieldData[newHashSize];
             System.arraycopy(allFieldDataArray, 0, newArray, 0, numAllFieldData);
-            allFieldDataArray = newArray;
 
             // Rehash
-            newSize = fieldDataHash.length*2;
-            newArray = new FieldData[newSize];
             fieldDataHashMask = newSize-1;
             for(int j=0;j<fieldDataHash.length;j++) {
               FieldData fp0 = fieldDataHash[j];
               while(fp0 != null) {
                 hashPos = fp0.fieldInfo.name.hashCode() & fieldDataHashMask;
                 FieldData nextFP0 = fp0.next;
-                fp0.next = newArray[hashPos];
-                newArray[hashPos] = fp0;
+                fp0.next = newHashArray[hashPos];
+                newHashArray[hashPos] = fp0;
                 fp0 = nextFP0;
               }
             }
-            fieldDataHash = newArray;
+
+            allFieldDataArray = newArray;
+            fieldDataHash = newHashArray;
           }
           allFieldDataArray[numAllFieldData++] = fp;
         } else {
           assert fp.fieldInfo == fi;
         }
 
-        if (docID != fp.lastDocID) {
+        if (thisFieldGen != fp.lastGen) {
 
           // First time we're seeing this field for this doc
-          fp.lastDocID = docID;
+          fp.lastGen = thisFieldGen;
           fp.fieldCount = 0;
           fp.doVectors = fp.doVectorPositions = fp.doVectorOffsets = false;
           fp.doNorms = fi.isIndexed && !fi.omitNorms;
@@ -776,7 +843,15 @@
           assert docStoreSegment == null;
           assert segment != null;
           docStoreSegment = segment;
-          fieldsWriter = new FieldsWriter(directory, docStoreSegment, fieldInfos);
+          // If we hit an exception while init'ing the
+          // fieldsWriter, we must abort this segment
+          // because those files will be in an unknown
+          // state:
+          try {
+            fieldsWriter = new FieldsWriter(directory, docStoreSegment, fieldInfos);
+          } catch (Throwable t) {
+            throw new AbortException(t, DocumentsWriter.this);
+          }
           files = null;
         }
         localFieldsWriter = new FieldsWriter(null, fdtLocal, fieldInfos);
@@ -787,18 +862,26 @@
       if (docHasVectors) {
         if (tvx == null) {
           assert docStoreSegment != null;
-          tvx = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
-          tvx.writeInt(TermVectorsReader.FORMAT_VERSION);
-          tvd = directory.createOutput(docStoreSegment +  "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
-          tvd.writeInt(TermVectorsReader.FORMAT_VERSION);
-          tvf = directory.createOutput(docStoreSegment +  "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
-          tvf.writeInt(TermVectorsReader.FORMAT_VERSION);
+          // If we hit an exception while init'ing the term
+          // vector output files, we must abort this segment
+          // because those files will be in an unknown
+          // state:
+          try {
+            tvx = directory.createOutput(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
+            tvx.writeInt(TermVectorsReader.FORMAT_VERSION);
+            tvd = directory.createOutput(docStoreSegment +  "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
+            tvd.writeInt(TermVectorsReader.FORMAT_VERSION);
+            tvf = directory.createOutput(docStoreSegment +  "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
+            tvf.writeInt(TermVectorsReader.FORMAT_VERSION);
+
+            // We must "catch up" for all docIDs that had no
+            // vectors before this one
+            for(int i=0;i<docID;i++)
+              tvx.writeLong(0);
+          } catch (Throwable t) {
+            throw new AbortException(t, DocumentsWriter.this);
+          }
           files = null;
-
-          // We must "catch up" for all docIDs that had no
-          // vectors before this one
-          for(int i=0;i<docID;i++)
-            tvx.writeLong(0);
         }
 
         numVectorFields = 0;
@@ -929,7 +1012,7 @@
       int upto = 0;
       for(int i=0;i<numAllFieldData;i++) {
         FieldData fp = allFieldDataArray[i];
-        if (fp.lastDocID == -1) {
+        if (fp.lastGen == -1) {
           // This field was not seen since the previous
           // flush, so, free up its resources now
 
@@ -953,7 +1036,7 @@
 
         } else {
           // Reset
-          fp.lastDocID = -1;
+          fp.lastGen = -1;
           allFieldDataArray[upto++] = fp;
           
           if (fp.numPostings > 0 && ((float) fp.numPostings) / fp.postingsHashSize < 0.2) {
@@ -996,7 +1079,7 @@
 
     /** Tokenizes the fields of a document into Postings */
     void processDocument(Analyzer analyzer)
-      throws IOException {
+      throws IOException, AbortException {
 
       final int numFields = numFieldData;
 
@@ -1215,7 +1298,7 @@
       int fieldCount;
       Fieldable[] docFields = new Fieldable[1];
 
-      int lastDocID = -1;
+      int lastGen = -1;
       FieldData next;
 
       boolean doNorms;
@@ -1284,7 +1367,7 @@
       }
 
       /** Process all occurrences of one field in the document. */
-      public void processField(Analyzer analyzer) throws IOException {
+      public void processField(Analyzer analyzer) throws IOException, AbortException {
         length = 0;
         position = 0;
         offset = 0;
@@ -1316,10 +1399,8 @@
                 // contents of fdtLocal can be corrupt, so
                 // we must discard all stored fields for
                 // this document:
-                if (!success) {
-                  numStoredFields = 0;
+                if (!success)
                   fdtLocal.reset();
-                }
               }
             }
 
@@ -1354,7 +1435,7 @@
       Token localToken = new Token();
 
       /* Invert one occurrence of one field in the document */
-      public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException {
+      public void invertField(Fieldable field, Analyzer analyzer, final int maxFieldLength) throws IOException, AbortException {
 
         if (length>0)
           position += analyzer.getPositionIncrementGap(fieldInfo.name);
@@ -1475,7 +1556,7 @@
        *  for every term of every document.  Its job is to *
        *  update the postings byte stream (Postings hash) *
        *  based on the occurrence of a single term. */
-      private void addPosition(Token token) {
+      private void addPosition(Token token) throws AbortException {
 
         final Payload payload = token.getPayload();
 
@@ -1519,167 +1600,168 @@
         // partially written and thus inconsistent if
         // flushed, so we have to abort all documents
         // since the last flush:
-        abortOnExc = true;
 
-        if (p != null) {       // term seen since last flush
+        try {
 
-          if (docID != p.lastDocID) { // term not yet seen in this doc
-            
-            // System.out.println("    seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
+          if (p != null) {       // term seen since last flush
 
-            assert p.docFreq > 0;
+            if (docID != p.lastDocID) { // term not yet seen in this doc
+            
+              // System.out.println("    seen before (new docID=" + docID + ") freqUpto=" + p.freqUpto +" proxUpto=" + p.proxUpto);
 
-            // Now that we know doc freq for previous doc,
-            // write it & lastDocCode
-            freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
-            freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
-            if (1 == p.docFreq)
-              writeFreqVInt(p.lastDocCode|1);
-            else {
-              writeFreqVInt(p.lastDocCode);
-              writeFreqVInt(p.docFreq);
-            }
-            p.freqUpto = freqUpto + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
+              assert p.docFreq > 0;
 
-            if (doVectors) {
-              vector = addNewVector();
-              if (doVectorOffsets) {
-                offsetStartCode = offsetStart = offset + token.startOffset();
-                offsetEnd = offset + token.endOffset();
+              // Now that we know doc freq for previous doc,
+              // write it & lastDocCode
+              freqUpto = p.freqUpto & BYTE_BLOCK_MASK;
+              freq = postingsPool.buffers[p.freqUpto >> BYTE_BLOCK_SHIFT];
+              if (1 == p.docFreq)
+                writeFreqVInt(p.lastDocCode|1);
+              else {
+                writeFreqVInt(p.lastDocCode);
+                writeFreqVInt(p.docFreq);
               }
-            }
+              p.freqUpto = freqUpto + (p.freqUpto & BYTE_BLOCK_NOT_MASK);
 
-            proxCode = position;
-
-            p.docFreq = 1;
-
-            // Store code so we can write this after we're
-            // done with this new doc
-            p.lastDocCode = (docID-p.lastDocID) << 1;
-            p.lastDocID = docID;
+              if (doVectors) {
+                vector = addNewVector();
+                if (doVectorOffsets) {
+                  offsetStartCode = offsetStart = offset + token.startOffset();
+                  offsetEnd = offset + token.endOffset();
+                }
+              }
 
-          } else {                                // term already seen in this doc
-            // System.out.println("    seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
-            p.docFreq++;
+              proxCode = position;
 
-            proxCode = position-p.lastPosition;
+              p.docFreq = 1;
 
-            if (doVectors) {
-              vector = p.vector;
-              if (vector == null)
-                vector = addNewVector();
-              if (doVectorOffsets) {
-                offsetStart = offset + token.startOffset();
-                offsetEnd = offset + token.endOffset();
-                offsetStartCode = offsetStart-vector.lastOffset;
+              // Store code so we can write this after we're
+              // done with this new doc
+              p.lastDocCode = (docID-p.lastDocID) << 1;
+              p.lastDocID = docID;
+
+            } else {                                // term already seen in this doc
+              // System.out.println("    seen before (same docID=" + docID + ") proxUpto=" + p.proxUpto);
+              p.docFreq++;
+
+              proxCode = position-p.lastPosition;
+
+              if (doVectors) {
+                vector = p.vector;
+                if (vector == null)
+                  vector = addNewVector();
+                if (doVectorOffsets) {
+                  offsetStart = offset + token.startOffset();
+                  offsetEnd = offset + token.endOffset();
+                  offsetStartCode = offsetStart-vector.lastOffset;
+                }
               }
             }
-          }
-        } else {					  // term not seen before
-          // System.out.println("    never seen docID=" + docID);
+          } else {					  // term not seen before
+            // System.out.println("    never seen docID=" + docID);
 
-          // Refill?
-          if (0 == postingsFreeCount) {
-            postingsFreeCount = postingsFreeList.length;
-            getPostings(postingsFreeList);
-          }
-
-          final int textLen1 = 1+tokenTextLen;
-          if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
-            if (textLen1 > CHAR_BLOCK_SIZE) {
-              // Just skip this term, to remain as robust as
-              // possible during indexing.  A TokenFilter
-              // can be inserted into the analyzer chain if
-              // other behavior is wanted (pruning the term
-              // to a prefix, throwing an exception, etc).
-              abortOnExc = false;
-              if (maxTermPrefix == null)
-                maxTermPrefix = new String(tokenText, 0, 30);
-
-              // Still increment position:
-              position++;
-              return;
+            // Refill?
+            if (0 == postingsFreeCount) {
+              postingsFreeCount = postingsFreeList.length;
+              getPostings(postingsFreeList);
             }
-            charPool.nextBuffer();
-          }
-          final char[] text = charPool.buffer;
-          final int textUpto = charPool.byteUpto;
 
-          // Pull next free Posting from free list
-          p = postingsFreeList[--postingsFreeCount];
+            final int textLen1 = 1+tokenTextLen;
+            if (textLen1 + charPool.byteUpto > CHAR_BLOCK_SIZE) {
+              if (textLen1 > CHAR_BLOCK_SIZE) {
+                // Just skip this term, to remain as robust as
+                // possible during indexing.  A TokenFilter
+                // can be inserted into the analyzer chain if
+                // other behavior is wanted (pruning the term
+                // to a prefix, throwing an exception, etc).
+                if (maxTermPrefix == null)
+                  maxTermPrefix = new String(tokenText, 0, 30);
+
+                // Still increment position:
+                position++;
+                return;
+              }
+              charPool.nextBuffer();
+            }
+            final char[] text = charPool.buffer;
+            final int textUpto = charPool.byteUpto;
+
+            // Pull next free Posting from free list
+            p = postingsFreeList[--postingsFreeCount];
 
-          p.textStart = textUpto + charPool.byteOffset;
-          charPool.byteUpto += textLen1;
+            p.textStart = textUpto + charPool.byteOffset;
+            charPool.byteUpto += textLen1;
 
-          System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen);
+            System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen);
 
-          text[textUpto+tokenTextLen] = 0xffff;
+            text[textUpto+tokenTextLen] = 0xffff;
           
-          assert postingsHash[hashPos] == null;
+            assert postingsHash[hashPos] == null;
 
-          postingsHash[hashPos] = p;
-          numPostings++;
+            postingsHash[hashPos] = p;
+            numPostings++;
 
-          if (numPostings == postingsHashHalfSize)
-            rehashPostings(2*postingsHashSize);
+            if (numPostings == postingsHashHalfSize)
+              rehashPostings(2*postingsHashSize);
 
-          // Init first slice for freq & prox streams
-          final int firstSize = levelSizeArray[0];
+            // Init first slice for freq & prox streams
+            final int firstSize = levelSizeArray[0];
 
-          final int upto1 = postingsPool.newSlice(firstSize);
-          p.freqStart = p.freqUpto = postingsPool.byteOffset + upto1;
+            final int upto1 = postingsPool.newSlice(firstSize);
+            p.freqStart = p.freqUpto = postingsPool.byteOffset + upto1;
 
-          final int upto2 = postingsPool.newSlice(firstSize);
-          p.proxStart = p.proxUpto = postingsPool.byteOffset + upto2;
+            final int upto2 = postingsPool.newSlice(firstSize);
+            p.proxStart = p.proxUpto = postingsPool.byteOffset + upto2;
 
-          p.lastDocCode = docID << 1;
-          p.lastDocID = docID;
-          p.docFreq = 1;
+            p.lastDocCode = docID << 1;
+            p.lastDocID = docID;
+            p.docFreq = 1;
 
-          if (doVectors) {
-            vector = addNewVector();
-            if (doVectorOffsets) {
-              offsetStart = offsetStartCode = offset + token.startOffset();
-              offsetEnd = offset + token.endOffset();
+            if (doVectors) {
+              vector = addNewVector();
+              if (doVectorOffsets) {
+                offsetStart = offsetStartCode = offset + token.startOffset();
+                offsetEnd = offset + token.endOffset();
+              }
             }
-          }
 
-          proxCode = position;
-        }
-
-        proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
-        prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
-        assert prox != null;
+            proxCode = position;
+          }
 
-        if (payload != null && payload.length > 0) {
-          writeProxVInt((proxCode<<1)|1);
-          writeProxVInt(payload.length);
-          writeProxBytes(payload.data, payload.offset, payload.length);
-          fieldInfo.storePayloads = true;
-        } else
-          writeProxVInt(proxCode<<1);
+          proxUpto = p.proxUpto & BYTE_BLOCK_MASK;
+          prox = postingsPool.buffers[p.proxUpto >> BYTE_BLOCK_SHIFT];
+          assert prox != null;
+
+          if (payload != null && payload.length > 0) {
+            writeProxVInt((proxCode<<1)|1);
+            writeProxVInt(payload.length);
+            writeProxBytes(payload.data, payload.offset, payload.length);
+            fieldInfo.storePayloads = true;
+          } else
+            writeProxVInt(proxCode<<1);
 
-        p.proxUpto = proxUpto + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
+          p.proxUpto = proxUpto + (p.proxUpto & BYTE_BLOCK_NOT_MASK);
 
-        p.lastPosition = position++;
+          p.lastPosition = position++;
 
-        if (doVectorPositions) {
-          posUpto = vector.posUpto & BYTE_BLOCK_MASK;
-          pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
-          writePosVInt(proxCode);
-          vector.posUpto = posUpto + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
-        }
+          if (doVectorPositions) {
+            posUpto = vector.posUpto & BYTE_BLOCK_MASK;
+            pos = vectorsPool.buffers[vector.posUpto >> BYTE_BLOCK_SHIFT];
+            writePosVInt(proxCode);
+            vector.posUpto = posUpto + (vector.posUpto & BYTE_BLOCK_NOT_MASK);
+          }
 
-        if (doVectorOffsets) {
-          offsetUpto = vector.offsetUpto & BYTE_BLOCK_MASK;
-          offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
-          writeOffsetVInt(offsetStartCode);
-          writeOffsetVInt(offsetEnd-offsetStart);
-          vector.lastOffset = offsetEnd;
-          vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
+          if (doVectorOffsets) {
+            offsetUpto = vector.offsetUpto & BYTE_BLOCK_MASK;
+            offsets = vectorsPool.buffers[vector.offsetUpto >> BYTE_BLOCK_SHIFT];
+            writeOffsetVInt(offsetStartCode);
+            writeOffsetVInt(offsetEnd-offsetStart);
+            vector.lastOffset = offsetEnd;
+            vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
+          }
+        } catch (Throwable t) {
+          throw new AbortException(t, DocumentsWriter.this);
         }
-
-        abortOnExc = false;
       }
 
       /** Called when postings hash is too small (> 50%
@@ -2209,6 +2291,7 @@
 
   synchronized void close() {
     closed = true;
+    notifyAll();
   }
 
   /** Returns a free (idle) ThreadState that may be used for
@@ -2247,7 +2330,7 @@
     // Next, wait until my thread state is idle (in case
     // it's shared with other threads) and for threads to
     // not be paused nor a flush pending:
-    while(!state.isIdle || pauseThreads != 0 || flushPending)
+    while(!closed && (!state.isIdle || pauseThreads != 0 || flushPending || aborting))
       try {
         wait();
       } catch (InterruptedException e) {
@@ -2275,28 +2358,31 @@
 
     state.isIdle = false;
 
-    boolean success = false;
     try {
-      state.init(doc, nextDocID++);
-
-      if (delTerm != null) {
-        addDeleteTerm(delTerm, state.docID);
-        if (!state.doFlushAfter)
-          state.doFlushAfter = timeToFlushDeletes();
-      }
-
-      success = true;
-    } finally {
-      if (!success) {
-        synchronized(this) {
+      boolean success = false;
+      try {
+        state.init(doc, nextDocID);
+        if (delTerm != null) {
+          addDeleteTerm(delTerm, state.docID);
+          if (!state.doFlushAfter)
+            state.doFlushAfter = timeToFlushDeletes();
+        }
+        // Only increment nextDocID on successful init
+        nextDocID++;
+        success = true;
+      } finally {
+        if (!success) {
+          // Forcefully idle this ThreadState:
           state.isIdle = true;
+          notifyAll();
           if (state.doFlushAfter) {
             state.doFlushAfter = false;
             flushPending = false;
           }
-          notifyAll();
         }
       }
+    } catch (AbortException ae) {
+      abort(ae);
     }
 
     return state;
@@ -2319,32 +2405,30 @@
 
     // This call is synchronized but fast
     final ThreadState state = getThreadState(doc, delTerm);
-    boolean success = false;
     try {
+      boolean success = false;
       try {
-        // This call is not synchronized and does all the work
-        state.processDocument(analyzer);
+        try {
+          // This call is not synchronized and does all the work
+          state.processDocument(analyzer);
+        } finally {
+          // This call is synchronized but fast
+          finishDocument(state);
+        }
+        success = true;
       } finally {
-        // This call is synchronized but fast
-        finishDocument(state);
-      }
-      success = true;
-    } finally {
-      if (!success) {
-        synchronized(this) {
-          state.isIdle = true;
-          if (state.abortOnExc)
-            // Abort all buffered docs since last flush
-            abort();
-          else
+        if (!success) {
+          synchronized(this) {
             // Immediately mark this document as deleted
             // since likely it was partially added.  This
             // keeps indexing as "all or none" (atomic) when
             // adding a document:
             addDeleteDocID(state.docID);
-          notifyAll();
+          }
         }
       }
+    } catch (AbortException ae) {
+      abort(ae);
     }
 
     return state.doFlushAfter || timeToFlushDeletes();
@@ -2467,51 +2551,57 @@
 
   /** Does the synchronized work to finish/flush the
    * inverted document. */
-  private synchronized void finishDocument(ThreadState state) throws IOException {
+  private synchronized void finishDocument(ThreadState state) throws IOException, AbortException {
+    if (aborting) {
+      // Forcefully idle this threadstate -- its state will
+      // be reset by abort()
+      state.isIdle = true;
+      notifyAll();
+      return;
+    }
 
     // Now write the indexed document to the real files.
-
     if (nextWriteDocID == state.docID) {
       // It's my turn, so write everything now:
-      state.isIdle = true;
       nextWriteDocID++;
       state.writeDocument();
+      state.isIdle = true;
+      notifyAll();
 
       // If any states were waiting on me, sweep through and
       // flush those that are enabled by my write.
       if (numWaiting > 0) {
-        while(true) {
-          int upto = 0;
-          for(int i=0;i<numWaiting;i++) {
-            ThreadState s = waitingThreadStates[i];
+        boolean any = true;
+        while(any) {
+          any = false;
+          for(int i=0;i<numWaiting;) {
+            final ThreadState s = waitingThreadStates[i];
             if (s.docID == nextWriteDocID) {
+              s.writeDocument();
               s.isIdle = true;
               nextWriteDocID++;
-              s.writeDocument();
-            } else
-              // Compact as we go
-              waitingThreadStates[upto++] = waitingThreadStates[i];
+              any = true;
+              if (numWaiting > i+1)
+                // Swap in the last waiting state to fill in
+                // the hole we just created.  It's important
+                // to do this as-we-go and not at the end of
+                // the loop, because if we hit an aborting
+                // exception in one of the s.writeDocument
+                // calls (above), it leaves this array in an
+                // inconsistent state:
+                waitingThreadStates[i] = waitingThreadStates[numWaiting-1];
+              numWaiting--;
+            } else {
+              assert !s.isIdle;
+              i++;
+            }
           }
-          if (upto == numWaiting) 
-            break;
-          numWaiting = upto;
         }
       }
-
-      // Now notify any incoming calls to addDocument
-      // (above) that are waiting on our line to
-      // shrink
-      notifyAll();
-
     } else {
       // Another thread got a docID before me, but, it
       // hasn't finished its processing.  So add myself to
       // the line but don't hold up this thread.
-      if (numWaiting == waitingThreadStates.length) {
-        ThreadState[] newWaiting = new ThreadState[2*waitingThreadStates.length];
-        System.arraycopy(waitingThreadStates, 0, newWaiting, 0, numWaiting);
-        waitingThreadStates = newWaiting;
-      }
       waitingThreadStates[numWaiting++] = state;
     }
   }
@@ -3135,5 +3225,14 @@
     int offsetUpto;                                 // Next write address for offsets
     int posStart;                                   // Address of first slice for positions
     int posUpto;                                    // Next write address for positions
+  }
+}
+
+// Used only internally to DW to call abort "up the stack"
+class AbortException extends IOException {
+  public AbortException(Throwable cause, DocumentsWriter docWriter) {
+    super();
+    initCause(cause);
+    docWriter.setAborting();
   }
 }
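
For context, a hedged sketch (not part of the patch) of what an application-level caller observes once DocumentsWriter uses this pattern; it mirrors the new tests further down:

    try {
      writer.addDocument(doc);   // e.g. a disk-full IOException strikes inside DocumentsWriter
    } catch (IOException ioe) {
      // DocumentsWriter has already aborted: docs buffered since the last
      // flush were discarded, its ThreadStates were idled, and the original
      // root cause carried by AbortException is what surfaces here.
    }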

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan 14 09:06:21 2008
@@ -1278,7 +1278,7 @@
         if (!success) {
           if (infoStream != null)
             message("hit exception closing doc store segment");
-          docWriter.abort();
+          docWriter.abort(null);
         }
       }
 
@@ -1999,7 +1999,7 @@
         segmentInfos.clear();
         segmentInfos.addAll(rollbackSegmentInfos);
 
-        docWriter.abort();
+        docWriter.abort(null);
 
         // Ask deleter to locate unreferenced files & remove
         // them:
@@ -2401,7 +2401,13 @@
   private synchronized final boolean doFlush(boolean flushDocStores) throws CorruptIndexException, IOException {
 
     // Make sure no threads are actively adding a document
-    docWriter.pauseAllThreads();
+
+    // Returns true if docWriter is currently aborting, in
+    // which case we skip flushing this segment
+    if (docWriter.pauseAllThreads()) {
+      docWriter.resumeAllThreads();
+      return false;
+    }
 
     try {
 
@@ -2536,7 +2542,7 @@
                 segmentInfos.remove(segmentInfos.size()-1);
             }
             if (flushDocs)
-              docWriter.abort();
+              docWriter.abort(null);
             deletePartialSegmentsFile();
             deleter.checkpoint(segmentInfos, false);
 

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Mon Jan 14 09:06:21 2008
@@ -1887,6 +1887,11 @@
         throw new IOException("I'm experiencing problems");
       return input.next(result);
     }
+
+    public void reset() throws IOException {
+      super.reset();
+      count = 0;
+    }
   }
 
   public void testDocumentsWriterExceptions() throws IOException {
@@ -1969,6 +1974,122 @@
     }
   }
 
+  public void testDocumentsWriterExceptionThreads() throws IOException {
+    Analyzer analyzer = new Analyzer() {
+      public TokenStream tokenStream(String fieldName, Reader reader) {
+        return new CrashingFilter(fieldName, new WhitespaceTokenizer(reader));
+      }
+    };
+
+    final int NUM_THREAD = 3;
+    final int NUM_ITER = 100;
+
+    for(int i=0;i<2;i++) {
+      MockRAMDirectory dir = new MockRAMDirectory();
+
+      {
+        final IndexWriter writer = new IndexWriter(dir, analyzer);
+
+        final int finalI = i;
+
+        Thread[] threads = new Thread[NUM_THREAD];
+        for(int t=0;t<NUM_THREAD;t++) {
+          threads[t] = new Thread() {
+              public void run() {
+                try {
+                  for(int iter=0;iter<NUM_ITER;iter++) {
+                    Document doc = new Document();
+                    doc.add(new Field("contents", "here are some contents", Field.Store.YES,
+                                      Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+                    writer.addDocument(doc);
+                    writer.addDocument(doc);
+                    doc.add(new Field("crash", "this should crash after 4 terms", Field.Store.YES,
+                                      Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+                    doc.add(new Field("other", "this will not get indexed", Field.Store.YES,
+                                      Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+                    try {
+                      writer.addDocument(doc);
+                      fail("did not hit expected exception");
+                    } catch (IOException ioe) {
+                    }
+
+                    if (0 == finalI) {
+                      doc = new Document();
+                      doc.add(new Field("contents", "here are some contents", Field.Store.YES,
+                                        Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+                      writer.addDocument(doc);
+                      writer.addDocument(doc);
+                    }
+                  }
+                } catch (Throwable t) {
+                  synchronized(this) {
+                    System.out.println(Thread.currentThread().getName() + ": ERROR: hit unexpected exception");
+                    t.printStackTrace(System.out);
+                  }
+                  fail();
+                }
+              }
+            };
+          threads[t].start();
+        }
+
+        for(int t=0;t<NUM_THREAD;t++)
+          while (true)
+            try {
+              threads[t].join();
+              break;
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+            }            
+            
+        writer.close();
+      }
+
+      IndexReader reader = IndexReader.open(dir);
+      int expected = (3+(1-i)*2)*NUM_THREAD*NUM_ITER;
+      assertEquals(expected, reader.docFreq(new Term("contents", "here")));
+      assertEquals(expected, reader.maxDoc());
+      int numDel = 0;
+      for(int j=0;j<reader.maxDoc();j++) {
+        if (reader.isDeleted(j))
+          numDel++;
+        else
+          reader.document(j);
+        reader.getTermFreqVectors(j);
+      }
+      reader.close();
+
+      assertEquals(NUM_THREAD*NUM_ITER, numDel);
+
+      IndexWriter writer = new IndexWriter(dir, analyzer);
+      writer.setMaxBufferedDocs(10);
+      Document doc = new Document();
+      doc.add(new Field("contents", "here are some contents", Field.Store.YES,
+                        Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+      for(int j=0;j<17;j++)
+        writer.addDocument(doc);
+      writer.optimize();
+      writer.close();
+
+      reader = IndexReader.open(dir);
+      expected += 17-NUM_THREAD*NUM_ITER;
+      assertEquals(expected, reader.docFreq(new Term("contents", "here")));
+      assertEquals(expected, reader.maxDoc());
+      numDel = 0;
+      for(int j=0;j<reader.maxDoc();j++) {
+        if (reader.isDeleted(j))
+          numDel++;
+        else
+          reader.document(j);
+        reader.getTermFreqVectors(j);
+      }
+      reader.close();
+      assertEquals(0, numDel);
+
+      dir.close();
+    }
+  }
+
   public void testVariableSchema() throws IOException {
     MockRAMDirectory dir = new MockRAMDirectory();
     int delID = 0;
@@ -2112,4 +2233,358 @@
     directory.close();
   }
 
+  // Used by test cases below
+  private class IndexerThread extends Thread {
+
+    boolean diskFull;
+    Throwable error;
+    AlreadyClosedException ace;
+    IndexWriter writer;
+    boolean noErrors;
+
+    public IndexerThread(IndexWriter writer, boolean noErrors) {
+      this.writer = writer;
+      this.noErrors = noErrors;
+    }
+
+    public void run() {
+
+      final Document doc = new Document();
+      doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+      int idUpto = 0;
+      int fullCount = 0;
+
+      while(true) {
+        try {
+          writer.updateDocument(new Term("id", ""+(idUpto++)), doc);
+        } catch (IOException ioe) {
+          if (ioe.getMessage().startsWith("fake disk full at") ||
+              ioe.getMessage().equals("now failing on purpose")) {
+            diskFull = true;
+            try {
+              Thread.sleep(1);
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+            }
+            if (fullCount++ >= 5)
+              break;
+          } else {
+            if (noErrors) {
+              System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected IOException:");
+              ioe.printStackTrace(System.out);
+              error = ioe;
+            }
+            break;
+          }
+        } catch (Throwable t) {
+          if (noErrors) {
+            System.out.println(Thread.currentThread().getName() + ": ERROR: unexpected Throwable:");
+            t.printStackTrace(System.out);
+            error = t;
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  // LUCENE-1130: make sure we can close() even while
+  // threads are trying to add documents.  Strictly
+  // speaking, this isn't valid use of Lucene's APIs, but we
+  // still want to be robust to this case:
+  public void testCloseWithThreads() throws IOException {
+    int NUM_THREADS = 3;
+
+    for(int iter=0;iter<50;iter++) {
+      MockRAMDirectory dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+      ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+
+      writer.setMergeScheduler(cms);
+      writer.setMaxBufferedDocs(10);
+      writer.setMergeFactor(4);
+
+      IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+      boolean diskFull = false;
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i] = new IndexerThread(writer, false);
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i].start();
+
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+
+      writer.close(false);
+
+      // Make sure threads that are adding docs are not hung:
+      for(int i=0;i<NUM_THREADS;i++) {
+        while(true) {
+          try {
+            // Without fix for LUCENE-1130: one of the
+            // threads will hang
+            threads[i].join();
+            break;
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        if (threads[i].isAlive())
+          fail("thread seems to be hung");
+      }
+
+      // Quick test to make sure index is not corrupt:
+      IndexReader reader = IndexReader.open(dir);
+      TermDocs tdocs = reader.termDocs(new Term("field", "aaa"));
+      int count = 0;
+      while(tdocs.next()) {
+        count++;
+      }
+      assertTrue(count > 0);
+      reader.close();
+      
+      dir.close();
+    }
+  }
+
+  // LUCENE-1130: make sure immediate disk full on creating
+  // an IndexWriter (hit during DW.ThreadState.init()) is
+  // OK:
+  public void testImmediateDiskFull() throws IOException {
+    MockRAMDirectory dir = new MockRAMDirectory();
+    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+    dir.setMaxSizeInBytes(dir.getRecomputedActualSizeInBytes());
+    writer.setMaxBufferedDocs(2);
+    final Document doc = new Document();
+    doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+    try {
+      writer.addDocument(doc);
+      fail("did not hit disk full");
+    } catch (IOException ioe) {
+    }
+    // Without fix for LUCENE-1130: this call will hang:
+    try {
+      writer.addDocument(doc);
+      fail("did not hit disk full");
+    } catch (IOException ioe) {
+    }
+    try {
+      writer.close(false);
+      fail("did not hit disk full");
+    } catch (IOException ioe) {
+    }
+  }
+
+  // LUCENE-1130: make sure immediate disk full on creating
+  // an IndexWriter (hit during DW.ThreadState.init()), with
+  // multiple threads, is OK:
+  public void testImmediateDiskFullWithThreads() throws IOException {
+
+    int NUM_THREADS = 3;
+
+    for(int iter=0;iter<10;iter++) {
+      MockRAMDirectory dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+      ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+      // We expect disk full exceptions in the merge threads
+      cms.setSuppressExceptions();
+      writer.setMergeScheduler(cms);
+      writer.setMaxBufferedDocs(2);
+      writer.setMergeFactor(4);
+      dir.setMaxSizeInBytes(4*1024+20*iter);
+
+      IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+      boolean diskFull = false;
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i] = new IndexerThread(writer, true);
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i].start();
+
+      for(int i=0;i<NUM_THREADS;i++) {
+        while(true) {
+          try {
+            // Without fix for LUCENE-1130: one of the
+            // threads will hang
+            threads[i].join();
+            break;
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        if (threads[i].isAlive())
+          fail("thread seems to be hung");
+        else
+          assertTrue("hit unexpected Throwable", threads[i].error == null);
+      }
+
+      try {
+        writer.close(false);
+      } catch (IOException ioe) {
+      }
+
+      dir.close();
+    }
+  }
+
+  // Throws IOException during FieldsWriter.flushDocument and during DocumentsWriter.abort
+  private static class FailOnlyOnAbortOrFlush extends MockRAMDirectory.Failure {
+    public void eval(MockRAMDirectory dir)  throws IOException {
+      if (doFail) {
+        StackTraceElement[] trace = new Exception().getStackTrace();
+        for (int i = 0; i < trace.length; i++) {
+          if ("abort".equals(trace[i].getMethodName()) ||
+              "flushDocument".equals(trace[i].getMethodName()))
+            throw new IOException("now failing on purpose");
+        }
+      }
+    }
+  }
+
+  // Runs test, with one thread, using the specific failure
+  // to trigger an IOException
+  public void _testSingleThreadFailure(MockRAMDirectory.Failure failure) throws IOException {
+    MockRAMDirectory dir = new MockRAMDirectory();
+
+    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+    writer.setMaxBufferedDocs(2);
+    final Document doc = new Document();
+    doc.add(new Field("field", "aaa bbb ccc ddd eee fff ggg hhh iii jjj", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
+
+    for(int i=0;i<6;i++)
+      writer.addDocument(doc);
+
+    dir.failOn(failure);
+    failure.setDoFail();
+    try {
+      writer.addDocument(doc);
+      writer.addDocument(doc);
+      fail("did not hit exception");
+    } catch (IOException ioe) {
+    }
+    failure.clearDoFail();
+    writer.addDocument(doc);
+    writer.close(false);
+  }
+
+  // Runs test, with multiple threads, using the specific
+  // failure to trigger an IOException
+  public void _testMultipleThreadsFailure(MockRAMDirectory.Failure failure) throws IOException {
+
+    int NUM_THREADS = 3;
+
+    for(int iter=0;iter<5;iter++) {
+      MockRAMDirectory dir = new MockRAMDirectory();
+      IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+      ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+      // We expect disk full exceptions in the merge threads
+      cms.setSuppressExceptions();
+      writer.setMergeScheduler(cms);
+      writer.setMaxBufferedDocs(2);
+      writer.setMergeFactor(4);
+
+      IndexerThread[] threads = new IndexerThread[NUM_THREADS];
+      boolean diskFull = false;
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i] = new IndexerThread(writer, true);
+
+      for(int i=0;i<NUM_THREADS;i++)
+        threads[i].start();
+
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+
+      dir.failOn(failure);
+      failure.setDoFail();
+
+      for(int i=0;i<NUM_THREADS;i++) {
+        while(true) {
+          try {
+            threads[i].join();
+            break;
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        if (threads[i].isAlive())
+          fail("thread seems to be hung");
+        else
+          assertTrue("hit unexpected Throwable", threads[i].error == null);
+      }
+
+      try {
+        writer.close(false);
+      } catch (IOException ioe) {
+        // OK: the failure is still armed, so close may throw too
+      }
+
+      dir.close();
+    }
+  }
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during abort(), is OK:
+  public void testIOExceptionDuringAbort() throws IOException {
+    _testSingleThreadFailure(new FailOnlyOnAbortOrFlush());
+  }
+
+  // LUCENE-1130: make sure initial IOException, and then 2nd
+  // IOException during abort(), with multiple threads, is OK:
+  public void testIOExceptionDuringAbortWithThreads() throws IOException {
+    _testMultipleThreadsFailure(new FailOnlyOnAbortOrFlush());
+  }
+
+  // Throws IOException during DocumentsWriter.closeDocStore
+  private static class FailOnlyInCloseDocStore extends MockRAMDirectory.Failure {
+    public void eval(MockRAMDirectory dir)  throws IOException {
+      if (doFail) {
+        StackTraceElement[] trace = new Exception().getStackTrace();
+        for (int i = 0; i < trace.length; i++) {
+          if ("closeDocStore".equals(trace[i].getMethodName()))
+            throw new IOException("now failing on purpose");
+        }
+      }
+    }
+  }
+
+  // LUCENE-1130: test IOException in closeDocStore
+  public void testIOExceptionDuringCloseDocStore() throws IOException {
+    _testSingleThreadFailure(new FailOnlyInCloseDocStore());
+  }
+
+  // LUCENE-1130: test IOException in closeDocStore, with threads
+  public void testIOExceptionDuringCloseDocStoreWithThreads() throws IOException {
+    _testMultipleThreadsFailure(new FailOnlyInCloseDocStore());
+  }
+
+  // Throws IOException during DocumentsWriter.writeSegment
+  private static class FailOnlyInWriteSegment extends MockRAMDirectory.Failure {
+    public void eval(MockRAMDirectory dir)  throws IOException {
+      if (doFail) {
+        StackTraceElement[] trace = new Exception().getStackTrace();
+        for (int i = 0; i < trace.length; i++) {
+          if ("writeSegment".equals(trace[i].getMethodName()))
+            throw new IOException("now failing on purpose");
+        }
+      }
+    }
+  }
+
+  // LUCENE-1130: test IOException in writeSegment
+  public void testIOExceptionDuringWriteSegment() throws IOException {
+    _testSingleThreadFailure(new FailOnlyInWriteSegment());
+  }
+
+  // LUCENE-1130: test IOException in writeSegment, with threads
+  public void testIOExceptionDuringWriteSegmentWithThreads() throws IOException {
+    _testMultipleThreadsFailure(new FailOnlyInWriteSegment());
+  }
 }
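
The new TestIndexWriter cases above all drive the same arm/expect/disarm cycle against MockRAMDirectory's Failure hook. A condensed, standalone sketch of that cycle (illustrative only, not part of this commit; the class name and field text below are made up):

    import java.io.IOException;
    import org.apache.lucene.analysis.WhitespaceAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.MockRAMDirectory;

    public class FailureInjectionSketch {
      public static void main(String[] args) throws IOException {
        MockRAMDirectory dir = new MockRAMDirectory();

        // A Failure that throws whenever it is armed; the test classes above
        // additionally inspect the stack trace to pick a specific failure point.
        MockRAMDirectory.Failure failure = new MockRAMDirectory.Failure() {
          public void eval(MockRAMDirectory d) throws IOException {
            if (doFail)
              throw new IOException("now failing on purpose");
          }
        };

        IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
        writer.setMaxBufferedDocs(2);
        Document doc = new Document();
        doc.add(new Field("field", "aaa bbb ccc", Field.Store.YES, Field.Index.TOKENIZED));
        writer.addDocument(doc);

        dir.failOn(failure);
        failure.setDoFail();          // arm: writes through dir now throw
        try {
          writer.addDocument(doc);    // doc store / flush writes hit eval() and throw
          writer.addDocument(doc);
        } catch (IOException expected) {
          // the injected exception surfaces here; DocumentsWriter should abort cleanly
        }
        failure.clearDoFail();        // disarm: indexing must work again afterwards

        writer.addDocument(doc);
        writer.close();
        dir.close();
      }
    }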

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Mon Jan 14 09:06:21 2008
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.lang.StackTraceElement;
 
 import org.apache.lucene.util.LuceneTestCase;
 
@@ -454,7 +453,7 @@
           String[] startFiles = dir.list();
           SegmentInfos infos = new SegmentInfos();
           infos.read(dir);
-          IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
+          new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
           String[] endFiles = dir.list();
 
           Arrays.sort(startFiles);
@@ -560,8 +559,19 @@
         }
         public void eval(MockRAMDirectory dir)  throws IOException {
           if (sawMaybe && !failed) {
-            failed = true;
-            throw new IOException("fail after applyDeletes");
+            boolean seen = false;
+            StackTraceElement[] trace = new Exception().getStackTrace();
+            for (int i = 0; i < trace.length; i++) {
+              if ("applyDeletes".equals(trace[i].getMethodName())) {
+                seen = true;
+                break;
+              }
+            }
+            if (!seen) {
+              // Only fail once we are no longer in applyDeletes
+              failed = true;
+              throw new IOException("fail after applyDeletes");
+            }
           }
           if (!failed) {
             StackTraceElement[] trace = new Exception().getStackTrace();
@@ -740,7 +750,7 @@
       String[] startFiles = dir.list();
       SegmentInfos infos = new SegmentInfos();
       infos.read(dir);
-      IndexFileDeleter d = new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
+      new IndexFileDeleter(dir, new KeepOnlyLastCommitDeletionPolicy(), infos, null, null);
       String[] endFiles = dir.list();
 
       if (!Arrays.equals(startFiles, endFiles)) {

Modified: lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java Mon Jan 14 09:06:21 2008
@@ -117,7 +117,6 @@
   }
 
   void maybeThrowIOException() throws IOException {
-    maybeThrowDeterministicException();
     if (randomIOExceptionRate > 0.0) {
       int number = Math.abs(randomState.nextInt() % 1000);
       if (number < randomIOExceptionRate*1000) {
@@ -198,7 +197,7 @@
    * RAMOutputStream.BUFFER_SIZE (now 1024) bytes.
    */
 
-  final synchronized long getRecomputedActualSizeInBytes() {
+  public final synchronized long getRecomputedActualSizeInBytes() {
     long size = 0;
     Iterator it = fileMap.values().iterator();
     while (it.hasNext())
@@ -245,6 +244,16 @@
      * mock.failOn(failure.reset())
      */
     public Failure reset() { return this; }
+
+    protected boolean doFail;
+
+    public void setDoFail() {
+      doFail = true;
+    }
+
+    public void clearDoFail() {
+      doFail = false;
+    }
   }
 
   ArrayList failures;
@@ -253,7 +262,7 @@
    * add a Failure object to the list of objects to be evaluated
    * at every potential failure point
    */
-  public void failOn(Failure fail) {
+  synchronized public void failOn(Failure fail) {
     if (failures == null) {
       failures = new ArrayList();
     }
@@ -261,10 +270,10 @@
   }
 
   /**
-   * Itterate through the failures list, giving each object a
+   * Iterate through the failures list, giving each object a
    * chance to throw an IOE
    */
-  void maybeThrowDeterministicException() throws IOException {
+  synchronized void maybeThrowDeterministicException() throws IOException {
     if (failures != null) {
       for(int i = 0; i < failures.size(); i++) {
         ((Failure)failures.get(i)).eval(this);
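
The MockRAMOutputStream changes below are what make this hook deterministic: flush() and writeBytes() now call maybeThrowDeterministicException(), so eval() runs synchronously on the thread doing the write, inside whatever IndexWriter/DocumentsWriter method issued it. A minimal sketch of that call path (illustrative only, not part of this commit):

    import java.io.IOException;
    import org.apache.lucene.store.IndexOutput;
    import org.apache.lucene.store.MockRAMDirectory;

    public class WritePathTracerSketch {
      public static void main(String[] args) throws IOException {
        MockRAMDirectory dir = new MockRAMDirectory();

        // A "tracer" Failure: it never throws, it just shows that eval() is
        // invoked from within the write call itself, which is why the Failure
        // subclasses above can key off method names found on the stack.
        dir.failOn(new MockRAMDirectory.Failure() {
          public void eval(MockRAMDirectory d) throws IOException {
            StackTraceElement[] trace = new Exception().getStackTrace();
            for (int i = 0; i < trace.length; i++)
              System.out.println("  write issued under: " + trace[i].getMethodName());
          }
        });

        IndexOutput out = dir.createOutput("sketch.bin");
        out.writeByte((byte) 42);   // writeBytes path -> eval() runs here
        out.flush();                // flush path -> eval() runs here as well
        out.close();
        dir.close();
      }
    }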

Modified: lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java?rev=611855&r1=611854&r2=611855&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMOutputStream.java Mon Jan 14 09:06:21 2008
@@ -18,7 +18,6 @@
  */
 
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * Used by MockRAMDirectory to create an output stream that
@@ -50,6 +49,11 @@
     }
   }
 
+  public void flush() throws IOException {
+    dir.maybeThrowDeterministicException();
+    super.flush();
+  }
+
   public void writeByte(byte b) throws IOException {
     singleByte[0] = b;
     writeBytes(singleByte, 0, 1);
@@ -79,6 +83,8 @@
     } else {
       super.writeBytes(b, offset, len);
     }
+
+    dir.maybeThrowDeterministicException();
 
     if (first) {
       // Maybe throw random exception; only do this on first