Posted to java-commits@lucene.apache.org by mi...@apache.org on 2007/12/04 23:29:43 UTC

svn commit: r601121 - in /lucene/java/trunk/src: java/org/apache/lucene/index/DocumentsWriter.java test/org/apache/lucene/index/TestIndexWriter.java

Author: mikemccand
Date: Tue Dec  4 14:29:42 2007
New Revision: 601121

URL: http://svn.apache.org/viewvc?rev=601121&view=rev
Log:
LUCENE-1072: make sure an exception raised in Tokenizer.next() leaves DocumentsWriter in OK state

Modified:
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
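
The heart of the change is a new per-thread "abortOnExc" flag in
DocumentsWriter: an exception thrown while a document is being
tokenized (e.g. from Tokenizer.next()) is recoverable and fails only
that one document, while an exception thrown while the shared stores
(stored fields, term vectors, postings) are being appended to forces a
full abort, because those files may now be inconsistent. A minimal
sketch of that pattern follows; the names (SketchWriter,
invertDocument, writeSharedState) are hypothetical stand-ins for the
DocumentsWriter internals, not the actual code in the diff below:

    import java.io.IOException;

    // Hypothetical sketch only; not the actual DocumentsWriter code.
    class SketchWriter {

      // Mirrors ThreadState.abortOnExc: true only while the shared
      // stores (stored fields, term vectors, postings) may be
      // half-written.
      private boolean abortOnExc = false;

      void addDocument(Object doc) throws IOException {
        boolean success = false;
        try {
          abortOnExc = false;    // tokenization errors are recoverable
          invertDocument(doc);   // may throw from Tokenizer.next()

          abortOnExc = true;     // now appending to shared files
          writeSharedState(doc);
          abortOnExc = false;    // shared files are consistent again

          success = true;
        } finally {
          if (!success && abortOnExc)
            abort();             // discard the whole RAM buffer
        }
      }

      private void invertDocument(Object doc) throws IOException {}
      private void writeSharedState(Object doc) throws IOException {}
      private void abort() {}
    }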

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=601121&r1=601120&r2=601121&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Dec  4 14:29:42 2007
@@ -389,6 +389,7 @@
         try {
           wait();
         } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
         }
       }
     }
@@ -521,6 +522,7 @@
     int maxTermHit;                       // Set to > 0 if this doc has a too-large term
 
     boolean doFlushAfter;
+    boolean abortOnExc;
 
     public ThreadState() {
       fieldDataArray = new FieldData[8];
@@ -558,6 +560,12 @@
      *  the ThreadState into the "real" stores. */
     public void writeDocument() throws IOException {
 
+      // If we hit an exception while appending to the
+      // stored fields or term vectors files, we have to
+      // abort because it means those files are possibly
+      // inconsistent.
+      abortOnExc = true;
+
       // Append stored fields to the real FieldsWriter:
       fieldsWriter.flushDocument(fdtLocal);
       fdtLocal.reset();
@@ -581,6 +589,7 @@
           tvfLocal.reset();
         }
       }
+      abortOnExc = false;
 
       // Append norms for the fields we saw:
       for(int i=0;i<numFieldData;i++) {
@@ -604,6 +613,7 @@
     /** Initializes shared state for this new document */
     void init(Document doc, int docID) throws IOException {
 
+      abortOnExc = false;
       this.docID = docID;
       docBoost = doc.getBoost();
       numStoredFields = 0;
@@ -1177,6 +1187,7 @@
       boolean doVectors;
       boolean doVectorPositions;
       boolean doVectorOffsets;
+      boolean postingsCompacted;
 
       int numPostings;
       
@@ -1197,8 +1208,11 @@
       }
 
       void resetPostingArrays() {
+        if (!postingsCompacted)
+          compactPostings();
         recyclePostings(this.postingsHash, numPostings);
         Arrays.fill(postingsHash, 0, postingsHash.length, null);
+        postingsCompacted = false;
         numPostings = 0;
       }
 
@@ -1217,15 +1231,20 @@
         return fieldInfo.name.compareTo(((FieldData) o).fieldInfo.name);
       }
 
-      /** Collapse the hash table & sort in-place. */
-      public Posting[] sortPostings() {
+      private void compactPostings() {
         int upto = 0;
         for(int i=0;i<postingsHashSize;i++)
           if (postingsHash[i] != null)
             postingsHash[upto++] = postingsHash[i];
 
         assert upto == numPostings;
-        doPostingSort(postingsHash, upto);
+        postingsCompacted = true;
+      }
+
+      /** Collapse the hash table & sort in-place. */
+      public Posting[] sortPostings() {
+        compactPostings();
+        doPostingSort(postingsHash, numPostings);
         return postingsHash;
       }
 
@@ -1241,26 +1260,29 @@
         final int limit = fieldCount;
         final Fieldable[] docFieldsFinal = docFields;
 
-        // Walk through all occurrences in this doc for this field:
-        for(int j=0;j<limit;j++) {
-          Fieldable field = docFieldsFinal[j];
-
-          if (field.isIndexed())
-            invertField(field, analyzer, maxFieldLength);
-
-          if (field.isStored())
-            localFieldsWriter.writeField(fieldInfo, field);
-
-          docFieldsFinal[j] = null;
-        }
-
-        if (postingsVectorsUpto > 0) {
-          // Add term vectors for this field
-          writeVectors(fieldInfo);
-          if (postingsVectorsUpto > maxPostingsVectors)
-            maxPostingsVectors = postingsVectorsUpto;
-          postingsVectorsUpto = 0;
-          vectorsPool.reset();
+        // Walk through all occurrences in this doc for this
+        // field:
+        try {
+          for(int j=0;j<limit;j++) {
+            Fieldable field = docFieldsFinal[j];
+
+            if (field.isIndexed())
+              invertField(field, analyzer, maxFieldLength);
+
+            if (field.isStored())
+              localFieldsWriter.writeField(fieldInfo, field);
+
+            docFieldsFinal[j] = null;
+          }
+        } finally {
+          if (postingsVectorsUpto > 0) {
+            // Add term vectors for this field
+            writeVectors(fieldInfo);
+            if (postingsVectorsUpto > maxPostingsVectors)
+              maxPostingsVectors = postingsVectorsUpto;
+            postingsVectorsUpto = 0;
+            vectorsPool.reset();
+          }
         }
       }
 
@@ -1406,6 +1428,8 @@
 
         int hashPos = code & postingsHashMask;
 
+        assert !postingsCompacted;
+
         // Locate Posting in hash
         p = postingsHash[hashPos];
 
@@ -1422,6 +1446,12 @@
         
         final int proxCode;
 
+        // If we hit an exception below, it's possible the
+        // posting list or term vectors data will be
+        // partially written and thus inconsistent if
+        // flushed, so we have to abort:
+        abortOnExc = true;
+
         if (p != null) {       // term seen since last flush
 
           if (docID != p.lastDocID) { // term not yet seen in this doc
@@ -1492,6 +1522,7 @@
               // Just skip this term; we will throw an
               // exception after processing all accepted
               // terms in the doc
+              abortOnExc = false;
               return;
             }
             charPool.nextBuffer();
@@ -1572,6 +1603,8 @@
           vector.lastOffset = offsetEnd;
           vector.offsetUpto = offsetUpto + (vector.offsetUpto & BYTE_BLOCK_NOT_MASK);
         }
+
+        abortOnExc = false;
       }
 
       /** Called when postings hash is too small (> 50%
@@ -2142,7 +2175,9 @@
     while(!state.isIdle || pauseThreads != 0 || flushPending)
       try {
         wait();
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
 
     if (segment == null)
       segment = writer.newSegmentName();
@@ -2165,22 +2200,24 @@
     boolean success = false;
     try {
       state.init(doc, nextDocID++);
+
+      if (delTerm != null) {
+        addDeleteTerm(delTerm, state.docID);
+        if (!state.doFlushAfter)
+          state.doFlushAfter = timeToFlushDeletes();
+      }
+
       success = true;
     } finally {
       if (!success) {
-        state.isIdle = true;
-        if (state.doFlushAfter) {
-          state.doFlushAfter = false;
-          flushPending = false;
+        synchronized(this) {
+          state.isIdle = true;
+          if (state.doFlushAfter) {
+            state.doFlushAfter = false;
+            flushPending = false;
+          }
+          notifyAll();
         }
-        abort();
-      }
-    }
-
-    if (delTerm != null) {
-      addDeleteTerm(delTerm, state.docID);
-      if (!state.doFlushAfter) {
-        state.doFlushAfter = timeToFlushDeletes();
       }
     }
 
@@ -2208,15 +2245,22 @@
     int maxTermHit;
     try {
       // This call is not synchronized and does all the work
-      state.processDocument(analyzer);
-      // This call synchronized but fast
-      maxTermHit = state.maxTermHit;
-      finishDocument(state);
+      try {
+        state.processDocument(analyzer);
+      } finally {
+        maxTermHit = state.maxTermHit;
+        // This call is synchronized but fast
+        finishDocument(state);
+      }
       success = true;
     } finally {
       if (!success) {
-        state.isIdle = true;
-        abort();
+        synchronized(this) {
+          state.isIdle = true;
+          if (state.abortOnExc)
+            abort();
+          notifyAll();
+        }
       }
     }
 
@@ -2246,7 +2290,9 @@
     while(pauseThreads != 0 || flushPending)
       try {
         wait();
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
       for (int i = 0; i < terms.length; i++)
         addDeleteTerm(terms[i], numDocsInRAM);
     return timeToFlushDeletes();
@@ -2256,7 +2302,9 @@
     while(pauseThreads != 0 || flushPending)
       try {
         wait();
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     addDeleteTerm(term, numDocsInRAM);
     return timeToFlushDeletes();
   }
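
A second, smaller change runs through the file: the wait loops used to
swallow InterruptedException with an empty catch block, which silently
clears the thread's interrupt status. Each catch now calls
Thread.currentThread().interrupt() to re-assert that status so callers
further up the stack can still observe the interruption. A minimal
sketch of the idiom, with hypothetical class and field names:

    // Hypothetical sketch of the interrupt-restoring wait idiom.
    class WaitSketch {
      private boolean ready;

      synchronized void awaitReady() {
        while (!ready) {
          try {
            wait();
          } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it;
            // the while loop re-checks its condition either way.
            Thread.currentThread().interrupt();
          }
        }
      }

      synchronized void markReady() {
        ready = true;
        notifyAll();
      }
    }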

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=601121&r1=601120&r2=601121&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Tue Dec  4 14:29:42 2007
@@ -18,6 +18,7 @@
  */
 
 import java.io.IOException;
+import java.io.Reader;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Random;
@@ -25,7 +26,12 @@
 import org.apache.lucene.util.LuceneTestCase;
 
 import org.apache.lucene.analysis.WhitespaceAnalyzer;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenFilter;
+import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.analysis.standard.StandardTokenizer;
+import org.apache.lucene.analysis.Token;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.search.IndexSearcher;
@@ -1735,5 +1741,109 @@
     for(int i=0;i<177;i++)
       iw.addDocument(document);
     iw.close();
+  }
+
+  // LUCENE-1072
+  public void testExceptionFromTokenStream() throws IOException {
+    RAMDirectory dir = new MockRAMDirectory();
+    IndexWriter writer = new IndexWriter(dir, new Analyzer() {
+
+      public TokenStream tokenStream(String fieldName, Reader reader) {
+        return new TokenFilter(new StandardTokenizer(reader)) {
+          private int count = 0;
+
+          public Token next() throws IOException {
+            if (count++ == 5) {
+              throw new IOException();
+            }
+            return input.next();
+          }
+        };
+      }
+
+    }, true);
+
+    Document doc = new Document();
+    String contents = "aa bb cc dd ee ff gg hh ii jj kk";
+    doc.add(new Field("content", contents, Field.Store.NO,
+        Field.Index.TOKENIZED));
+    try {
+      writer.addDocument(doc);
+      fail("did not hit expected exception");
+    } catch (Exception e) {
+    }
+
+    // Make sure we can add another normal document
+    doc = new Document();
+    doc.add(new Field("content", "aa bb cc dd", Field.Store.NO,
+        Field.Index.TOKENIZED));
+    writer.addDocument(doc);
+
+    // Make sure we can add another normal document
+    doc = new Document();
+    doc.add(new Field("content", "aa bb cc dd", Field.Store.NO,
+        Field.Index.TOKENIZED));
+    writer.addDocument(doc);
+
+    writer.close();
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(reader.docFreq(new Term("content", "aa")), 3);
+    assertEquals(reader.docFreq(new Term("content", "gg")), 0);
+    reader.close();
+    dir.close();
+  }
+
+  private static class FailOnlyOnFlush extends MockRAMDirectory.Failure {
+    boolean doFail = false;
+    int count;
+
+    public void setDoFail() {
+      this.doFail = true;
+    }
+    public void clearDoFail() {
+      this.doFail = false;
+    }
+
+    public void eval(MockRAMDirectory dir) throws IOException {
+      if (doFail) {
+        StackTraceElement[] trace = new Exception().getStackTrace();
+        for (int i = 0; i < trace.length; i++) {
+          if ("appendPostings".equals(trace[i].getMethodName()) && count++ == 30) {
+            doFail = false;
+            throw new IOException("now failing during flush");
+          }
+        }
+      }
+    }
+  }
+
+  // LUCENE-1072: make sure an errant exception on flushing
+  // one segment only takes out those docs in that one flush
+  public void testDocumentsWriterAbort() throws IOException {
+    MockRAMDirectory dir = new MockRAMDirectory();
+    FailOnlyOnFlush failure = new FailOnlyOnFlush();
+    failure.setDoFail();
+    dir.failOn(failure);
+
+    IndexWriter writer = new IndexWriter(dir, new WhitespaceAnalyzer());
+    writer.setMaxBufferedDocs(2);
+    Document doc = new Document();
+    String contents = "aa bb cc dd ee ff gg hh ii jj kk";
+    doc.add(new Field("content", contents, Field.Store.NO,
+        Field.Index.TOKENIZED));
+    boolean hitError = false;
+    for(int i=0;i<200;i++) {
+      try {
+        writer.addDocument(doc);
+      } catch (IOException ioe) {
+        // only one flush should fail:
+        assertFalse(hitError);
+        hitError = true;
+      }
+    }
+    assertTrue(hitError);
+    writer.close();
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(198, reader.docFreq(new Term("content", "aa")));
   }
 }
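
Taken together, the two tests pin down the user-visible guarantee: an
exception raised while analyzing one document fails only that
addDocument() call, and the writer keeps accepting documents
afterwards. From application code the pattern looks roughly like the
sketch below; the helper method and the "docs" array are hypothetical,
and mirror the loop in testDocumentsWriterAbort:

    // Hypothetical sketch of client code relying on this guarantee.
    static void indexAll(IndexWriter writer, Document[] docs) throws IOException {
      for(int i=0;i<docs.length;i++) {
        try {
          writer.addDocument(docs[i]);
        } catch (IOException ioe) {
          // Only docs[i] is lost; the writer stays usable unless
          // DocumentsWriter had to abort its whole RAM buffer.
        }
      }
    }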